Posted to commits@htrace.apache.org by cm...@apache.org on 2014/12/06 05:07:40 UTC

[4/8] incubator-htrace git commit: HTRACE-8. Change our base package from org.htrace to org.apache.htrace (cmccabe / stack)

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerServer.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerServer.java b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerServer.java
new file mode 100644
index 0000000..da90008
--- /dev/null
+++ b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerServer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+public class HBaseSpanViewerServer implements Tool {
+  private static final Log LOG = LogFactory.getLog(HBaseSpanViewerServer.class);
+  public static final String HTRACE_VIEWER_HTTP_ADDRESS_KEY = "htrace.viewer.http.address";
+  public static final String HTRACE_VIEWER_HTTP_ADDRESS_DEFAULT = "0.0.0.0:16900";
+  public static final String HTRACE_CONF_ATTR = "htrace.conf";
+  public static final String HTRACE_APPDIR = "webapps";
+  public static final String NAME = "htrace";
+
+  private Configuration conf;
+  private HttpServer2 httpServer;
+  private InetSocketAddress httpAddress;
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  void start() throws IOException {
+    httpAddress = NetUtils.createSocketAddr(
+        conf.get(HTRACE_VIEWER_HTTP_ADDRESS_KEY, HTRACE_VIEWER_HTTP_ADDRESS_DEFAULT));
+    conf.set(HTRACE_VIEWER_HTTP_ADDRESS_KEY, NetUtils.getHostPortString(httpAddress));
+    HttpServer2.Builder builder = new HttpServer2.Builder();
+    builder.setName(NAME).setConf(conf);
+    if (httpAddress.getPort() == 0) {
+      builder.setFindPort(true);
+    }
+    URI uri = URI.create("http://" + NetUtils.getHostPortString(httpAddress));
+    builder.addEndpoint(uri);
+    LOG.info("Starting Web-server for " + NAME + " at: " + uri);
+    httpServer = builder.build();
+    httpServer.setAttribute(HTRACE_CONF_ATTR, conf);
+    httpServer.addServlet("gettraces",
+                          HBaseSpanViewerTracesServlet.PREFIX,
+                          HBaseSpanViewerTracesServlet.class);
+    httpServer.addServlet("getspans",
+                          HBaseSpanViewerSpansServlet.PREFIX + "/*",
+                          HBaseSpanViewerSpansServlet.class);
+
+    // Set the resource base to the webapps/htrace directory bundled in the jar.
+    String rb = httpServer.getClass()
+                          .getClassLoader()
+                          .getResource("webapps/" + NAME)
+                          .toString();
+    httpServer.getWebAppContext().setResourceBase(rb);
+
+    httpServer.start();
+    httpAddress = httpServer.getConnectorAddress(0);
+  }
+
+  void join() throws Exception {
+    if (httpServer != null) {
+      httpServer.join();
+    }
+  }
+
+  void stop() throws Exception {
+    if (httpServer != null) {
+      httpServer.stop();
+    }
+  }
+
+  InetSocketAddress getHttpAddress() {
+    return httpAddress;
+  }
+
+  public int run(String[] args) throws Exception {
+    start();
+    join();
+    stop();
+    return 0;
+  }
+
+  /**
+   * @throws Exception if the server fails to start or is interrupted.
+   */
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(HBaseConfiguration.create(), new HBaseSpanViewerServer(), args);
+  }
+}
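
For anyone who wants to try the renamed viewer by hand, here is a minimal launch
sketch (not part of this commit). The ViewerLauncher class name and the
127.0.0.1:16900 bind address are only illustrative; as in main() above,
ToolRunner injects the configuration through setConf() and then calls run(),
which starts the HTTP server and joins it.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.htrace.viewer.HBaseSpanViewerServer;

    public class ViewerLauncher {
      public static void main(String[] args) throws Exception {
        // Start from an HBase configuration so that the HBase client settings
        // (ZooKeeper quorum, etc.) are picked up.
        Configuration conf = HBaseConfiguration.create();
        // Bind to an explicit host:port instead of the 0.0.0.0:16900 default.
        conf.set(HBaseSpanViewerServer.HTRACE_VIEWER_HTTP_ADDRESS_KEY,
                 "127.0.0.1:16900");
        // ToolRunner calls setConf() and run(); run() starts the web server,
        // joins it, and returns an exit code.
        System.exit(ToolRunner.run(conf, new HBaseSpanViewerServer(), args));
      }
    }

Once the server is up, the bundled webapps/htrace UI should be served from the
root path, with /gettraces and /getspans/<traceid> available as the JSON
endpoints registered above.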

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerSpansServlet.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerSpansServlet.java b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerSpansServlet.java
new file mode 100644
index 0000000..8f3f50f
--- /dev/null
+++ b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerSpansServlet.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.htrace.protobuf.generated.SpanProtos;
+
+public class HBaseSpanViewerSpansServlet extends HttpServlet {
+  private static final Log LOG = LogFactory.getLog(HBaseSpanViewerSpansServlet.class);
+  public static final String PREFIX = "/getspans";
+  private static final ThreadLocal<HBaseSpanViewer> tlviewer =
+      new ThreadLocal<HBaseSpanViewer>() {
+        @Override
+        protected HBaseSpanViewer initialValue() {
+          return null;
+        }
+      };
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    final String path =
+        validatePath(ServletUtil.getDecodedPath(request, PREFIX));
+    if (path == null) {
+      response.setContentType("text/plain");
+      response.getWriter().print("Invalid input");
+      return;
+    }
+    HBaseSpanViewer viewer = tlviewer.get();
+    if (viewer == null) {
+      final Configuration conf = (Configuration) getServletContext()
+        .getAttribute(HBaseSpanViewerServer.HTRACE_CONF_ATTR);
+      viewer = new HBaseSpanViewer(conf);
+      tlviewer.set(viewer);
+    }
+    long traceid = Long.parseLong(path.substring(1));
+    response.setContentType("application/javascript");
+    PrintWriter out = response.getWriter();
+    out.print("[");
+    boolean first = true;
+    for (SpanProtos.Span span : viewer.getSpans(traceid)) {
+      if (first) {
+        first = false;
+      } else {
+        out.print(",");
+      }
+      out.print(HBaseSpanViewer.toJsonString(span));
+    }
+    out.print("]");
+  }
+
+  @Override
+  public void init() throws ServletException {
+  }
+
+  @Override
+  public void destroy() {
+    HBaseSpanViewer viewer = tlviewer.get();
+    if (viewer != null) {
+      viewer.close();
+    }
+  }
+
+  public static String validatePath(String p) {
+    return (p == null || p.length() == 0) ?
+        null : new Path(p).toUri().getPath();
+  }
+}
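
A minimal client sketch for the servlet above, not part of this commit. It
assumes the viewer is listening on localhost:16900 and that a trace id is
passed on the command line; the GetSpansExample class name is invented. The
servlet takes the trace id as the path suffix under /getspans/ and responds
with a JSON array of the trace's spans.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class GetSpansExample {
      public static void main(String[] args) throws Exception {
        long traceId = Long.parseLong(args[0]);
        // The servlet is registered at /getspans/* and reads the trace id
        // from the path suffix.
        URL url = new URL("http://127.0.0.1:16900/getspans/" + traceId);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
          StringBuilder json = new StringBuilder();
          String line;
          while ((line = reader.readLine()) != null) {
            json.append(line);
          }
          // The body is a JSON array of span objects for the given trace id.
          System.out.println(json);
        } finally {
          conn.disconnect();
        }
      }
    }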

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerTracesServlet.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerTracesServlet.java b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerTracesServlet.java
new file mode 100644
index 0000000..b0370c4
--- /dev/null
+++ b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerTracesServlet.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.htrace.protobuf.generated.SpanProtos;
+
+public class HBaseSpanViewerTracesServlet extends HttpServlet {
+  private static final Log LOG = LogFactory.getLog(HBaseSpanViewerTracesServlet.class);
+  public static final String PREFIX = "/gettraces";
+  private static final ThreadLocal<HBaseSpanViewer> tlviewer =
+      new ThreadLocal<HBaseSpanViewer>() {
+        @Override
+        protected HBaseSpanViewer initialValue() {
+          return null;
+        }
+      };
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    HBaseSpanViewer viewer = tlviewer.get();
+    if (viewer == null) {
+      final Configuration conf = (Configuration) getServletContext()
+        .getAttribute(HBaseSpanViewerServer.HTRACE_CONF_ATTR);
+      viewer = new HBaseSpanViewer(conf);
+      tlviewer.set(viewer);
+    }
+    response.setContentType("application/javascript");
+    PrintWriter out = response.getWriter();
+    out.print("[");
+    boolean first = true;
+    for (SpanProtos.Span span : viewer.getRootSpans()) {
+      if (first) {
+        first = false;
+      } else {
+        out.print(",");
+      }
+      out.print(HBaseSpanViewer.toJsonString(span));
+    }
+    out.print("]");
+  }
+
+  @Override
+  public void init() throws ServletException {
+  }
+
+  @Override
+  public void destroy() {
+    HBaseSpanViewer viewer = tlviewer.get();
+    if (viewer != null) {
+      viewer.close();
+    }
+  }
+}
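
This servlet mirrors the spans servlet: it pulls the shared Configuration out
of the HTRACE_CONF_ATTR servlet-context attribute set by HBaseSpanViewerServer
and caches one HBaseSpanViewer per worker thread. A client looks the same as
the one sketched for /getspans, minus the trace id; the following sketch (also
not part of this commit, with an invented class name and an assumed
localhost:16900 address) fetches the root spans, whose trace ids can then be
fed to /getspans/<traceid>.

    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.Scanner;

    public class GetTracesExample {
      public static void main(String[] args) throws Exception {
        URL url = new URL("http://127.0.0.1:16900/gettraces");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        // Slurp the whole response; the body is a JSON array of root spans.
        Scanner scanner = new Scanner(conn.getInputStream(), "UTF-8")
            .useDelimiter("\\A");
        try {
          String json = scanner.hasNext() ? scanner.next() : "[]";
          // Each element is a root span; its trace id identifies a trace that
          // can be fetched in full from /getspans/<traceid>.
          System.out.println(json);
        } finally {
          scanner.close();
          conn.disconnect();
        }
      }
    }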

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/htrace/impl/HBaseSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/htrace/impl/HBaseSpanReceiver.java b/htrace-hbase/src/main/java/org/htrace/impl/HBaseSpanReceiver.java
deleted file mode 100644
index 85af08c..0000000
--- a/htrace-hbase/src/main/java/org/htrace/impl/HBaseSpanReceiver.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.htrace.impl;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.htrace.HTraceConfiguration;
-import org.htrace.Sampler;
-import org.htrace.Span;
-import org.htrace.SpanReceiver;
-import org.htrace.TimelineAnnotation;
-import org.htrace.Trace;
-import org.htrace.TraceScope;
-import org.htrace.protobuf.generated.SpanProtos;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * HBase is an open source distributed datastore.
- * This span receiver stores spans in HBase.
- * HTrace spans are queued into a blocking queue,
- * from which background worker threads send them
- * to an HBase database.
- */
-public class HBaseSpanReceiver implements SpanReceiver {
-  private static final Log LOG = LogFactory.getLog(HBaseSpanReceiver.class);
-
-  public static final String COLLECTOR_QUORUM_KEY = "htrace.hbase.collector-quorum";
-  public static final String DEFAULT_COLLECTOR_QUORUM = "127.0.0.1";
-  public static final String ZOOKEEPER_CLIENT_PORT_KEY = "htrace.hbase.zookeeper.property.clientPort";
-  public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
-  public static final String ZOOKEEPER_ZNODE_PARENT_KEY = "htrace.hbase.zookeeper.znode.parent";
-  public static final String DEFAULT_ZOOKEEPER_ZNODE_PARENT = "/hbase";
-  public static final String NUM_THREADS_KEY = "htrace.hbase.num-threads";
-  public static final int DEFAULT_NUM_THREADS = 1;
-  public static final String MAX_SPAN_BATCH_SIZE_KEY = "htrace.hbase.batch.size";
-  public static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100;
-  public static final String TABLE_KEY = "htrace.hbase.table";
-  public static final String DEFAULT_TABLE = "htrace";
-  public static final String COLUMNFAMILY_KEY = "htrace.hbase.columnfamily";
-  public static final String DEFAULT_COLUMNFAMILY = "s";
-  public static final String INDEXFAMILY_KEY = "htrace.hbase.indexfamily";
-  public static final String DEFAULT_INDEXFAMILY = "i";
-  public static final byte[] INDEX_SPAN_QUAL = Bytes.toBytes("s");
-  public static final byte[] INDEX_TIME_QUAL = Bytes.toBytes("t");
-
-  /**
- * How long, in seconds, this receiver will wait for all threads to shut down.
-   */
-  private static final int SHUTDOWN_TIMEOUT = 30;
-
-  /**
-   * How many errors in a row before we start dropping traces on the floor.
-   */
-  private static final int MAX_ERRORS = 10;
-
-  /**
-   * The queue that will get all HTrace spans that are to be sent.
-   */
-  private final BlockingQueue<Span> queue;
-
-  /**
-   * Boolean used to signal that the threads should end.
-   */
-  private final AtomicBoolean running = new AtomicBoolean(true);
-
-  /**
- * The thread factory used to create threads for the ExecutorService.
-   * <p/>
-   * This will be the same factory for the lifetime of this object so that
-   * no thread names will ever be duplicated.
-   */
-  private final ThreadFactory tf;
-
-  ////////////////////
-  /// Variables that will change on each call to configure()
-  ///////////////////
-  private ExecutorService service;
-  private HTraceConfiguration conf;
-  private Configuration hconf;
-  private byte[] table;
-  private byte[] cf;
-  private byte[] icf;
-  private int maxSpanBatchSize;
-
-  public HBaseSpanReceiver() {
-    this.queue = new ArrayBlockingQueue<Span>(1000);
-    this.tf = new ThreadFactoryBuilder().setDaemon(true)
-                                        .setNameFormat("hbaseSpanReceiver-%d")
-                                        .build();
-  }
-
-  @Override
-  public void configure(HTraceConfiguration conf) {
-    this.conf = conf;
-    this.hconf = HBaseConfiguration.create();
-    this.table = Bytes.toBytes(conf.get(TABLE_KEY, DEFAULT_TABLE));
-    this.cf = Bytes.toBytes(conf.get(COLUMNFAMILY_KEY, DEFAULT_COLUMNFAMILY));
-    this.icf = Bytes.toBytes(conf.get(INDEXFAMILY_KEY, DEFAULT_INDEXFAMILY));
-    this.maxSpanBatchSize = conf.getInt(MAX_SPAN_BATCH_SIZE_KEY,
-                                        DEFAULT_MAX_SPAN_BATCH_SIZE);
-    String quorum = conf.get(COLLECTOR_QUORUM_KEY, DEFAULT_COLLECTOR_QUORUM);
-    hconf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
-    String znodeParent = conf.get(ZOOKEEPER_ZNODE_PARENT_KEY, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
-    hconf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
-    int clientPort = conf.getInt(ZOOKEEPER_CLIENT_PORT_KEY, DEFAULT_ZOOKEEPER_CLIENT_PORT);
-    hconf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, clientPort);
-
-    // If there are already threads running, tear them down.
-    if (this.service != null) {
-      this.service.shutdownNow();
-      this.service = null;
-    }
-    int numThreads = conf.getInt(NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
-    this.service = Executors.newFixedThreadPool(numThreads, tf);
-    for (int i = 0; i < numThreads; i++) {
-      this.service.submit(new WriteSpanRunnable());
-    }
-  }
-
-  private class WriteSpanRunnable implements Runnable {
-    private HConnection hconnection;
-    private HTableInterface htable;
-
-    public WriteSpanRunnable() {
-    }
-
-    /**
-     * This runnable sends HTrace spans to HBase.
-     */
-    @Override
-    public void run() {
-      SpanProtos.Span.Builder sbuilder = SpanProtos.Span.newBuilder();
-      SpanProtos.TimelineAnnotation.Builder tlbuilder =
-          SpanProtos.TimelineAnnotation.newBuilder();
-      List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize);
-      long errorCount = 0;
-
-      while (running.get() || queue.size() > 0) {
-        Span firstSpan = null;
-        try {
-          // Block for up to a second to try to get a span.
-          // We only block for a little bit so that we notice
-          // promptly when the running flag changes.
-          firstSpan = queue.poll(1, TimeUnit.SECONDS);
-
-          // If the poll was successful then it's possible that there
-          // will be other spans to get. Try and get them.
-          if (firstSpan != null) {
-            // Add the first one that we got
-            dequeuedSpans.add(firstSpan);
-            // Try to drain up to maxSpanBatchSize - 1 more spans from the queue.
-            queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1);
-          }
-        } catch (InterruptedException ie) {
-          // Ignored.
-        }
-        startClient();
-        if (dequeuedSpans.isEmpty()) {
-          try {
-            this.htable.flushCommits();
-          } catch (IOException e) {
-            LOG.error("failed to flush writes to HBase.");
-            closeClient();
-          }
-          continue;
-        }
-
-        try {
-          for (Span span : dequeuedSpans) {
-            sbuilder.clear()
-                    .setTraceId(span.getTraceId())
-                    .setParentId(span.getParentId())
-                    .setStart(span.getStartTimeMillis())
-                    .setStop(span.getStopTimeMillis())
-                    .setSpanId(span.getSpanId())
-                    .setProcessId(span.getProcessId())
-                    .setDescription(span.getDescription());
-            for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
-              sbuilder.addTimeline(tlbuilder.clear()
-                                            .setTime(ta.getTime())
-                                            .setMessage(ta.getMessage())
-                                            .build());
-            }
-            Put put = new Put(Bytes.toBytes(span.getTraceId()));
-            put.add(HBaseSpanReceiver.this.cf,
-                    sbuilder.build().toByteArray(),
-                    null);
-            if (span.getParentId() == Span.ROOT_SPAN_ID) {
-              put.add(HBaseSpanReceiver.this.icf,
-                      INDEX_TIME_QUAL,
-                      Bytes.toBytes(span.getStartTimeMillis()));
-              put.add(HBaseSpanReceiver.this.icf,
-                      INDEX_SPAN_QUAL,
-                      sbuilder.build().toByteArray());
-            }
-            this.htable.put(put);
-          }
-          // clear the list for the next time through.
-          dequeuedSpans.clear();
-          // reset the error counter.
-          errorCount = 0;
-        } catch (Exception e) {
-          errorCount += 1;
-          // Re-queue the spans unless there have been MAX_ERRORS failures in a row.
-          if (errorCount < MAX_ERRORS) {
-            try {
-              queue.addAll(dequeuedSpans);
-            } catch (IllegalStateException ex) {
-              LOG.error("Dropped " + dequeuedSpans.size() + " span(s): the queue" +
-                        " was full while re-queueing after an HBase write failure.");
-            }
-          }
-          // Clear the batch so re-queued or dropped spans are not written twice.
-          dequeuedSpans.clear();
-          closeClient();
-          try {
-            // Since there was an error, sleep just a little bit to give
-            // HBase some time to recover.
-            Thread.sleep(500);
-          } catch (InterruptedException e1) {
-            // Ignored
-          }
-        }
-      }
-      closeClient();
-    }
-
-    /**
-     * Close out the connection.
-     */
-    private void closeClient() {
-      // Close the table and the connection.
-      try {
-        if (this.htable != null) {
-          this.htable.close();
-          this.htable = null;
-        }
-        if (this.hconnection != null) {
-          this.hconnection.close();
-          this.hconnection = null;
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed to close HBase connection. " + e.getMessage());
-      }
-    }
-
-    /**
-     * (Re)connect to HBase if the table handle is not open.
-     */
-    private void startClient() {
-      if (this.htable == null) {
-        try {
-          hconnection = HConnectionManager.createConnection(hconf);
-          htable = hconnection.getTable(table);
-        } catch (IOException e) {
-          LOG.warn("Failed to create HBase connection. " + e.getMessage());
-        }
-      }
-    }
-  }
-
-  /**
-   * Close the receiver.
-   * <p/>
-   * This tries to shut down the thread pool.
-   *
-   * @throws IOException
-   */
-  @Override
-  public void close() throws IOException {
-    running.set(false);
-    service.shutdown();
-    try {
-      if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
-        LOG.error("Was not able to process all remaining spans within " +
-            SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS +
-            " of closing. Remaining spans may be dropped.");
-      }
-    } catch (InterruptedException e1) {
-      LOG.warn("Thread interrupted when terminating executor.", e1);
-    }
-  }
-
-  @Override
-  public void receiveSpan(Span span) {
-    if (running.get()) {
-      try {
-        this.queue.add(span);
-      } catch (IllegalStateException e) {
-        // TODO: suppress repeated error logs.
-        LOG.error("Error trying to append span (" +
-            span.getDescription() +
-            ") to the queue. Blocking queue was full.");
-      }
-    }
-  }
-
-  /**
-   * Run a basic smoke test against a running HBase instance.
-   * @throws Exception
-   */
-  public static void main(String[] args) throws Exception {
-    HBaseSpanReceiver receiver = new HBaseSpanReceiver();
-    receiver.configure(new HBaseHTraceConfiguration(HBaseConfiguration.create()));
-    Trace.addReceiver(receiver);
-    TraceScope parent = 
-        Trace.startSpan("HBaseSpanReceiver.main.parent", Sampler.ALWAYS);
-    Thread.sleep(10);
-    long traceid = parent.getSpan().getTraceId();
-    TraceScope child1 =
-        Trace.startSpan("HBaseSpanReceiver.main.child.1");
-    Thread.sleep(10);
-    TraceScope child2 =
-        Trace.startSpan("HBaseSpanReceiver.main.child.2", parent.getSpan());
-    Thread.sleep(10);
-    TraceScope gchild =
-        Trace.startSpan("HBaseSpanReceiver.main.grandchild");
-    Trace.addTimelineAnnotation("annotation 1.");
-    Thread.sleep(10);
-    Trace.addTimelineAnnotation("annotation 2.");
-    gchild.close();
-    Thread.sleep(10);
-    child2.close();
-    Thread.sleep(10);
-    child1.close();
-    parent.close();
-    receiver.close();
-    System.out.println("trace id: " + traceid);
-  }
-}
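
The receiver being removed here (and presumably re-added under
org.apache.htrace.impl in another part of this change) keys each Put by the
trace id, writes the serialized span into the "s" column family as the column
qualifier, and for root spans also writes the start time and the serialized
span into the "i" index family under the "t" and "s" qualifiers. As a rough
illustration of that layout, here is a sketch (not part of this commit) that
scans the index family and prints the root spans it finds, assuming the
default "htrace" table and "i" family:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.htrace.protobuf.generated.SpanProtos;

    public class ListRootSpans {
      public static void main(String[] args) throws Exception {
        Configuration hconf = HBaseConfiguration.create();
        HConnection connection = HConnectionManager.createConnection(hconf);
        // Defaults from the receiver above: table "htrace", index family "i",
        // and the "s" qualifier holding the serialized root span.
        HTableInterface htable = connection.getTable(Bytes.toBytes("htrace"));
        byte[] indexFamily = Bytes.toBytes("i");
        byte[] spanQualifier = Bytes.toBytes("s");
        Scan scan = new Scan();
        scan.addColumn(indexFamily, spanQualifier);
        ResultScanner scanner = htable.getScanner(scan);
        try {
          for (Result result : scanner) {
            byte[] value = result.getValue(indexFamily, spanQualifier);
            if (value == null) {
              continue;
            }
            SpanProtos.Span span = SpanProtos.Span.parseFrom(value);
            System.out.println(span.getTraceId() + " " + span.getDescription());
          }
        } finally {
          scanner.close();
          htable.close();
          connection.close();
        }
      }
    }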