Posted to commits@htrace.apache.org by cm...@apache.org on 2014/12/22 01:22:00 UTC

incubator-htrace git commit: HTRACE-18. Add htrace-flume, which implements a SpanReceiver that sends spans to Flume (Long Zhou via Colin P. McCabe)

Repository: incubator-htrace
Updated Branches:
  refs/heads/master d84df8f33 -> fa7a97fde


HTRACE-18. Add htrace-flume, which implements a SpanReceiver that sends spans to Flume (Long Zhou via Colin P. McCabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/fa7a97fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/fa7a97fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/fa7a97fd

Branch: refs/heads/master
Commit: fa7a97fde8d339a114be1227eb877905d735e718
Parents: d84df8f
Author: Colin P. Mccabe <cm...@apache.org>
Authored: Sun Dec 21 16:18:58 2014 -0800
Committer: Colin P. Mccabe <cm...@apache.org>
Committed: Sun Dec 21 16:18:58 2014 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/htrace/Span.java   |   5 +
 .../htrace/impl/LocalFileSpanReceiver.java      |  17 +-
 .../java/org/apache/htrace/impl/MilliSpan.java  |  25 ++
 htrace-flume/README.md                          |  59 ++++
 htrace-flume/pom.xml                            | 108 +++++++
 .../apache/htrace/impl/FlumeSpanReceiver.java   | 283 +++++++++++++++++++
 .../htrace/impl/TestFlumeSpanReceiver.java      | 176 ++++++++++++
 .../htrace/impl/TestHBaseSpanReceiver.java      |   3 +
 pom.xml                                         |   1 +
 9 files changed, 661 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
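
For reference, here is a minimal, hypothetical sketch (not part of this commit) of how an application could register the new receiver once this patch is applied. It assumes the addReceiver method on org.apache.htrace.Trace and uses the htrace.flume.* keys defined in FlumeSpanReceiver below; the host and port values are illustrative.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.htrace.HTraceConfiguration;
    import org.apache.htrace.Trace;
    import org.apache.htrace.impl.FlumeSpanReceiver;

    public class FlumeReceiverSetup {
      public static void main(String[] args) {
        // Keys match FlumeSpanReceiver.FLUME_HOSTNAME_KEY / FLUME_PORT_KEY below.
        Map<String, String> conf = new HashMap<String, String>();
        conf.put("htrace.flume.hostname", "127.0.0.1");
        conf.put("htrace.flume.port", "60000");
        // Build the receiver and register it so sampled spans are shipped to Flume.
        // Assumes Trace.addReceiver(SpanReceiver) from htrace-core.
        FlumeSpanReceiver receiver =
            new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf));
        Trace.addReceiver(receiver);
      }
    }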


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-core/src/main/java/org/apache/htrace/Span.java
----------------------------------------------------------------------
diff --git a/htrace-core/src/main/java/org/apache/htrace/Span.java b/htrace-core/src/main/java/org/apache/htrace/Span.java
index 633bcba..b08cfc8 100644
--- a/htrace-core/src/main/java/org/apache/htrace/Span.java
+++ b/htrace-core/src/main/java/org/apache/htrace/Span.java
@@ -113,4 +113,9 @@ public interface Span {
    * @return
    */
   String getProcessId();
+
+  /**
+   * Serialize this span to JSON.
+   */
+  String toJson();
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
index 627b758..7095008 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
@@ -21,13 +21,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.htrace.HTraceConfiguration;
 import org.apache.htrace.Span;
 import org.apache.htrace.SpanReceiver;
-import org.mortbay.util.ajax.JSON;
 
 import java.io.BufferedWriter;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -48,7 +45,6 @@ public class LocalFileSpanReceiver implements SpanReceiver {
   private String file;
   private FileWriter fwriter;
   private BufferedWriter bwriter;
-  private Map<String, Object> values;
   private ExecutorService executor;
   private long executorTerminationTimeoutDuration;
 
@@ -67,7 +63,6 @@ public class LocalFileSpanReceiver implements SpanReceiver {
       throw new RuntimeException(ioe);
     }
     this.bwriter = new BufferedWriter(fwriter);
-    this.values = new LinkedHashMap<String, Object>();
   }
 
 
@@ -81,19 +76,9 @@ public class LocalFileSpanReceiver implements SpanReceiver {
     @Override
     public void run() {
       try {
-        values.put("TraceID", span.getTraceId());
-        values.put("SpanID", span.getSpanId());
-        values.put("ParentID", span.getParentId());
-        values.put("ProcessID", span.getProcessId());
-        values.put("Start", span.getStartTimeMillis());
-        values.put("Stop", span.getStopTimeMillis());
-        values.put("Description", span.getDescription());
-        values.put("KVAnnotations", span.getKVAnnotations());
-        values.put("TLAnnotations", span.getTimelineAnnotations());
-        bwriter.write(JSON.toString(values));
+        bwriter.write(span.toJson());
         bwriter.newLine();
         bwriter.flush();
-        values.clear();
       } catch (IOException e) {
         LOG.error("Error when writing to file: " + file, e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
----------------------------------------------------------------------
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
index 9d24f68..f313e61 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java
@@ -19,10 +19,12 @@ package org.apache.htrace.impl;
 import org.apache.htrace.Span;
 import org.apache.htrace.TimelineAnnotation;
 import org.apache.htrace.Tracer;
+import org.mortbay.util.ajax.JSON;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -158,4 +160,27 @@ public class MilliSpan implements Span {
   public String getProcessId() {
     return processId;
   }
+  
+  @Override
+  public String toJson() {
+    Map<String, Object> values = new LinkedHashMap<String, Object>();
+    values.put("TraceID", traceId);
+    values.put("SpanID", spanId);
+    values.put("ParentID", parentSpanId);
+    if (processId != null) {
+      values.put("ProcessID", processId);
+    }
+    values.put("Start", start);
+    values.put("Stop", stop);
+    if (description != null) {
+      values.put("Description", description);
+    }
+    if (timeline != null) {
+      values.put("TLAnnotations", timeline);
+    }
+    if (traceInfo != null){
+      values.put("KVAnnotations", traceInfo);
+    }
+    return JSON.toString(values);
+  }
 }
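
As an aside (not part of the patch), the new serialization can be exercised directly. The sketch below uses a hypothetical class name, lives in the org.apache.htrace.impl package, and mirrors the MilliSpan constructor used by TestFlumeSpanReceiver further down; the exact output formatting is whatever org.mortbay.util.ajax.JSON produces.

    package org.apache.htrace.impl;

    import org.apache.htrace.Span;

    public class ToJsonExample {
      public static void main(String[] args) {
        // Arguments are (description, traceId, parentSpanId, spanId, processId),
        // as in TestFlumeSpanReceiver below.
        Span span = new MilliSpan("ROOT", 1, Span.ROOT_SPAN_ID, 100, "test");
        span.stop();
        // Prints one JSON object per span with keys such as TraceID, SpanID,
        // ParentID, ProcessID, Start, Stop and Description.
        System.out.println(span.toJson());
      }
    }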

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-flume/README.md
----------------------------------------------------------------------
diff --git a/htrace-flume/README.md b/htrace-flume/README.md
new file mode 100644
index 0000000..32bbb88
--- /dev/null
+++ b/htrace-flume/README.md
@@ -0,0 +1,59 @@
+htrace-flume
+============
+
+htrace-flume provides a span receiver that sends tracing spans to a Flume collector.
+
+Tutorial
+--------
+
+1) Build and deploy
+
+	$ cd htrace/htrace-flume
+	$ mvn compile assembly:single
+	$ cp target/htrace-flume-*-jar-with-dependencies.jar $HADOOP_HOME/share/hadoop/hdfs/lib/
+
+2) Edit hdfs-site.xml to include the following:
+
+	<property>
+		<name>hadoop.trace.spanreceiver.classes</name>
+		<value>org.apache.htrace.impl.FlumeSpanReceiver</value>
+	</property>
+	<property>
+		<name>hadoop.htrace.flume.hostname</name>
+		<value>127.0.0.1</value>
+	</property>
+	<property>
+		<name>hadoop.htrace.flume.port</name>
+		<value>60000</value>
+	</property>
+
+3) Set up a Flume collector
+
+Create a flume-conf.properties file. Below is a sample that sets up an HDFS sink.
+
+	agent.sources = avro-collection-source
+	agent.channels = memoryChannel
+	agent.sinks = loggerSink hdfs-sink
+
+	# avro source - should match the configurations in hdfs-site.xml
+	agent.sources.avro-collection-source.type = avro
+	agent.sources.avro-collection-source.bind = 127.0.0.1
+	agent.sources.avro-collection-source.port = 60000
+	agent.sources.avro-collection-source.channels = memoryChannel
+
+	#sample hdfs-sink, change to any sink that flume supports
+	agent.sinks.hdfs-sink.type = hdfs
+	agent.sinks.hdfs-sink.hdfs.path = hdfs://127.0.0.1:9000/flume
+	agent.sinks.hdfs-sink.channel = memoryChannel
+	agent.sinks.hdfs-sink.hdfs.fileType = DataStream
+	agent.sinks.hdfs-sink.hdfs.writeFormat = Text
+	agent.sinks.hdfs-sink.hdfs.rollSize = 0
+	agent.sinks.hdfs-sink.hdfs.rollCount = 10000
+	agent.sinks.hdfs-sink.hdfs.batchSize = 100
+
+	# memory channel
+	agent.channels.memoryChannel.capacity = 10000
+	agent.channels.memoryChannel.transactionCapacity = 1000
+
+Run the Flume agent with the command "flume-ng agent -c ./conf/ -f ./conf/flume-conf.properties -n agent".
+

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-flume/pom.xml
----------------------------------------------------------------------
diff --git a/htrace-flume/pom.xml b/htrace-flume/pom.xml
new file mode 100644
index 0000000..1bf258b
--- /dev/null
+++ b/htrace-flume/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+license agreements. See the NOTICE file distributed with this work for additional
+information regarding copyright ownership. The ASF licenses this file to
+You under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+by applicable law or agreed to in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+OF ANY KIND, either express or implied. See the License for the specific
+language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>htrace-flume</artifactId>
+  <packaging>jar</packaging>
+
+  <parent>
+    <artifactId>htrace</artifactId>
+    <groupId>org.apache.htrace</groupId>
+    <version>3.1.0-SNAPSHOT</version>
+  </parent>
+
+  <name>htrace-flume</name>
+  <description>A 'SpanReceiver' implementation that sends spans to Flume collector.</description>
+  <url>http://incubator.apache.org/projects/htrace.html</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <flume.version>1.5.2</flume.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-gpg-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <!-- explicitly define maven-deploy-plugin after other to force exec order -->
+        <artifactId>maven-deploy-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- Module deps. -->
+    <dependency>
+      <groupId>org.apache.htrace</groupId>
+      <artifactId>htrace-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.htrace</groupId>
+      <artifactId>htrace-core</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <!-- Global deps. -->
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- Flume specific deps. -->
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <version>${flume.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <version>${flume.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
new file mode 100644
index 0000000..54b8a14
--- /dev/null
+++ b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.apache.htrace.HTraceConfiguration;
+import org.apache.htrace.Span;
+import org.apache.htrace.SpanReceiver;
+
+public class FlumeSpanReceiver implements SpanReceiver {
+  private static final Log LOG = LogFactory.getLog(FlumeSpanReceiver.class);
+
+  public static final String NUM_THREADS_KEY = "htrace.flume.num-threads";
+  public static final int DEFAULT_NUM_THREADS = 1;
+  public static final String FLUME_HOSTNAME_KEY = "htrace.flume.hostname";
+  public static final String DEFAULT_FLUME_HOSTNAME = "localhost";
+  public static final String FLUME_PORT_KEY = "htrace.flume.port";
+  public static final String FLUME_BATCHSIZE_KEY = "htrace.flume.batchsize";
+  public static final int DEFAULT_FLUME_BATCHSIZE = 100;
+  
+  /**
+   * How long this receiver will wait for all threads to shut down, in seconds.
+   */
+  private static final int SHUTDOWN_TIMEOUT = 30;
+
+  /**
+   * How many errors in a row before we start dropping traces on the floor.
+   */
+  private static final int MAX_ERRORS = 10;
+
+  /**
+   * The queue that will get all HTrace spans that are to be sent.
+   */
+  private final BlockingQueue<Span> queue;
+
+  /**
+   * Boolean used to signal that the threads should end.
+   */
+  private final AtomicBoolean running = new AtomicBoolean(true);
+
+  /**
+   * The thread factory used to create new ExecutorService.
+   * <p/>
+   * This will be the same factory for the lifetime of this object so that
+   * no thread names will ever be duplicated.
+   */
+  private final ThreadFactory tf;
+
+  ////////////////////
+  /// Variables that will change on each call to configure()
+  ///////////////////
+  private ExecutorService service;
+  private int maxSpanBatchSize;
+  private String flumeHostName;
+  private int flumePort;
+
+  public FlumeSpanReceiver(HTraceConfiguration conf) {
+    this.queue = new ArrayBlockingQueue<Span>(1000);
+    this.tf = new SimpleThreadFactory();
+    configure(conf);
+  }
+
+  private class SimpleThreadFactory implements ThreadFactory {
+    final AtomicLong count = new AtomicLong(0);
+    @Override
+    public Thread newThread(Runnable arg0) {
+      String name = String.format("flumeSpanReceiver-%d", count.getAndIncrement());
+      Thread t = new Thread(arg0, name);
+      t.setDaemon(true);
+      return t;
+    }
+  }
+
+  private void configure (HTraceConfiguration conf) {
+
+    // Read configuration
+    int numThreads = conf.getInt(NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
+    this.flumeHostName = conf.get(FLUME_HOSTNAME_KEY, DEFAULT_FLUME_HOSTNAME);
+    this.flumePort = conf.getInt(FLUME_PORT_KEY, 0);
+    if (this.flumePort == 0) {
+      throw new IllegalArgumentException(FLUME_PORT_KEY + " is required in configuration.");
+    }
+    this.maxSpanBatchSize = conf.getInt(FLUME_BATCHSIZE_KEY, DEFAULT_FLUME_BATCHSIZE);
+
+    // Initialize executors
+    // If there are already threads running tear them down.
+    if (this.service != null) {
+      this.service.shutdownNow();
+      this.service = null;
+    }
+    this.service = Executors.newFixedThreadPool(numThreads, tf);
+    for (int i = 0; i < numThreads; i++) {
+      this.service.submit(new WriteSpanRunnable());
+    }
+  }
+
+  private class WriteSpanRunnable implements Runnable {
+    private RpcClient flumeClient = null;
+
+    /**
+     * This runnable polls the queue and sends batches of HTrace spans to Flume.
+     */
+    @Override
+    public void run() {
+      List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize);
+      long errorCount = 0;
+
+      while (running.get() || queue.size() > 0) {
+        Span firstSpan = null;
+        try {
+          // Block for up to a second to try to get a span.
+          // We only block briefly so that we notice promptly
+          // when the running flag changes.
+          firstSpan = queue.poll(1, TimeUnit.SECONDS);
+
+          // If the poll was successful then it's possible that there
+          // will be other spans to get. Try and get them.
+          if (firstSpan != null) {
+            // Add the first one that we got
+            dequeuedSpans.add(firstSpan);
+            // Try to drain up to maxSpanBatchSize - 1 more spans into this batch
+            queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1);
+          }
+        } catch (InterruptedException ie) {
+          // Ignored.
+        }
+
+        startClient();
+        if (dequeuedSpans.isEmpty()) {
+          continue;
+        }
+
+        try {
+          List<Event> events = new ArrayList<Event>(dequeuedSpans.size());
+          for (Span span : dequeuedSpans) {
+            // Headers allow Flume to filter
+            Map<String, String> headers = new HashMap<String, String>();
+            headers.put("TraceId",      Long.toString(span.getTraceId()));
+            headers.put("SpanId",       Long.toString(span.getSpanId()));
+            headers.put("ProcessId",    span.getProcessId());
+            headers.put("Description",  span.getDescription());
+
+            String body = span.toJson();
+
+            Event evt = EventBuilder.withBody(body, Charset.forName("UTF-8"), headers);
+            events.add(evt);
+          }
+          flumeClient.appendBatch(events);
+
+          // clear the list for the next time through.
+          dequeuedSpans.clear();
+          // reset the error counter.
+          errorCount = 0;
+        } catch (Exception e) {
+          errorCount += 1;
+          // If there have been MAX_ERRORS errors in a row, start dropping spans.
+          if (errorCount < MAX_ERRORS) {
+            try {
+              queue.addAll(dequeuedSpans);
+            } catch (IllegalStateException ex) {
+              LOG.error("Dropping " + dequeuedSpans.size() +
+                        " span(s) because writing to Flume failed.");
+            }
+          }
+          closeClient();
+          try {
+            // Since there was an error, sleep briefly to give Flume
+            // some time to recover.
+            Thread.sleep(500);
+          } catch (InterruptedException e1) {
+            // Ignored
+          }
+        }
+      }
+      closeClient();
+    }
+
+    /**
+     * Close Flume RPC client
+     */
+    private void closeClient() {
+      if (flumeClient != null) {
+        try {
+          flumeClient.close();
+        } catch (FlumeException ex) {
+          LOG.warn("Error while trying to close Flume Rpc Client.", ex);
+        } finally {
+          flumeClient = null;
+        }
+      }
+    }
+
+    /**
+     * Create / reconnect Flume RPC client
+     */
+    private void startClient() {
+      // If current client is inactive, close it
+      if (flumeClient != null && !flumeClient.isActive()) {
+        flumeClient.close();
+        flumeClient = null;
+      }
+      // Create client if needed
+      if (flumeClient == null) {
+        try {
+          flumeClient = RpcClientFactory.getDefaultInstance(flumeHostName, flumePort, maxSpanBatchSize);
+        } catch (FlumeException e) {
+          LOG.warn("Failed to create Flume RPC Client. " + e.getMessage());
+        }
+      }
+    }
+  }
+
+  /**
+   * Close the receiver.
+   * <p/>
+   * This tries to shut down the thread pool.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    running.set(false);
+    service.shutdown();
+    try {
+      if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
+        LOG.error("Was not able to process all remaining spans within " +
+            SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS +
+            "; remaining spans may be dropped.");
+       }
+    } catch (InterruptedException e1) {
+      LOG.warn("Thread interrupted when terminating executor.", e1);
+    }
+  }
+
+  @Override
+  public void receiveSpan(Span span) {
+    if (running.get()) {
+      try {
+        this.queue.add(span);
+      } catch (IllegalStateException e) {
+        LOG.error("Error trying to append span (" +
+            span.getDescription() +
+            ") to the queue. Blocking Queue was full.");
+      }
+    }
+  }
+}
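
For illustration only (not part of the patch): beyond hostname and port, configure() above reads two tuning keys, htrace.flume.num-threads and htrace.flume.batchsize. Below is a hedged sketch of driving the receiver directly with all four keys; the class name and values are made up for the example.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.htrace.HTraceConfiguration;
    import org.apache.htrace.impl.FlumeSpanReceiver;

    public class FlumeReceiverLifecycle {
      public static void main(String[] args) throws Exception {
        // Illustrative values; only the port key is strictly required.
        Map<String, String> conf = new HashMap<String, String>();
        conf.put(FlumeSpanReceiver.FLUME_HOSTNAME_KEY, "127.0.0.1"); // htrace.flume.hostname
        conf.put(FlumeSpanReceiver.FLUME_PORT_KEY, "60000");         // htrace.flume.port (required)
        conf.put(FlumeSpanReceiver.NUM_THREADS_KEY, "2");            // default is 1 sender thread
        conf.put(FlumeSpanReceiver.FLUME_BATCHSIZE_KEY, "100");      // max spans per appendBatch()
        FlumeSpanReceiver receiver =
            new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf));
        // receiveSpan() only enqueues; the worker threads batch and ship to Flume.
        // close() stops the workers and waits up to SHUTDOWN_TIMEOUT seconds for
        // the queue to drain.
        receiver.close();
      }
    }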

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
new file mode 100644
index 0000000..a825690
--- /dev/null
+++ b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcTestUtils;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.AvroSourceProtocol;
+import org.apache.flume.source.avro.Status;
+import org.apache.htrace.HTraceConfiguration;
+import org.apache.htrace.Span;
+import org.apache.htrace.SpanReceiver;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceCreator;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFlumeSpanReceiver {
+  private static final Log LOG = LogFactory.getLog(TestFlumeSpanReceiver.class);
+
+  private static final String ROOT_SPAN_DESC = "ROOT";
+
+  private SpanReceiver spanReceiver;
+  private Server flumeServer;
+  private TraceCreator traceCreator;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  @Test
+  public void testSimpleTraces() throws FlumeException,
+      EventDeliveryException, IOException {
+    AvroHandler avroHandler = null;
+    List<Span> spans = null;
+    try {
+      avroHandler = new AvroHandler();
+      startReceiver(null, avroHandler);
+      
+      spans = new ArrayList<Span>();
+      Span rootSpan = new MilliSpan(ROOT_SPAN_DESC, 1, Span.ROOT_SPAN_ID, 100, "test");
+      Span innerOne = rootSpan.child("Some good work");
+      Span innerTwo = innerOne.child("Some more good work");
+      innerTwo.stop();
+      spans.add(innerTwo);
+      innerOne.stop();
+      spans.add(innerOne);
+      rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes());
+      rootSpan.addTimelineAnnotation("timeline");
+      rootSpan.stop();
+      spans.add(rootSpan);
+
+    } finally {
+      stopReceiver();
+    }
+    List<AvroFlumeEvent> events = avroHandler.getAllEvents();
+    Assert.assertEquals(spans.size(), events.size());
+    for (int i = 0; i < spans.size(); i ++) {
+      String json = new String(events.get(i).getBody().array(), Charset.forName("UTF-8"));
+      Assert.assertTrue(json.contains(spans.get(i).getDescription()));
+    }
+  }
+
+  @Test
+  public void testConcurrency() throws FlumeException,
+      EventDeliveryException, IOException {
+    try {
+      Map<String, String> extraConf = new HashMap<String, String>();
+      extraConf.put(FlumeSpanReceiver.NUM_THREADS_KEY, "5");
+      startReceiver(extraConf, new RpcTestUtils.OKAvroHandler());
+      traceCreator.createThreadedTrace();
+    } finally {
+      stopReceiver();
+    }
+  }
+
+  @Test
+  public void testResilience() throws FlumeException,
+      EventDeliveryException, IOException {
+    try {
+      startReceiver(null, new RpcTestUtils.FailedAvroHandler());
+      traceCreator.createThreadedTrace();
+    } finally {
+      stopReceiver();
+    }
+  }
+
+  private void startReceiver(Map<String, String> extraConf, AvroSourceProtocol avroHandler) {
+    // Start Flume server
+    Assert.assertNull(flumeServer);
+    flumeServer = RpcTestUtils.startServer(avroHandler);
+
+    // Create and configure span receiver
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put(FlumeSpanReceiver.FLUME_HOSTNAME_KEY, "127.0.0.1");
+    conf.put(FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort()));
+    if (extraConf != null) {
+      conf.putAll(extraConf);
+    }
+    
+    spanReceiver = new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf));
+
+    // Create trace creator, it will register our receiver
+    traceCreator = new TraceCreator(spanReceiver);
+  }
+
+  private void stopReceiver() throws IOException {
+    // Close span receiver
+    if (spanReceiver != null) {
+      Trace.removeReceiver(spanReceiver);
+      spanReceiver.close();
+      spanReceiver = null;
+    }
+
+    // Close Flume server
+    if (flumeServer != null) {
+      RpcTestUtils.stopServer(flumeServer);
+      flumeServer = null;
+    }
+  }
+  
+  private static class AvroHandler implements AvroSourceProtocol {
+    private ArrayList<AvroFlumeEvent> all_events = new ArrayList<AvroFlumeEvent>();
+    
+    public List<AvroFlumeEvent> getAllEvents() {
+      return new ArrayList<AvroFlumeEvent>(all_events);
+    }
+    
+    @Override
+    public Status append(AvroFlumeEvent event) throws AvroRemoteException {
+      all_events.add(event);
+      return Status.OK;
+    }
+
+    @Override
+    public Status appendBatch(List<AvroFlumeEvent> events) throws
+        AvroRemoteException {
+      all_events.addAll(events);
+      return Status.OK;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
index e6a6491..d3cffe2 100644
--- a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
+++ b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
@@ -221,5 +221,8 @@ public class TestHBaseSpanReceiver {
     public Span child(String description) {
       return null;
     }
+
+    @Override
+    public String toJson() { return null; }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 10bf552..22853f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,6 +24,7 @@ language governing permissions and limitations under the License. -->
     <module>htrace-core</module>
     <module>htrace-zipkin</module>
     <module>htrace-hbase</module>
+    <module>htrace-flume</module>
   </modules>
 
   <parent>