You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2014/12/06 05:07:40 UTC
[4/8] incubator-htrace git commit: HTRACE-8. Change our base package
from org.htrace to org.apache.htrace (cmccabe / stack)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerServer.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerServer.java b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerServer.java
new file mode 100644
index 0000000..da90008
--- /dev/null
+++ b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerServer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+/**
+ * Command-line {@link Tool} that serves the HTrace span viewer over HTTP.
+ *
+ * <p>{@link #start()} builds an {@link HttpServer2}, registers
+ * {@link HBaseSpanViewerTracesServlet} and {@link HBaseSpanViewerSpansServlet}
+ * on it and points the web application at the {@code webapps/htrace}
+ * resources found on the classpath. The {@link Configuration} supplied through
+ * {@link #setConf(Configuration)} is stored on the server as the
+ * {@link #HTRACE_CONF_ATTR} attribute.
+ */
+public class HBaseSpanViewerServer implements Tool {
+  private static final Log LOG = LogFactory.getLog(HBaseSpanViewerServer.class);
+
+  /** Configuration key holding the {@code host:port} the viewer binds to. */
+  public static final String HTRACE_VIEWER_HTTP_ADDRESS_KEY = "htrace.viewer.http.address";
+  /** Bind address used when {@link #HTRACE_VIEWER_HTTP_ADDRESS_KEY} is not set. */
+  public static final String HTRACE_VIEWER_HTTP_ADDRESS_DEFAULT = "0.0.0.0:16900";
+  /** Name of the server attribute under which the configuration is stored. */
+  public static final String HTRACE_CONF_ATTR = "htrace.conf";
+  /** Classpath directory that holds the bundled web application resources. */
+  public static final String HTRACE_APPDIR = "webapps";
+  /** Name given to the HTTP server; also the web application directory name. */
+  public static final String NAME = "htrace";
+
+  private Configuration conf;
+  private HttpServer2 httpServer;
+  private InetSocketAddress httpAddress;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Builds and starts the HTTP server.
+   *
+   * <p>The bind address is read from the configuration, written back in
+   * normalized {@code host:port} form, and replaced after startup by the
+   * address of the server's first connector.
+   *
+   * @throws IOException if the server cannot be built or started, or if the
+   *         bundled web application resources are missing from the classpath
+   */
+  void start() throws IOException {
+    httpAddress = NetUtils.createSocketAddr(
+        conf.get(HTRACE_VIEWER_HTTP_ADDRESS_KEY, HTRACE_VIEWER_HTTP_ADDRESS_DEFAULT));
+    String hostPort = NetUtils.getHostPortString(httpAddress);
+    conf.set(HTRACE_VIEWER_HTTP_ADDRESS_KEY, hostPort);
+
+    HttpServer2.Builder builder = new HttpServer2.Builder();
+    builder.setName(NAME).setConf(conf);
+    if (httpAddress.getPort() == 0) {
+      // No fixed port was configured, so let the builder look for one.
+      builder.setFindPort(true);
+    }
+    URI uri = URI.create("http://" + hostPort);
+    builder.addEndpoint(uri);
+    LOG.info("Starting Web-server for " + NAME + " at: " + uri);
+
+    httpServer = builder.build();
+    httpServer.setAttribute(HTRACE_CONF_ATTR, conf);
+    httpServer.addServlet("gettraces",
+        HBaseSpanViewerTracesServlet.PREFIX,
+        HBaseSpanViewerTracesServlet.class);
+    httpServer.addServlet("getspans",
+        HBaseSpanViewerSpansServlet.PREFIX + "/*",
+        HBaseSpanViewerSpansServlet.class);
+
+    // The static pages live in webapps/htrace, bundled in the jar.
+    // getResource() returns null when they are missing; report that clearly
+    // instead of failing with a bare NullPointerException.
+    String webappPath = HTRACE_APPDIR + "/" + NAME;
+    java.net.URL webapp =
+        httpServer.getClass().getClassLoader().getResource(webappPath);
+    if (webapp == null) {
+      throw new IOException(
+          "Web application resources not found on the classpath: " + webappPath);
+    }
+    httpServer.getWebAppContext().setResourceBase(webapp.toString());
+
+    httpServer.start();
+    httpAddress = httpServer.getConnectorAddress(0);
+  }
+
+  /**
+   * Waits on the HTTP server via {@code HttpServer2.join()}; does nothing if
+   * the server was never built.
+   */
+  void join() throws Exception {
+    if (httpServer != null) {
+      httpServer.join();
+    }
+  }
+
+  /** Stops the HTTP server; does nothing if the server was never built. */
+  void stop() throws Exception {
+    if (httpServer != null) {
+      httpServer.stop();
+    }
+  }
+
+  /**
+   * @return the configured bind address, or after a successful
+   *         {@link #start()} the address of the server's first connector;
+   *         {@code null} before {@link #start()} is called
+   */
+  InetSocketAddress getHttpAddress() {
+    return httpAddress;
+  }
+
+  /**
+   * Starts the server, waits for it to finish and then stops it.
+   *
+   * @param args command-line arguments; not used
+   * @return always 0
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    start();
+    try {
+      join();
+    } finally {
+      // Release the server even when join() fails or is interrupted.
+      stop();
+    }
+    return 0;
+  }
+
+  /**
+   * Runs the viewer server through {@link ToolRunner} with an HBase
+   * configuration.
+   *
+   * @param args command-line arguments handed to {@link ToolRunner}
+   * @throws Exception if the server fails to start, run or stop
+   */
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(HBaseConfiguration.create(), new HBaseSpanViewerServer(), args);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerSpansServlet.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerSpansServlet.java b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerSpansServlet.java
new file mode 100644
index 0000000..8f3f50f
--- /dev/null
+++ b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerSpansServlet.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.htrace.protobuf.generated.SpanProtos;
+
+/**
+ * Servlet that writes all spans of one trace as a JSON array.
+ *
+ * <p>The trace id is the decimal number that follows {@link #PREFIX} in the
+ * request path, for example {@code /getspans/1234}. Each request thread keeps
+ * its own {@link HBaseSpanViewer}, created on first use from the configuration
+ * stored under {@link HBaseSpanViewerServer#HTRACE_CONF_ATTR}.
+ */
+public class HBaseSpanViewerSpansServlet extends HttpServlet {
+  private static final Log LOG = LogFactory.getLog(HBaseSpanViewerSpansServlet.class);
+  public static final String PREFIX = "/getspans";
+  // One viewer per request thread; the value is null until the first request.
+  private static final ThreadLocal<HBaseSpanViewer> tlviewer =
+      new ThreadLocal<HBaseSpanViewer>();
+
+  /**
+   * Writes the spans of the requested trace as a JSON array.
+   *
+   * <p>A missing, empty or non-numeric trace id is answered with the plain
+   * text {@code Invalid input} before any viewer is created.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    final String path =
+        validatePath(ServletUtil.getDecodedPath(request, PREFIX));
+    // The path needs at least one character after the one that is skipped.
+    if (path == null || path.length() < 2) {
+      sendInvalidInput(response);
+      return;
+    }
+    final long traceid;
+    try {
+      traceid = Long.parseLong(path.substring(1));
+    } catch (NumberFormatException e) {
+      // A malformed id is a client error, not a server failure.
+      sendInvalidInput(response);
+      return;
+    }
+
+    HBaseSpanViewer viewer = tlviewer.get();
+    if (viewer == null) {
+      final Configuration conf = (Configuration) getServletContext()
+          .getAttribute(HBaseSpanViewerServer.HTRACE_CONF_ATTR);
+      viewer = new HBaseSpanViewer(conf);
+      tlviewer.set(viewer);
+    }
+
+    response.setContentType("application/javascript");
+    PrintWriter out = response.getWriter();
+    out.print("[");
+    boolean first = true;
+    for (SpanProtos.Span span : viewer.getSpans(traceid)) {
+      if (first) {
+        first = false;
+      } else {
+        out.print(",");
+      }
+      out.print(HBaseSpanViewer.toJsonString(span));
+    }
+    out.print("]");
+  }
+
+  /** Writes the plain-text reply used for every rejected request. */
+  private static void sendInvalidInput(HttpServletResponse response)
+      throws IOException {
+    response.setContentType("text/plain");
+    response.getWriter().print("Invalid input");
+  }
+
+  @Override
+  public void init() throws ServletException {
+  }
+
+  /**
+   * Closes the viewer bound to the calling thread and forgets it, so the
+   * thread cannot pick up a closed viewer later.
+   *
+   * <p>NOTE(review): viewers created on other request threads are not
+   * reachable from here and are not closed by this method.
+   */
+  @Override
+  public void destroy() {
+    HBaseSpanViewer viewer = tlviewer.get();
+    if (viewer != null) {
+      try {
+        viewer.close();
+      } finally {
+        tlviewer.remove();
+      }
+    }
+  }
+
+  /**
+   * Normalizes a request path through {@link Path}.
+   *
+   * @param p the decoded path that follows the servlet prefix; may be null
+   * @return the path component of {@code new Path(p).toUri()}, or
+   *         {@code null} when {@code p} is null or empty
+   */
+  public static String validatePath(String p) {
+    if (p == null || p.length() == 0) {
+      return null;
+    }
+    return new Path(p).toUri().getPath();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerTracesServlet.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerTracesServlet.java b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerTracesServlet.java
new file mode 100644
index 0000000..b0370c4
--- /dev/null
+++ b/htrace-hbase/src/main/java/org/apache/htrace/viewer/HBaseSpanViewerTracesServlet.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.htrace.protobuf.generated.SpanProtos;
+
+/**
+ * Servlet that writes the root spans returned by
+ * {@link HBaseSpanViewer#getRootSpans()} as a JSON array.
+ *
+ * <p>Each request thread keeps its own {@link HBaseSpanViewer}, created on
+ * first use from the configuration stored under
+ * {@link HBaseSpanViewerServer#HTRACE_CONF_ATTR}.
+ */
+public class HBaseSpanViewerTracesServlet extends HttpServlet {
+  private static final Log LOG = LogFactory.getLog(HBaseSpanViewerTracesServlet.class);
+  public static final String PREFIX = "/gettraces";
+  // One viewer per request thread; the value is null until the first request.
+  private static final ThreadLocal<HBaseSpanViewer> tlviewer =
+      new ThreadLocal<HBaseSpanViewer>();
+
+  /** Writes every root span as one element of a JSON array. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    HBaseSpanViewer viewer = tlviewer.get();
+    if (viewer == null) {
+      final Configuration conf = (Configuration) getServletContext()
+          .getAttribute(HBaseSpanViewerServer.HTRACE_CONF_ATTR);
+      viewer = new HBaseSpanViewer(conf);
+      tlviewer.set(viewer);
+    }
+
+    response.setContentType("application/javascript");
+    PrintWriter out = response.getWriter();
+    out.print("[");
+    boolean first = true;
+    for (SpanProtos.Span span : viewer.getRootSpans()) {
+      if (first) {
+        first = false;
+      } else {
+        out.print(",");
+      }
+      out.print(HBaseSpanViewer.toJsonString(span));
+    }
+    out.print("]");
+  }
+
+  @Override
+  public void init() throws ServletException {
+  }
+
+  /**
+   * Closes the viewer bound to the calling thread and forgets it, so the
+   * thread cannot pick up a closed viewer later.
+   *
+   * <p>NOTE(review): viewers created on other request threads are not
+   * reachable from here and are not closed by this method.
+   */
+  @Override
+  public void destroy() {
+    HBaseSpanViewer viewer = tlviewer.get();
+    if (viewer != null) {
+      try {
+        viewer.close();
+      } finally {
+        tlviewer.remove();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/htrace/impl/HBaseSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/htrace/impl/HBaseSpanReceiver.java b/htrace-hbase/src/main/java/org/htrace/impl/HBaseSpanReceiver.java
deleted file mode 100644
index 85af08c..0000000
--- a/htrace-hbase/src/main/java/org/htrace/impl/HBaseSpanReceiver.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.htrace.impl;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.htrace.HTraceConfiguration;
-import org.htrace.Sampler;
-import org.htrace.Span;
-import org.htrace.SpanReceiver;
-import org.htrace.TimelineAnnotation;
-import org.htrace.Trace;
-import org.htrace.TraceScope;
-import org.htrace.protobuf.generated.SpanProtos;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * HBase is an open source distributed datastore.
- * This span receiver stores spans in HBase.
- * HTrace spans are queued into a blocking queue.
- * From there background worker threads will send them
- * to a HBase database.
- */
-public class HBaseSpanReceiver implements SpanReceiver {
- private static final Log LOG = LogFactory.getLog(HBaseSpanReceiver.class);
-
- public static final String COLLECTOR_QUORUM_KEY = "htrace.hbase.collector-quorum";
- public static final String DEFAULT_COLLECTOR_QUORUM = "127.0.0.1";
- public static final String ZOOKEEPER_CLIENT_PORT_KEY = "htrace.hbase.zookeeper.property.clientPort";
- public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
- public static final String ZOOKEEPER_ZNODE_PARENT_KEY = "htrace.hbase.zookeeper.znode.parent";
- public static final String DEFAULT_ZOOKEEPER_ZNODE_PARENT = "/hbase";
- public static final String NUM_THREADS_KEY = "htrace.hbase.num-threads";
- public static final int DEFAULT_NUM_THREADS = 1;
- public static final String MAX_SPAN_BATCH_SIZE_KEY = "htrace.hbase.batch.size";
- public static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100;
- public static final String TABLE_KEY = "htrace.hbase.table";
- public static final String DEFAULT_TABLE = "htrace";
- public static final String COLUMNFAMILY_KEY = "htrace.hbase.columnfamily";
- public static final String DEFAULT_COLUMNFAMILY = "s";
- public static final String INDEXFAMILY_KEY = "htrace.hbase.indexfamily";
- public static final String DEFAULT_INDEXFAMILY = "i";
- public static final byte[] INDEX_SPAN_QUAL = Bytes.toBytes("s");
- public static final byte[] INDEX_TIME_QUAL = Bytes.toBytes("t");
-
- /**
- * How long this receiver will try and wait for all threads to shutdown.
- */
- private static final int SHUTDOWN_TIMEOUT = 30;
-
- /**
- * How many errors in a row before we start dropping traces on the floor.
- */
- private static final int MAX_ERRORS = 10;
-
- /**
- * The queue that will get all HTrace spans that are to be sent.
- */
- private final BlockingQueue<Span> queue;
-
- /**
- * Boolean used to signal that the threads should end.
- */
- private final AtomicBoolean running = new AtomicBoolean(true);
-
- /**
- * The thread factory used to create new ExecutorService.
- * <p/>
- * This will be the same factory for the lifetime of this object so that
- * no thread names will ever be duplicated.
- */
- private final ThreadFactory tf;
-
- ////////////////////
- /// Variables that will change on each call to configure()
- ///////////////////
- private ExecutorService service;
- private HTraceConfiguration conf;
- private Configuration hconf;
- private byte[] table;
- private byte[] cf;
- private byte[] icf;
- private int maxSpanBatchSize;
-
- /**
-  * Creates a receiver with an empty span queue of capacity 1000 and a
-  * factory for daemon threads named {@code hbaseSpanReceiver-N}.
-  */
- public HBaseSpanReceiver() {
-   ThreadFactoryBuilder daemonThreads = new ThreadFactoryBuilder();
-   daemonThreads.setDaemon(true);
-   daemonThreads.setNameFormat("hbaseSpanReceiver-%d");
-   this.tf = daemonThreads.build();
-   this.queue = new ArrayBlockingQueue<Span>(1000);
- }
-
- /**
-  * Applies the given HTrace configuration and (re)starts the writer threads.
-  *
-  * <p>Reads the table, column family and batch settings, copies the ZooKeeper
-  * quorum, znode parent and client port into a fresh HBase configuration,
-  * shuts down any executor left from an earlier call, and submits one
-  * {@code WriteSpanRunnable} per configured thread to a new fixed-size pool.
-  *
-  * @param conf the HTrace configuration to read the settings from
-  */
- @Override
- public void configure(HTraceConfiguration conf) {
-   this.conf = conf;
-   this.hconf = HBaseConfiguration.create();
-
-   // Where spans go: table name, span column family, index column family.
-   this.table = Bytes.toBytes(conf.get(TABLE_KEY, DEFAULT_TABLE));
-   this.cf = Bytes.toBytes(conf.get(COLUMNFAMILY_KEY, DEFAULT_COLUMNFAMILY));
-   this.icf = Bytes.toBytes(conf.get(INDEXFAMILY_KEY, DEFAULT_INDEXFAMILY));
-   this.maxSpanBatchSize =
-       conf.getInt(MAX_SPAN_BATCH_SIZE_KEY, DEFAULT_MAX_SPAN_BATCH_SIZE);
-
-   // Carry the ZooKeeper settings over into the HBase configuration.
-   hconf.set(HConstants.ZOOKEEPER_QUORUM,
-       conf.get(COLLECTOR_QUORUM_KEY, DEFAULT_COLLECTOR_QUORUM));
-   hconf.set(HConstants.ZOOKEEPER_ZNODE_PARENT,
-       conf.get(ZOOKEEPER_ZNODE_PARENT_KEY, DEFAULT_ZOOKEEPER_ZNODE_PARENT));
-   hconf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT,
-       conf.getInt(ZOOKEEPER_CLIENT_PORT_KEY, DEFAULT_ZOOKEEPER_CLIENT_PORT));
-
-   // If there are already threads running, tear them down.
-   ExecutorService previous = this.service;
-   if (previous != null) {
-     previous.shutdownNow();
-     this.service = null;
-   }
-
-   int numThreads = conf.getInt(NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
-   this.service = Executors.newFixedThreadPool(numThreads, tf);
-   for (int remaining = numThreads; remaining > 0; remaining--) {
-     this.service.submit(new WriteSpanRunnable());
-   }
- }
-
- /**
-  * Worker that takes spans off the queue in batches and writes them to HBase.
-  *
-  * <p>Each worker owns its own connection and table handle. They are opened
-  * lazily and closed again after every failure, so the next loop iteration
-  * reconnects.
-  */
- private class WriteSpanRunnable implements Runnable {
-   private HConnection hconnection;
-   private HTableInterface htable;
-
-   public WriteSpanRunnable() {
-   }
-
-   /**
-    * Sends queued HTrace spans to HBase until the receiver stops running and
-    * the queue is empty, or until the worker thread is interrupted.
-    */
-   @Override
-   public void run() {
-     SpanProtos.Span.Builder sbuilder = SpanProtos.Span.newBuilder();
-     SpanProtos.TimelineAnnotation.Builder tlbuilder =
-         SpanProtos.TimelineAnnotation.newBuilder();
-     List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize);
-     long errorCount = 0;
-
-     while (running.get() || queue.size() > 0) {
-       try {
-         // Block for up to a second to try and get a span. We only block
-         // for a little bit in order to notice if the running value has
-         // changed.
-         Span firstSpan = queue.poll(1, TimeUnit.SECONDS);
-         if (firstSpan != null) {
-           dequeuedSpans.add(firstSpan);
-           // More spans may be waiting; take up to a full batch in total.
-           queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1);
-         }
-       } catch (InterruptedException ie) {
-         // configure() calls shutdownNow() on the previous executor, which
-         // interrupts its workers. Stop instead of running on forever.
-         Thread.currentThread().interrupt();
-         break;
-       }
-
-       startClient();
-       if (dequeuedSpans.isEmpty()) {
-         // Idle: push out whatever the table handle still buffers.
-         flushPending();
-         continue;
-       }
-
-       try {
-         if (this.htable == null) {
-           throw new IOException("No HBase connection available.");
-         }
-         for (Span span : dequeuedSpans) {
-           this.htable.put(buildPut(span, sbuilder, tlbuilder));
-         }
-         // reset the error counter.
-         errorCount = 0;
-       } catch (Exception e) {
-         errorCount += 1;
-         LOG.warn("Failed to write " + dequeuedSpans.size() +
-             " span(s) to HBase.", e);
-         // If there have been ten errors in a row start dropping things.
-         if (errorCount < MAX_ERRORS) {
-           try {
-             queue.addAll(dequeuedSpans);
-           } catch (IllegalStateException ex) {
-             LOG.error("Drop " + dequeuedSpans.size() +
-                 " span(s) because writing to HBase failed.");
-           }
-         } else {
-           LOG.error("Drop " + dequeuedSpans.size() + " span(s) after " +
-               errorCount + " consecutive write errors.");
-         }
-         closeClient();
-         try {
-           // Since there was an error sleep just a little bit to try and
-           // allow HBase some time to recover.
-           Thread.sleep(500);
-         } catch (InterruptedException e1) {
-           Thread.currentThread().interrupt();
-           break;
-         }
-       } finally {
-         // The batch was written, re-queued or dropped. Either way it must
-         // not be carried into the next iteration, or spans are duplicated.
-         dequeuedSpans.clear();
-       }
-     }
-     closeClient();
-   }
-
-   /**
-    * Builds the Put for one span: the row key is the trace id and the
-    * serialized span is the qualifier in the span family. Root spans also get
-    * their start time and serialized form in the index family.
-    */
-   private Put buildPut(Span span,
-                        SpanProtos.Span.Builder sbuilder,
-                        SpanProtos.TimelineAnnotation.Builder tlbuilder) {
-     sbuilder.clear()
-         .setTraceId(span.getTraceId())
-         .setParentId(span.getParentId())
-         .setStart(span.getStartTimeMillis())
-         .setStop(span.getStopTimeMillis())
-         .setSpanId(span.getSpanId())
-         .setProcessId(span.getProcessId())
-         .setDescription(span.getDescription());
-     for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
-       sbuilder.addTimeline(tlbuilder.clear()
-           .setTime(ta.getTime())
-           .setMessage(ta.getMessage())
-           .build());
-     }
-     Put put = new Put(Bytes.toBytes(span.getTraceId()));
-     put.add(HBaseSpanReceiver.this.cf,
-         sbuilder.build().toByteArray(),
-         null);
-     if (span.getParentId() == Span.ROOT_SPAN_ID) {
-       put.add(HBaseSpanReceiver.this.icf,
-           INDEX_TIME_QUAL,
-           Bytes.toBytes(span.getStartTimeMillis()));
-       put.add(HBaseSpanReceiver.this.icf,
-           INDEX_SPAN_QUAL,
-           sbuilder.build().toByteArray());
-     }
-     return put;
-   }
-
-   /**
-    * Flushes buffered writes if a table handle is open. A failed flush
-    * closes the client so that the next iteration reconnects.
-    */
-   private void flushPending() {
-     if (this.htable == null) {
-       // startClient() could not connect; there is nothing to flush.
-       return;
-     }
-     try {
-       this.htable.flushCommits();
-     } catch (IOException e) {
-       LOG.error("failed to flush writes to HBase.", e);
-       closeClient();
-     }
-   }
-
-   /**
-    * Close out the connection. The table and the connection are closed
-    * independently and both fields are always reset, so a failure to close
-    * one of them can neither leak the other nor block a reconnect.
-    */
-   private void closeClient() {
-     if (this.htable != null) {
-       try {
-         this.htable.close();
-       } catch (IOException e) {
-         LOG.warn("Failed to close HBase table. " + e.getMessage(), e);
-       } finally {
-         this.htable = null;
-       }
-     }
-     if (this.hconnection != null) {
-       try {
-         this.hconnection.close();
-       } catch (IOException e) {
-         LOG.warn("Failed to close HBase connection. " + e.getMessage(), e);
-       } finally {
-         this.hconnection = null;
-       }
-     }
-   }
-
-   /**
-    * Re-connect to HBase if no table handle is open. On failure everything
-    * that was opened is closed again and {@code htable} stays null.
-    */
-   private void startClient() {
-     if (this.htable != null) {
-       return;
-     }
-     try {
-       hconnection = HConnectionManager.createConnection(hconf);
-       htable = hconnection.getTable(table);
-     } catch (IOException e) {
-       LOG.warn("Failed to create HBase connection. " + e.getMessage(), e);
-       // Do not leak a connection whose table could not be opened.
-       closeClient();
-     }
-   }
- }
-
- /**
-  * Close the receiver.
-  * <p/>
-  * Signals the worker threads to stop, shuts the thread pool down and waits
-  * up to {@code SHUTDOWN_TIMEOUT} seconds for the remaining spans to be
-  * written. Does nothing beyond clearing the running flag when
-  * {@code configure} was never called.
-  *
-  * @throws IOException declared by the interface; not thrown by this code
-  */
- @Override
- public void close() throws IOException {
-   running.set(false);
-   if (service == null) {
-     // configure() was never called, so there are no workers to wait for.
-     return;
-   }
-   service.shutdown();
-   try {
-     if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
-       LOG.error("Was not able to process all remaining spans upon closing in: " +
-           SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS +
-           ". Left Spans could be dropped.");
-     }
-   } catch (InterruptedException e1) {
-     LOG.warn("Thread interrupted when terminating executor.", e1);
-     // Keep the interrupt visible to the caller.
-     Thread.currentThread().interrupt();
-   }
- }
-
- /**
-  * Queues a span for the writer threads. Spans are ignored once the receiver
-  * has been closed, and dropped with an error log when the queue is full.
-  *
-  * @param span the span to store
-  */
- @Override
- public void receiveSpan(Span span) {
-   if (!running.get()) {
-     return;
-   }
-   boolean accepted = this.queue.offer(span);
-   if (!accepted) {
-     // todo: suppress repeating error logs.
-     LOG.error("Error trying to append span (" +
-         span.getDescription() +
-         ") to the queue. Blocking Queue was full.");
-   }
- }
-
- /**
- * Runs a basic manual test: configures a receiver from the default HBase
- * configuration, registers it, opens and closes a few nested trace scopes
- * with two timeline annotations, closes the receiver and prints the trace id.
- *
- * @param args command-line arguments; not used
- * @throws Exception if a sleep is interrupted or closing the receiver fails
- */
- public static void main(String[] args) throws Exception {
- HBaseSpanReceiver receiver = new HBaseSpanReceiver();
- receiver.configure(new HBaseHTraceConfiguration(HBaseConfiguration.create()));
- Trace.addReceiver(receiver);
- // Sampler.ALWAYS is passed so that this top-level span is sampled.
- TraceScope parent =
- Trace.startSpan("HBaseSpanReceiver.main.parent", Sampler.ALWAYS);
- Thread.sleep(10);
- // Remember the id now; it is printed after everything has been closed.
- long traceid = parent.getSpan().getTraceId();
- TraceScope child1 =
- Trace.startSpan("HBaseSpanReceiver.main.child.1");
- Thread.sleep(10);
- // child.2 names the parent span explicitly.
- TraceScope child2 =
- Trace.startSpan("HBaseSpanReceiver.main.child.2", parent.getSpan());
- Thread.sleep(10);
- TraceScope gchild =
- Trace.startSpan("HBaseSpanReceiver.main.grandchild");
- Trace.addTimelineAnnotation("annotation 1.");
- Thread.sleep(10);
- Trace.addTimelineAnnotation("annotation 2.");
- // Close the scopes innermost first.
- gchild.close();
- Thread.sleep(10);
- child2.close();
- Thread.sleep(10);
- child1.close();
- parent.close();
- receiver.close();
- System.out.println("trace id: " + traceid);
- }
-}