You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@htrace.apache.org by cm...@apache.org on 2014/12/06 05:07:38 UTC
[2/8] incubator-htrace git commit: HTRACE-8. Change our base package
from org.htrace to org.apache.htrace (cmccabe / stack)
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerServer.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerServer.java b/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerServer.java
deleted file mode 100644
index 3394b33..0000000
--- a/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerServer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.htrace.viewer;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.HttpServer2;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-
-public class HBaseSpanViewerServer implements Tool {
- private static final Log LOG = LogFactory.getLog(HBaseSpanViewerServer.class);
- public static final String HTRACE_VIEWER_HTTP_ADDRESS_KEY = "htrace.viewer.http.address";
- public static final String HTRACE_VIEWER_HTTP_ADDRESS_DEFAULT = "0.0.0.0:16900";
- public static final String HTRACE_CONF_ATTR = "htrace.conf";
- public static final String HTRACE_APPDIR = "webapps";
- public static final String NAME = "htrace";
-
- private Configuration conf;
- private HttpServer2 httpServer;
- private InetSocketAddress httpAddress;
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public Configuration getConf() {
- return this.conf;
- }
-
- void start() throws IOException {
- httpAddress = NetUtils.createSocketAddr(
- conf.get(HTRACE_VIEWER_HTTP_ADDRESS_KEY, HTRACE_VIEWER_HTTP_ADDRESS_DEFAULT));
- conf.set(HTRACE_VIEWER_HTTP_ADDRESS_KEY, NetUtils.getHostPortString(httpAddress));
- HttpServer2.Builder builder = new HttpServer2.Builder();
- builder.setName(NAME).setConf(conf);
- if (httpAddress.getPort() == 0) {
- builder.setFindPort(true);
- }
- URI uri = URI.create("http://" + NetUtils.getHostPortString(httpAddress));
- builder.addEndpoint(uri);
- LOG.info("Starting Web-server for " + NAME + " at: " + uri);
- httpServer = builder.build();
- httpServer.setAttribute(HTRACE_CONF_ATTR, conf);
- httpServer.addServlet("gettraces",
- HBaseSpanViewerTracesServlet.PREFIX,
- HBaseSpanViewerTracesServlet.class);
- httpServer.addServlet("getspans",
- HBaseSpanViewerSpansServlet.PREFIX + "/*",
- HBaseSpanViewerSpansServlet.class);
-
- // for webapps/htrace bundled in jar.
- String rb = httpServer.getClass()
- .getClassLoader()
- .getResource("webapps/" + NAME)
- .toString();
- httpServer.getWebAppContext().setResourceBase(rb);
-
- httpServer.start();
- httpAddress = httpServer.getConnectorAddress(0);
- }
-
- void join() throws Exception {
- if (httpServer != null) {
- httpServer.join();
- }
- }
-
- void stop() throws Exception {
- if (httpServer != null) {
- httpServer.stop();
- }
- }
-
- InetSocketAddress getHttpAddress() {
- return httpAddress;
- }
-
- public int run(String[] args) throws Exception {
- start();
- join();
- stop();
- return 0;
- }
-
- /**
- * @throws IOException
- */
- public static void main(String[] args) throws Exception {
- ToolRunner.run(HBaseConfiguration.create(), new HBaseSpanViewerServer(), args);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerSpansServlet.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerSpansServlet.java b/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerSpansServlet.java
deleted file mode 100644
index be4a79c..0000000
--- a/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerSpansServlet.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.htrace.viewer;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ServletUtil;
-import org.htrace.protobuf.generated.SpanProtos;
-
-public class HBaseSpanViewerSpansServlet extends HttpServlet {
- private static final Log LOG = LogFactory.getLog(HBaseSpanViewerSpansServlet.class);
- public static final String PREFIX = "/getspans";
- private static final ThreadLocal<HBaseSpanViewer> tlviewer =
- new ThreadLocal<HBaseSpanViewer>() {
- @Override
- protected HBaseSpanViewer initialValue() {
- return null;
- }
- };
-
- @Override
- @SuppressWarnings("unchecked")
- public void doGet(HttpServletRequest request, HttpServletResponse response)
- throws ServletException, IOException {
- final String path =
- validatePath(ServletUtil.getDecodedPath(request, PREFIX));
- if (path == null) {
- response.setContentType("text/plain");
- response.getWriter().print("Invalid input");
- return;
- }
- HBaseSpanViewer viewer = tlviewer.get();
- if (viewer == null) {
- final Configuration conf = (Configuration) getServletContext()
- .getAttribute(HBaseSpanViewerServer.HTRACE_CONF_ATTR);
- viewer = new HBaseSpanViewer(conf);
- tlviewer.set(viewer);
- }
- Long traceid = Long.parseLong(path.substring(1));
- response.setContentType("application/javascript");
- PrintWriter out = response.getWriter();
- out.print("[");
- boolean first = true;
- for (SpanProtos.Span span : viewer.getSpans(traceid)) {
- if (first) {
- first = false;
- } else {
- out.print(",");
- }
- out.print(HBaseSpanViewer.toJsonString(span));
- }
- out.print("]");
- }
-
- @Override
- public void init() throws ServletException {
- }
-
- @Override
- public void destroy() {
- HBaseSpanViewer viewer = tlviewer.get();
- if (viewer != null) {
- viewer.close();
- }
- }
-
- public static String validatePath(String p) {
- return p == null || p.length() == 0?
- null: new Path(p).toUri().getPath();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerTracesServlet.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerTracesServlet.java b/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerTracesServlet.java
deleted file mode 100644
index 3af49fc..0000000
--- a/htrace-hbase/src/main/java/org/htrace/viewer/HBaseSpanViewerTracesServlet.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.htrace.viewer;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ServletUtil;
-import org.htrace.protobuf.generated.SpanProtos;
-
-public class HBaseSpanViewerTracesServlet extends HttpServlet {
- private static final Log LOG = LogFactory.getLog(HBaseSpanViewerTracesServlet.class);
- public static final String PREFIX = "/gettraces";
- private static final ThreadLocal<HBaseSpanViewer> tlviewer =
- new ThreadLocal<HBaseSpanViewer>() {
- @Override
- protected HBaseSpanViewer initialValue() {
- return null;
- }
- };
-
- @Override
- @SuppressWarnings("unchecked")
- public void doGet(HttpServletRequest request, HttpServletResponse response)
- throws ServletException, IOException {
- HBaseSpanViewer viewer = tlviewer.get();
- if (viewer == null) {
- final Configuration conf = (Configuration) getServletContext()
- .getAttribute(HBaseSpanViewerServer.HTRACE_CONF_ATTR);
- viewer = new HBaseSpanViewer(conf);
- tlviewer.set(viewer);
- }
- response.setContentType("application/javascript");
- PrintWriter out = response.getWriter();
- out.print("[");
- boolean first = true;
- for (SpanProtos.Span span : viewer.getRootSpans()) {
- if (first) {
- first = false;
- } else {
- out.print(",");
- }
- out.print(HBaseSpanViewer.toJsonString(span));
- }
- out.print("]");
- }
-
- @Override
- public void init() throws ServletException {
- }
-
- @Override
- public void destroy() {
- HBaseSpanViewer viewer = tlviewer.get();
- if (viewer != null) {
- viewer.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/protobuf/Span.proto
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/protobuf/Span.proto b/htrace-hbase/src/main/protobuf/Span.proto
index 6e58b7c..e8dc369 100644
--- a/htrace-hbase/src/main/protobuf/Span.proto
+++ b/htrace-hbase/src/main/protobuf/Span.proto
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-option java_package = "org.htrace.protobuf.generated";
+option java_package = "org.apache.htrace.protobuf.generated";
option java_outer_classname = "SpanProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/main/webapps/htrace/spans.js
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/main/webapps/htrace/spans.js b/htrace-hbase/src/main/webapps/htrace/spans.js
index c7c38af..03ff6fb 100644
--- a/htrace-hbase/src/main/webapps/htrace/spans.js
+++ b/htrace-hbase/src/main/webapps/htrace/spans.js
@@ -21,7 +21,7 @@ const width_span = 700;
const size_tl = 6;
const margin = {top: 50, bottom: 50, left: 50, right: 1000, process: 250};
-const ROOT_SPAN_ID = "477902"; // constants defined in org.htrace.Span
+const ROOT_SPAN_ID = "477902"; // constants defined in org.apache.htrace.Span
const traceid = window.location.search.substring(1).split("=")[1];
d3.json("/getspans/" + traceid, function(spans) {
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/test/java/org/apache/htrace/impl/HBaseTestUtil.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/test/java/org/apache/htrace/impl/HBaseTestUtil.java b/htrace-hbase/src/test/java/org/apache/htrace/impl/HBaseTestUtil.java
new file mode 100644
index 0000000..bcc5d27
--- /dev/null
+++ b/htrace-hbase/src/test/java/org/apache/htrace/impl/HBaseTestUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.htrace.HTraceConfiguration;
+import org.apache.htrace.SpanReceiver;
+import org.apache.htrace.impl.HBaseSpanReceiver;
+import org.junit.Assert;
+
+
+public class HBaseTestUtil {
+ private static final Log LOG = LogFactory.getLog(HBaseTestUtil.class);
+
+ public static Configuration configure(Configuration conf) {
+ Configuration hconf = HBaseConfiguration.create(conf);
+ hconf.set(HBaseHTraceConfiguration.KEY_PREFIX +
+ HBaseSpanReceiver.COLLECTOR_QUORUM_KEY,
+ conf.get(HConstants.ZOOKEEPER_QUORUM));
+ hconf.setInt(HBaseHTraceConfiguration.KEY_PREFIX +
+ HBaseSpanReceiver.ZOOKEEPER_CLIENT_PORT_KEY,
+ conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181));
+ hconf.set(HBaseHTraceConfiguration.KEY_PREFIX +
+ HBaseSpanReceiver.ZOOKEEPER_ZNODE_PARENT_KEY,
+ conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+ return hconf;
+ }
+
+ public static HTableInterface createTable(HBaseTestingUtility util) {
+ HTableInterface htable = null;
+ try {
+ htable = util.createTable(Bytes.toBytes(HBaseSpanReceiver.DEFAULT_TABLE),
+ new byte[][]{Bytes.toBytes(HBaseSpanReceiver.DEFAULT_COLUMNFAMILY),
+ Bytes.toBytes(HBaseSpanReceiver.DEFAULT_INDEXFAMILY)});
+ } catch (IOException e) {
+ Assert.fail("failed to create htrace table. " + e.getMessage());
+ }
+ return htable;
+ }
+
+ public static SpanReceiver startReceiver(Configuration conf) {
+ /* TODO: FIX!!!!! CIRCULAR DEPENDENCY BACK TO HBASE
+ SpanReceiver receiver = new HBaseSpanReceiver();
+ receiver.configure(new HBaseHTraceConfiguration(conf));
+ return receiver;
+ */
+ return null;
+ }
+
+ public static SpanReceiver startReceiver(HBaseTestingUtility util) {
+ return startReceiver(configure(util.getConfiguration()));
+ }
+
+ public static void stopReceiver(SpanReceiver receiver) {
+ if (receiver != null) {
+ try {
+ receiver.close();
+ receiver = null;
+ } catch (IOException e) {
+ Assert.fail("failed to close span receiver. " + e.getMessage());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
new file mode 100644
index 0000000..4d6d15c
--- /dev/null
+++ b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import com.google.common.collect.Multimap;
+
+import java.io.InputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.htrace.Span;
+import org.apache.htrace.SpanReceiver;
+import org.apache.htrace.TimelineAnnotation;
+import org.apache.htrace.TraceTree;
+import org.apache.htrace.impl.HBaseSpanReceiver;
+import org.apache.htrace.protobuf.generated.SpanProtos;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.junit.Assert;
+import org.apache.htrace.TraceCreator;
+
+
+public class TestHBaseSpanReceiver {
+ private static final Log LOG = LogFactory.getLog(TestHBaseSpanReceiver.class);
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ // Reenable after fix circular dependency
+ @Ignore @Test
+ public void testHBaseSpanReceiver() {
+ HTableInterface htable = HBaseTestUtil.createTable(UTIL);
+ SpanReceiver receiver = HBaseTestUtil.startReceiver(UTIL);
+ TraceCreator tc = new TraceCreator(receiver);
+ tc.createThreadedTrace();
+ tc.createSimpleTrace();
+ tc.createSampleRpcTrace();
+ HBaseTestUtil.stopReceiver(receiver);
+ Scan scan = new Scan();
+ scan.addFamily(Bytes.toBytes(HBaseSpanReceiver.DEFAULT_COLUMNFAMILY));
+ scan.setMaxVersions(1);
+ ArrayList<Span> spans = new ArrayList<Span>();
+ try {
+ ResultScanner scanner = htable.getScanner(scan);
+ Result result = null;
+ while ((result = scanner.next()) != null) {
+ for (Cell cell : result.listCells()) {
+ InputStream in = new ByteArrayInputStream(cell.getQualifierArray(),
+ cell.getQualifierOffset(),
+ cell.getQualifierLength());
+ spans.add(new TestSpan(SpanProtos.Span.parseFrom(in)));
+ }
+ }
+ } catch (IOException e) {
+ Assert.fail("failed to get spans from HBase. " + e.getMessage());
+ }
+
+ TraceTree traceTree = new TraceTree(spans);
+ Collection<Span> roots = traceTree.getRoots();
+ Assert.assertEquals(3, roots.size());
+
+ Map<String, Span> descs = new HashMap<String, Span>();
+ for (Span root : roots) {
+ descs.put(root.getDescription(), root);
+ }
+ Assert.assertTrue(descs.keySet().contains(TraceCreator.RPC_TRACE_ROOT));
+ Assert.assertTrue(descs.keySet().contains(TraceCreator.SIMPLE_TRACE_ROOT));
+ Assert.assertTrue(descs.keySet().contains(TraceCreator.THREADED_TRACE_ROOT));
+
+ /** TODO: FIX!!!!!!
+ Multimap<Long, Span> spansByParentId = traceTree.getSpansByParentIdMap();
+ Span rpcRoot = descs.get(TraceCreator.RPC_TRACE_ROOT);
+ Assert.assertEquals(1, spansByParentId.get(rpcRoot.getSpanId()).size());
+ Span rpcChild1 = spansByParentId.get(rpcRoot.getSpanId()).iterator().next();
+ Assert.assertEquals(1, spansByParentId.get(rpcChild1.getSpanId()).size());
+ Span rpcChild2 = spansByParentId.get(rpcChild1.getSpanId()).iterator().next();
+ Assert.assertEquals(1, spansByParentId.get(rpcChild2.getSpanId()).size());
+ Span rpcChild3 = spansByParentId.get(rpcChild2.getSpanId()).iterator().next();
+ Assert.assertEquals(0, spansByParentId.get(rpcChild3.getSpanId()).size());
+
+ Scan iscan = new Scan();
+ iscan.addColumn(Bytes.toBytes(HBaseSpanReceiver.DEFAULT_INDEXFAMILY),
+ HBaseSpanReceiver.INDEX_SPAN_QUAL);
+ try {
+ ResultScanner scanner = htable.getScanner(iscan);
+ Result result = null;
+ while ((result = scanner.next()) != null) {
+ for (Cell cell : result.listCells()) {
+ InputStream in = new ByteArrayInputStream(cell.getValueArray(),
+ cell.getValueOffset(),
+ cell.getValueLength());
+ Assert.assertEquals(SpanProtos.Span.parseFrom(in).getParentId(),
+ Span.ROOT_SPAN_ID);
+ }
+ }
+ } catch (IOException e) {
+ Assert.fail("failed to get spans from index family. " + e.getMessage());
+ }
+ */
+ }
+
+ private class TestSpan implements Span {
+ SpanProtos.Span span;
+
+ public TestSpan(SpanProtos.Span span) {
+ this.span = span;
+ }
+
+ @Override
+ public long getTraceId() {
+ return span.getTraceId();
+ }
+
+ @Override
+ public long getParentId() {
+ return span.getParentId();
+ }
+
+ @Override
+ public long getStartTimeMillis() {
+ return span.getStart();
+ }
+
+ @Override
+ public long getStopTimeMillis() {
+ return span.getStop();
+ }
+
+ @Override
+ public long getSpanId() {
+ return span.getSpanId();
+ }
+
+ @Override
+ public String getProcessId() {
+ return span.getProcessId();
+ }
+
+ @Override
+ public String getDescription() {
+ return span.getDescription();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Span{Id:0x%16x,parentId:0x%16x,pid:%s,desc:%s}",
+ getSpanId(), getParentId(),
+ getProcessId(), getDescription());
+ }
+
+ @Override
+ public Map<byte[], byte[]> getKVAnnotations() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public List<TimelineAnnotation> getTimelineAnnotations() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void addKVAnnotation(byte[] key, byte[] value) {}
+
+ @Override
+ public void addTimelineAnnotation(String msg) {}
+
+ @Override
+ public synchronized void stop() {}
+
+ @Override
+ public synchronized boolean isRunning() {
+ return false;
+ }
+
+ @Override
+ public synchronized long getAccumulatedMillis() {
+ return span.getStop() - span.getStart();
+ }
+
+ @Override
+ public Span child(String description) {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/test/java/org/apache/htrace/viewer/TestHBaseSpanViewer.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/test/java/org/apache/htrace/viewer/TestHBaseSpanViewer.java b/htrace-hbase/src/test/java/org/apache/htrace/viewer/TestHBaseSpanViewer.java
new file mode 100644
index 0000000..21e603a
--- /dev/null
+++ b/htrace-hbase/src/test/java/org/apache/htrace/viewer/TestHBaseSpanViewer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.viewer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.protobuf.generated.SpanProtos.Span;
+import org.apache.htrace.protobuf.generated.SpanProtos.TimelineAnnotation;
+import org.apache.htrace.viewer.HBaseSpanViewer;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class TestHBaseSpanViewer {
+ private static final Log LOG = LogFactory.getLog(TestHBaseSpanViewer.class);
+
+ @Test
+ public void testProtoToJson() {
+ Span.Builder sbuilder = Span.newBuilder();
+ TimelineAnnotation.Builder tlbuilder = TimelineAnnotation.newBuilder();
+ sbuilder.clear().setTraceId(1)
+ .setParentId(2)
+ .setStart(3)
+ .setStop(4)
+ .setSpanId(5)
+ .setProcessId("pid")
+ .setDescription("description");
+ for (int i = 0; i < 3; i++) {
+ sbuilder.addTimeline(tlbuilder.clear()
+ .setTime(i)
+ .setMessage("message" + i)
+ .build());
+ }
+ Span span = sbuilder.build();
+ try {
+ String json = HBaseSpanViewer.toJsonString(span);
+ String expected =
+ "{\"trace_id\":\"1\","
+ + "\"parent_id\":\"2\","
+ + "\"start\":\"3\","
+ + "\"stop\":\"4\","
+ + "\"span_id\":\"5\","
+ + "\"process_id\":\"pid\","
+ + "\"description\":\"description\","
+ + "\"timeline\":["
+ + "{\"time\":\"0\",\"message\":\"message0\"},"
+ + "{\"time\":\"1\",\"message\":\"message1\"},"
+ + "{\"time\":\"2\",\"message\":\"message2\"}]}";
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testProtoWithoutTimelineToJson() {
+ Span.Builder sbuilder = Span.newBuilder();
+ sbuilder.clear().setTraceId(1)
+ .setParentId(2)
+ .setStart(3)
+ .setStop(4)
+ .setSpanId(5)
+ .setProcessId("pid")
+ .setDescription("description");
+ Span span = sbuilder.build();
+ try {
+ String json = HBaseSpanViewer.toJsonString(span);
+ String expected =
+ "{\"trace_id\":\"1\","
+ + "\"parent_id\":\"2\","
+ + "\"start\":\"3\","
+ + "\"stop\":\"4\","
+ + "\"span_id\":\"5\","
+ + "\"process_id\":\"pid\","
+ + "\"description\":\"description\"}";
+ Assert.assertEquals(json, expected);
+ } catch (IOException e) {
+ Assert.fail("failed to get json from span. " + e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/test/java/org/htrace/impl/HBaseTestUtil.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/test/java/org/htrace/impl/HBaseTestUtil.java b/htrace-hbase/src/test/java/org/htrace/impl/HBaseTestUtil.java
deleted file mode 100644
index 5e8436b..0000000
--- a/htrace-hbase/src/test/java/org/htrace/impl/HBaseTestUtil.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.htrace.impl;
-
-import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Assert;
-import org.htrace.SpanReceiver;
-import org.htrace.HTraceConfiguration;
-
-
-public class HBaseTestUtil {
- private static final Log LOG = LogFactory.getLog(HBaseTestUtil.class);
-
- public static Configuration configure(Configuration conf) {
- Configuration hconf = HBaseConfiguration.create(conf);
- hconf.set(HBaseHTraceConfiguration.KEY_PREFIX +
- HBaseSpanReceiver.COLLECTOR_QUORUM_KEY,
- conf.get(HConstants.ZOOKEEPER_QUORUM));
- hconf.setInt(HBaseHTraceConfiguration.KEY_PREFIX +
- HBaseSpanReceiver.ZOOKEEPER_CLIENT_PORT_KEY,
- conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181));
- hconf.set(HBaseHTraceConfiguration.KEY_PREFIX +
- HBaseSpanReceiver.ZOOKEEPER_ZNODE_PARENT_KEY,
- conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
- return hconf;
- }
-
- public static HTableInterface createTable(HBaseTestingUtility util) {
- HTableInterface htable = null;
- try {
- htable = util.createTable(Bytes.toBytes(HBaseSpanReceiver.DEFAULT_TABLE),
- new byte[][]{Bytes.toBytes(HBaseSpanReceiver.DEFAULT_COLUMNFAMILY),
- Bytes.toBytes(HBaseSpanReceiver.DEFAULT_INDEXFAMILY)});
- } catch (IOException e) {
- Assert.fail("failed to create htrace table. " + e.getMessage());
- }
- return htable;
- }
-
- public static SpanReceiver startReceiver(Configuration conf) {
- SpanReceiver receiver = new HBaseSpanReceiver();
- receiver.configure(new HBaseHTraceConfiguration(conf));
- return receiver;
- }
-
- public static SpanReceiver startReceiver(HBaseTestingUtility util) {
- return startReceiver(configure(util.getConfiguration()));
- }
-
- public static void stopReceiver(SpanReceiver receiver) {
- if (receiver != null) {
- try {
- receiver.close();
- receiver = null;
- } catch (IOException e) {
- Assert.fail("failed to close span receiver. " + e.getMessage());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/test/java/org/htrace/impl/TestHBaseSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/test/java/org/htrace/impl/TestHBaseSpanReceiver.java b/htrace-hbase/src/test/java/org/htrace/impl/TestHBaseSpanReceiver.java
deleted file mode 100644
index a247f05..0000000
--- a/htrace-hbase/src/test/java/org/htrace/impl/TestHBaseSpanReceiver.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.htrace.impl;
-
-import com.google.common.collect.Multimap;
-import java.io.InputStream;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.Assert;
-import org.htrace.Span;
-import org.htrace.SpanReceiver;
-import org.htrace.TimelineAnnotation;
-import org.htrace.TraceCreator;
-import org.htrace.TraceTree;
-import org.htrace.protobuf.generated.SpanProtos;
-
-
-public class TestHBaseSpanReceiver {
- private static final Log LOG = LogFactory.getLog(TestHBaseSpanReceiver.class);
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- UTIL.startMiniCluster(1);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- UTIL.shutdownMiniCluster();
- }
-
- @Test
- public void testHBaseSpanReceiver() {
- HTableInterface htable = HBaseTestUtil.createTable(UTIL);
- SpanReceiver receiver = HBaseTestUtil.startReceiver(UTIL);
- TraceCreator tc = new TraceCreator(receiver);
- tc.createThreadedTrace();
- tc.createSimpleTrace();
- tc.createSampleRpcTrace();
- HBaseTestUtil.stopReceiver(receiver);
- Scan scan = new Scan();
- scan.addFamily(Bytes.toBytes(HBaseSpanReceiver.DEFAULT_COLUMNFAMILY));
- scan.setMaxVersions(1);
- ArrayList<Span> spans = new ArrayList<Span>();
- try {
- ResultScanner scanner = htable.getScanner(scan);
- Result result = null;
- while ((result = scanner.next()) != null) {
- for (Cell cell : result.listCells()) {
- InputStream in = new ByteArrayInputStream(cell.getQualifierArray(),
- cell.getQualifierOffset(),
- cell.getQualifierLength());
- spans.add(new TestSpan(SpanProtos.Span.parseFrom(in)));
- }
- }
- } catch (IOException e) {
- Assert.fail("failed to get spans from HBase. " + e.getMessage());
- }
-
- TraceTree traceTree = new TraceTree(spans);
- Collection<Span> roots = traceTree.getRoots();
- Assert.assertEquals(3, roots.size());
-
- Map<String, Span> descs = new HashMap<String, Span>();
- for (Span root : roots) {
- descs.put(root.getDescription(), root);
- }
- Assert.assertTrue(descs.keySet().contains(TraceCreator.RPC_TRACE_ROOT));
- Assert.assertTrue(descs.keySet().contains(TraceCreator.SIMPLE_TRACE_ROOT));
- Assert.assertTrue(descs.keySet().contains(TraceCreator.THREADED_TRACE_ROOT));
-
- Multimap<Long, Span> spansByParentId = traceTree.getSpansByParentIdMap();
- Span rpcRoot = descs.get(TraceCreator.RPC_TRACE_ROOT);
- Assert.assertEquals(1, spansByParentId.get(rpcRoot.getSpanId()).size());
- Span rpcChild1 = spansByParentId.get(rpcRoot.getSpanId()).iterator().next();
- Assert.assertEquals(1, spansByParentId.get(rpcChild1.getSpanId()).size());
- Span rpcChild2 = spansByParentId.get(rpcChild1.getSpanId()).iterator().next();
- Assert.assertEquals(1, spansByParentId.get(rpcChild2.getSpanId()).size());
- Span rpcChild3 = spansByParentId.get(rpcChild2.getSpanId()).iterator().next();
- Assert.assertEquals(0, spansByParentId.get(rpcChild3.getSpanId()).size());
-
- Scan iscan = new Scan();
- iscan.addColumn(Bytes.toBytes(HBaseSpanReceiver.DEFAULT_INDEXFAMILY),
- HBaseSpanReceiver.INDEX_SPAN_QUAL);
- try {
- ResultScanner scanner = htable.getScanner(iscan);
- Result result = null;
- while ((result = scanner.next()) != null) {
- for (Cell cell : result.listCells()) {
- InputStream in = new ByteArrayInputStream(cell.getValueArray(),
- cell.getValueOffset(),
- cell.getValueLength());
- Assert.assertEquals(SpanProtos.Span.parseFrom(in).getParentId(),
- Span.ROOT_SPAN_ID);
- }
- }
- } catch (IOException e) {
- Assert.fail("failed to get spans from index family. " + e.getMessage());
- }
- }
-
- private class TestSpan implements Span {
- SpanProtos.Span span;
-
- public TestSpan(SpanProtos.Span span) {
- this.span = span;
- }
-
- @Override
- public long getTraceId() {
- return span.getTraceId();
- }
-
- @Override
- public long getParentId() {
- return span.getParentId();
- }
-
- @Override
- public long getStartTimeMillis() {
- return span.getStart();
- }
-
- @Override
- public long getStopTimeMillis() {
- return span.getStop();
- }
-
- @Override
- public long getSpanId() {
- return span.getSpanId();
- }
-
- @Override
- public String getProcessId() {
- return span.getProcessId();
- }
-
- @Override
- public String getDescription() {
- return span.getDescription();
- }
-
- @Override
- public String toString() {
- return String.format("Span{Id:0x%16x,parentId:0x%16x,pid:%s,desc:%s}",
- getSpanId(), getParentId(),
- getProcessId(), getDescription());
- }
-
- @Override
- public Map<byte[], byte[]> getKVAnnotations() {
- return Collections.emptyMap();
- }
-
- @Override
- public List<TimelineAnnotation> getTimelineAnnotations() {
- return Collections.emptyList();
- }
-
- @Override
- public void addKVAnnotation(byte[] key, byte[] value) {}
-
- @Override
- public void addTimelineAnnotation(String msg) {}
-
- @Override
- public synchronized void stop() {}
-
- @Override
- public synchronized boolean isRunning() {
- return false;
- }
-
- @Override
- public synchronized long getAccumulatedMillis() {
- return span.getStop() - span.getStart();
- }
-
- @Override
- public Span child(String description) {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-hbase/src/test/java/org/htrace/viewer/TestHBaseSpanViewer.java
----------------------------------------------------------------------
diff --git a/htrace-hbase/src/test/java/org/htrace/viewer/TestHBaseSpanViewer.java b/htrace-hbase/src/test/java/org/htrace/viewer/TestHBaseSpanViewer.java
deleted file mode 100644
index 9c5515c..0000000
--- a/htrace-hbase/src/test/java/org/htrace/viewer/TestHBaseSpanViewer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.htrace.viewer;
-
-import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Test;
-import org.junit.Assert;
-import org.htrace.protobuf.generated.SpanProtos.Span;
-import org.htrace.protobuf.generated.SpanProtos.TimelineAnnotation;
-
-public class TestHBaseSpanViewer {
- private static final Log LOG = LogFactory.getLog(TestHBaseSpanViewer.class);
-
- @Test
- public void testProtoToJson() {
- Span.Builder sbuilder = Span.newBuilder();
- TimelineAnnotation.Builder tlbuilder = TimelineAnnotation.newBuilder();
- sbuilder.clear().setTraceId(1)
- .setParentId(2)
- .setStart(3)
- .setStop(4)
- .setSpanId(5)
- .setProcessId("pid")
- .setDescription("description");
- for (int i = 0; i < 3; i++) {
- sbuilder.addTimeline(tlbuilder.clear()
- .setTime(i)
- .setMessage("message" + i)
- .build());
- }
- Span span = sbuilder.build();
- try {
- String json = HBaseSpanViewer.toJsonString(span);
- String expected =
- "{\"trace_id\":\"1\","
- + "\"parent_id\":\"2\","
- + "\"start\":\"3\","
- + "\"stop\":\"4\","
- + "\"span_id\":\"5\","
- + "\"process_id\":\"pid\","
- + "\"description\":\"description\","
- + "\"timeline\":["
- + "{\"time\":\"0\",\"message\":\"message0\"},"
- + "{\"time\":\"1\",\"message\":\"message1\"},"
- + "{\"time\":\"2\",\"message\":\"message2\"}]}";
- Assert.assertEquals(json, expected);
- } catch (IOException e) {
- Assert.fail("failed to get json from span. " + e.getMessage());
- }
- }
-
- @Test
- public void testProtoWithoutTimelineToJson() {
- Span.Builder sbuilder = Span.newBuilder();
- sbuilder.clear().setTraceId(1)
- .setParentId(2)
- .setStart(3)
- .setStop(4)
- .setSpanId(5)
- .setProcessId("pid")
- .setDescription("description");
- Span span = sbuilder.build();
- try {
- String json = HBaseSpanViewer.toJsonString(span);
- String expected =
- "{\"trace_id\":\"1\","
- + "\"parent_id\":\"2\","
- + "\"start\":\"3\","
- + "\"stop\":\"4\","
- + "\"span_id\":\"5\","
- + "\"process_id\":\"pid\","
- + "\"description\":\"description\"}";
- Assert.assertEquals(json, expected);
- } catch (IOException e) {
- Assert.fail("failed to get json from span. " + e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-zipkin/pom.xml
----------------------------------------------------------------------
diff --git a/htrace-zipkin/pom.xml b/htrace-zipkin/pom.xml
index d9f8116..81b712e 100644
--- a/htrace-zipkin/pom.xml
+++ b/htrace-zipkin/pom.xml
@@ -17,7 +17,7 @@ language governing permissions and limitations under the License. -->
<parent>
<artifactId>htrace</artifactId>
- <groupId>org.htrace</groupId>
+ <groupId>org.apache.htrace</groupId>
<version>3.0.4</version>
</parent>
@@ -66,7 +66,7 @@ language governing permissions and limitations under the License. -->
<dependencies>
<!-- Module deps. -->
<dependency>
- <groupId>org.htrace</groupId>
+ <groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
new file mode 100644
index 0000000..86f32f7
--- /dev/null
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.twitter.zipkin.gen.LogEntry;
+import com.twitter.zipkin.gen.Scribe;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.HTraceConfiguration;
+import org.apache.htrace.Span;
+import org.apache.htrace.SpanReceiver;
+import org.apache.htrace.zipkin.HTraceToZipkinConverter;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Zipkin is an open source tracing library. This span receiver acts as a bridge between HTrace and
+ * Zipkin, that converts HTrace Span objects into Zipkin Span objects.
+ * <p/>
+ * HTrace spans are queued into a blocking queue. From there background worker threads will
+ * batch the spans together and then send them through to a Zipkin collector.
+ */
+public class ZipkinSpanReceiver implements SpanReceiver {
+ private static final Log LOG = LogFactory.getLog(ZipkinSpanReceiver.class);
+
+ /**
+ * Default hostname to fall back on.
+ */
+ private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost";
+
+ /**
+ * Default collector port.
+ */
+ private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port.
+
+ /**
+ * this is used to tell scribe that the entries are for zipkin..
+ */
+ private static final String CATEGORY = "zipkin";
+
+ /**
+ * Whether the service which is traced is in client or a server mode. It is used while creating
+ * the Endpoint.
+ */
+ private static final boolean DEFAULT_IN_CLIENT_MODE = false;
+
+ /**
+ * How long this receiver will try and wait for all threads to shutdown.
+ */
+ private static final int SHUTDOWN_TIMEOUT = 30;
+
+ /**
+ * How many spans this receiver will try and send in one batch.
+ */
+ private static final int MAX_SPAN_BATCH_SIZE = 100;
+
+ /**
+ * How many errors in a row before we start dropping traces on the floor.
+ */
+ private static final int MAX_ERRORS = 10;
+
+ /**
+ * The queue that will get all HTrace spans that are to be sent.
+ */
+ private final BlockingQueue<Span> queue;
+
+ /**
+ * Factory used to encode a Zipkin Span to bytes.
+ */
+ private final TProtocolFactory protocolFactory;
+
+ /**
+ * Boolean used to signal that the threads should end.
+ */
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
+ /**
+ * The thread factory used to create new ExecutorService.
+ * <p/>
+ * This will be the same factory for the lifetime of this object so that
+ * no thread names will ever be duplicated.
+ */
+ private final ThreadFactory tf;
+
+ ////////////////////
+ /// Variables that will change on each call to configure()
+ ///////////////////
+ private HTraceToZipkinConverter converter;
+ private ExecutorService service;
+ private HTraceConfiguration conf;
+ private String collectorHostname;
+ private int collectorPort;
+
+ public ZipkinSpanReceiver() {
+ this.queue = new ArrayBlockingQueue<Span>(1000);
+ this.protocolFactory = new TBinaryProtocol.Factory();
+
+ tf = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("zipkinSpanReceiver-%d")
+ .build();
+ }
+
+ @Override
+ public void configure(HTraceConfiguration conf) {
+ this.conf = conf;
+
+ this.collectorHostname = conf.get("zipkin.collector-hostname",
+ DEFAULT_COLLECTOR_HOSTNAME);
+ this.collectorPort = conf.getInt("zipkin.collector-port",
+ DEFAULT_COLLECTOR_PORT);
+
+ // initialize the endpoint. This endpoint is used while writing the Span.
+ initConverter();
+
+ int numThreads = conf.getInt("zipkin.num-threads", 1);
+
+ // If there are already threads runnnig tear them down.
+ if (this.service != null) {
+ this.service.shutdownNow();
+ this.service = null;
+ }
+
+ this.service = Executors.newFixedThreadPool(numThreads, tf);
+
+ for (int i = 0; i < numThreads; i++) {
+ this.service.submit(new WriteSpanRunnable());
+ }
+ }
+
+ /**
+ * Set up the HTrace to Zipkin converter.
+ */
+ private void initConverter() {
+ InetAddress tracedServiceHostname = null;
+ // Try and get the hostname. If it's not configured try and get the local hostname.
+ try {
+ String host = conf.get("zipkin.traced-service-hostname",
+ InetAddress.getLocalHost().getHostAddress());
+
+ tracedServiceHostname = InetAddress.getByName(host);
+ } catch (UnknownHostException e) {
+ LOG.error("Couldn't get the localHost address", e);
+ }
+ short tracedServicePort = (short) conf.getInt("zipkin.traced-service-port", -1);
+ byte[] address = tracedServiceHostname != null
+ ? tracedServiceHostname.getAddress() : DEFAULT_COLLECTOR_HOSTNAME.getBytes();
+ int ipv4 = ByteBuffer.wrap(address).getInt();
+ this.converter = new HTraceToZipkinConverter(ipv4, tracedServicePort);
+ }
+
+
+ private class WriteSpanRunnable implements Runnable {
+ /**
+ * scribe client to push zipkin spans
+ */
+ private Scribe.Client scribeClient = null;
+ private final ByteArrayOutputStream baos;
+ private final TProtocol streamProtocol;
+
+ public WriteSpanRunnable() {
+ baos = new ByteArrayOutputStream();
+ streamProtocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
+ }
+
+ /**
+ * This runnable converts a HTrace span to a Zipkin span and sends it across the zipkin
+ * collector as a thrift object. The scribe client which is used for rpc writes a list of
+ * LogEntry objects, so the span objects are first transformed into LogEntry objects before
+ * sending to the zipkin-collector.
+ * <p/>
+ * Here is a little ascii art which shows the above transformation:
+ * <pre>
+ * +------------+ +------------+ +------------+ +-----------------+
+ * | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector|
+ * +------------+ +------------+ +------------+ (Scribe rpc) +-----------------+
+ * </pre>
+ */
+ @Override
+ public void run() {
+
+ List<Span> dequeuedSpans = new ArrayList<Span>(MAX_SPAN_BATCH_SIZE);
+
+ long errorCount = 0;
+
+ while (running.get() || queue.size() > 0) {
+ Span firstSpan = null;
+ try {
+ // Block for up to a second. to try and get a span.
+ // We only block for a little bit in order to notice if the running value has changed
+ firstSpan = queue.poll(1, TimeUnit.SECONDS);
+
+ // If the poll was successful then it's possible that there
+ // will be other spans to get. Try and get them.
+ if (firstSpan != null) {
+ // Add the first one that we got
+ dequeuedSpans.add(firstSpan);
+ // Try and get up to 100 queues
+ queue.drainTo(dequeuedSpans, MAX_SPAN_BATCH_SIZE - 1);
+ }
+
+ } catch (InterruptedException ie) {
+ // Ignored.
+ }
+
+ if (dequeuedSpans.isEmpty()) continue;
+
+ // If this is the first time through or there was an error re-connect
+ if (scribeClient == null) {
+ startClient();
+ }
+ // Create a new list every time through so that the list doesn't change underneath
+ // thrift as it's sending.
+ List<LogEntry> entries = new ArrayList<LogEntry>(dequeuedSpans.size());
+ try {
+ // Convert every de-queued span
+ for (Span htraceSpan : dequeuedSpans) {
+ // convert the HTrace span to Zipkin span
+ com.twitter.zipkin.gen.Span zipkinSpan = converter.convert(htraceSpan);
+ // Clear any old data.
+ baos.reset();
+ // Write the span to a BAOS
+ zipkinSpan.write(streamProtocol);
+
+ // Do Base64 encoding and put the string into a log entry.
+ LogEntry logEntry =
+ new LogEntry(CATEGORY, Base64.encodeBase64String(baos.toByteArray()));
+ entries.add(logEntry);
+ }
+
+ // Send the entries
+ scribeClient.Log(entries);
+ // clear the list for the next time through.
+ dequeuedSpans.clear();
+ // reset the error counter.
+ errorCount = 0;
+ } catch (Exception e) {
+ LOG.error("Error when writing to the zipkin collector: " +
+ collectorHostname + ":" + collectorPort, e);
+
+ errorCount += 1;
+ // If there have been ten errors in a row start dropping things.
+ if (errorCount < MAX_ERRORS) {
+ try {
+ queue.addAll(dequeuedSpans);
+ } catch (IllegalStateException ex) {
+ LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full");
+ }
+ }
+
+ closeClient();
+ try {
+ // Since there was an error sleep just a little bit to try and allow the
+ // zipkin collector some time to recover.
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ // Ignored
+ }
+ }
+ }
+ closeClient();
+ }
+
+ /**
+ * Close out the connection.
+ */
+ private void closeClient() {
+ // close out the transport.
+ if (scribeClient != null) {
+ scribeClient.getInputProtocol().getTransport().close();
+ scribeClient = null;
+ }
+ }
+
+ /**
+ * Re-connect to Zipkin.
+ */
+ private void startClient() {
+ if (this.scribeClient == null) {
+ TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
+ try {
+ transport.open();
+ } catch (TTransportException e) {
+ e.printStackTrace();
+ }
+ TProtocol protocol = protocolFactory.getProtocol(transport);
+ this.scribeClient = new Scribe.Client(protocol);
+ }
+ }
+ }
+
+ /**
+ * Close the receiver.
+ * <p/>
+ * This tries to shut
+ *
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ running.set(false);
+ service.shutdown();
+ try {
+ if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
+ LOG.error("Was not able to process all remaining spans to write upon closing in: " +
+ SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS + ". There could be un-sent spans still left." +
+ " They have been dropped.");
+ }
+ } catch (InterruptedException e1) {
+ LOG.warn("Thread interrupted when terminating executor.", e1);
+ }
+ }
+
+ @Override
+ public void receiveSpan(Span span) {
+ if (running.get()) {
+ try {
+ this.queue.add(span);
+ } catch (IllegalStateException e) {
+ LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue."
+ + " Blocking Queue was full.");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-zipkin/src/main/java/org/apache/htrace/zipkin/HTraceToZipkinConverter.java
----------------------------------------------------------------------
diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/zipkin/HTraceToZipkinConverter.java b/htrace-zipkin/src/main/java/org/apache/htrace/zipkin/HTraceToZipkinConverter.java
new file mode 100644
index 0000000..09ab1ea
--- /dev/null
+++ b/htrace-zipkin/src/main/java/org/apache/htrace/zipkin/HTraceToZipkinConverter.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.zipkin;
+
+import com.twitter.zipkin.gen.Annotation;
+import com.twitter.zipkin.gen.AnnotationType;
+import com.twitter.zipkin.gen.BinaryAnnotation;
+import com.twitter.zipkin.gen.Endpoint;
+import com.twitter.zipkin.gen.Span;
+import com.twitter.zipkin.gen.zipkinCoreConstants;
+
+import org.apache.htrace.TimelineAnnotation;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is responsible for converting a HTrace.Span to a Zipkin.Span object. To use the Zipkin
+ * infrastructure (collector, front end), we need to store the Span information in a zipkin specific
+ * format. This class transforms a HTrace:Span object to a Zipkin:Span object.
+ * <p/>
+ * This is how both Span objects are related:
+ * <table>
+ * <col width="50%"/> <col width="50%"/> <thead>
+ * <tr>
+ * <th>HTrace:Span</th>
+ * <th>Zipkin:Span</th>
+ * </tr>
+ * <thead> <tbody>
+ * <tr>
+ * <td>TraceId</td>
+ * <td>TraceId</td>
+ * </tr>
+ * <tr>
+ * <td>ParentId</td>
+ * <td>ParentId</td>
+ * </tr>
+ * <tr>
+ * <td>SpanId</td>
+ * <td>id</td>
+ * </tr>
+ * <tr>
+ * <td>Description</td>
+ * <td>Name</td>
+ * </tr>
+ * <tr>
+ * <td>startTime, stopTime</td>
+ * <td>Annotations (cs, cr, sr, ss)</td>
+ * </tr>
+ * <tr>
+ * <td>Other annotations</td>
+ * <td>Annotations</td>
+ * </tr>
+ * </tbody>
+ * </table>
+ * <p/>
+ */
+public class HTraceToZipkinConverter {
+
+ private final int ipv4Address;
+ private final short port;
+
+
+ private static final Map<String, Integer> DEFAULT_PORTS = new HashMap<String, Integer>();
+
+ static {
+ DEFAULT_PORTS.put("hmaster", 60000);
+ DEFAULT_PORTS.put("hregionserver", 60020);
+ DEFAULT_PORTS.put("namenode", 8020);
+ DEFAULT_PORTS.put("datanode", 50010);
+ }
+
+ public HTraceToZipkinConverter(int ipv4Address, short port) {
+ this.ipv4Address = ipv4Address;
+ this.port = port;
+ }
+
+ /**
+ * Converts a given HTrace span to a Zipkin Span.
+ * <ul>
+ * <li>First set the start annotation. [CS, SR], depending whether it is a client service or not.
+ * <li>Set other id's, etc [TraceId's etc]
+ * <li>Create binary annotations based on data from HTrace Span object.
+ * <li>Set the last annotation. [SS, CR]
+ * </ul>
+ */
+ public Span convert(org.apache.htrace.Span hTraceSpan) {
+ Span zipkinSpan = new Span();
+ String serviceName = hTraceSpan.getProcessId().toLowerCase();
+ Endpoint ep = new Endpoint(ipv4Address, (short) getPort(serviceName), serviceName);
+ List<Annotation> annotationList = createZipkinAnnotations(hTraceSpan, ep);
+ List<BinaryAnnotation> binaryAnnotationList = createZipkinBinaryAnnotations(hTraceSpan, ep);
+ zipkinSpan.setTrace_id(hTraceSpan.getTraceId());
+ if (hTraceSpan.getParentId() != org.apache.htrace.Span.ROOT_SPAN_ID) {
+ zipkinSpan.setParent_id(hTraceSpan.getParentId());
+ }
+ zipkinSpan.setId(hTraceSpan.getSpanId());
+ zipkinSpan.setName(hTraceSpan.getDescription());
+ zipkinSpan.setAnnotations(annotationList);
+ zipkinSpan.setBinary_annotations(binaryAnnotationList);
+ return zipkinSpan;
+ }
+
+ /**
+ * Add annotations from the htrace Span.
+ */
+ private List<Annotation> createZipkinAnnotations(org.apache.htrace.Span hTraceSpan,
+ Endpoint ep) {
+ List<Annotation> annotationList = new ArrayList<Annotation>();
+
+ // add first zipkin annotation.
+ annotationList.add(createZipkinAnnotation(zipkinCoreConstants.CLIENT_SEND, hTraceSpan.getStartTimeMillis(), ep, true));
+ annotationList.add(createZipkinAnnotation(zipkinCoreConstants.SERVER_RECV, hTraceSpan.getStartTimeMillis(), ep, true));
+ // add HTrace time annotation
+ for (TimelineAnnotation ta : hTraceSpan.getTimelineAnnotations()) {
+ annotationList.add(createZipkinAnnotation(ta.getMessage(), ta.getTime(), ep, true));
+ }
+ // add last zipkin annotation
+ annotationList.add(createZipkinAnnotation(zipkinCoreConstants.SERVER_SEND, hTraceSpan.getStopTimeMillis(), ep, false));
+ annotationList.add(createZipkinAnnotation(zipkinCoreConstants.CLIENT_RECV, hTraceSpan.getStopTimeMillis(), ep, false));
+ return annotationList;
+ }
+
+ /**
+ * Creates a list of Annotations that are present in HTrace Span object.
+ *
+ * @return list of Annotations that could be added to Zipkin Span.
+ */
+ private List<BinaryAnnotation> createZipkinBinaryAnnotations(org.apache.htrace.Span span,
+ Endpoint ep) {
+ List<BinaryAnnotation> l = new ArrayList<BinaryAnnotation>();
+ for (Map.Entry<byte[], byte[]> e : span.getKVAnnotations().entrySet()) {
+ BinaryAnnotation binaryAnn = new BinaryAnnotation();
+ binaryAnn.setAnnotation_type(AnnotationType.BYTES);
+ binaryAnn.setKey(new String(e.getKey()));
+ binaryAnn.setValue(e.getValue());
+ binaryAnn.setHost(ep);
+ l.add(binaryAnn);
+ }
+ return l;
+ }
+
+ /**
+ * Create an annotation with the correct times and endpoint.
+ *
+ * @param value Annotation value
+ * @param time timestamp will be extracted
+ * @param ep the endopint this annotation will be associated with.
+ * @param sendRequest use the first or last timestamp.
+ */
+ private static Annotation createZipkinAnnotation(String value, long time,
+ Endpoint ep, boolean sendRequest) {
+ Annotation annotation = new Annotation();
+ annotation.setHost(ep);
+
+ // Zipkin is in microseconds
+ if (sendRequest) {
+ annotation.setTimestamp(time * 1000);
+ } else {
+ annotation.setTimestamp(time * 1000);
+ }
+
+ annotation.setDuration(1);
+ annotation.setValue(value);
+ return annotation;
+ }
+
+ private int getPort(String serviceName) {
+ if (port != -1) {
+ return port;
+ }
+
+ Integer p = DEFAULT_PORTS.get(serviceName);
+ if (p != null) {
+ return p;
+ }
+ return 80;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-zipkin/src/main/java/org/htrace/impl/ZipkinSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-zipkin/src/main/java/org/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/htrace/impl/ZipkinSpanReceiver.java
deleted file mode 100644
index 9bd178c..0000000
--- a/htrace-zipkin/src/main/java/org/htrace/impl/ZipkinSpanReceiver.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.htrace.impl;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.zipkin.gen.LogEntry;
-import com.twitter.zipkin.gen.Scribe;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.htrace.HTraceConfiguration;
-import org.htrace.Span;
-import org.htrace.SpanReceiver;
-import org.htrace.zipkin.HTraceToZipkinConverter;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Zipkin is an open source tracing library. This span receiver acts as a bridge between HTrace and
- * Zipkin, that converts HTrace Span objects into Zipkin Span objects.
- * <p/>
- * HTrace spans are queued into a blocking queue. From there background worker threads will
- * batch the spans together and then send them through to a Zipkin collector.
- */
-public class ZipkinSpanReceiver implements SpanReceiver {
- private static final Log LOG = LogFactory.getLog(ZipkinSpanReceiver.class);
-
- /**
- * Default hostname to fall back on.
- */
- private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost";
-
- /**
- * Default collector port.
- */
- private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port.
-
- /**
- * this is used to tell scribe that the entries are for zipkin..
- */
- private static final String CATEGORY = "zipkin";
-
- /**
- * Whether the service which is traced is in client or a server mode. It is used while creating
- * the Endpoint.
- */
- private static final boolean DEFAULT_IN_CLIENT_MODE = false;
-
- /**
- * How long this receiver will try and wait for all threads to shutdown.
- */
- private static final int SHUTDOWN_TIMEOUT = 30;
-
- /**
- * How many spans this receiver will try and send in one batch.
- */
- private static final int MAX_SPAN_BATCH_SIZE = 100;
-
- /**
- * How many errors in a row before we start dropping traces on the floor.
- */
- private static final int MAX_ERRORS = 10;
-
- /**
- * The queue that will get all HTrace spans that are to be sent.
- */
- private final BlockingQueue<Span> queue;
-
- /**
- * Factory used to encode a Zipkin Span to bytes.
- */
- private final TProtocolFactory protocolFactory;
-
- /**
- * Boolean used to signal that the threads should end.
- */
- private final AtomicBoolean running = new AtomicBoolean(true);
-
- /**
- * The thread factory used to create new ExecutorService.
- * <p/>
- * This will be the same factory for the lifetime of this object so that
- * no thread names will ever be duplicated.
- */
- private final ThreadFactory tf;
-
- ////////////////////
- /// Variables that will change on each call to configure()
- ///////////////////
- private HTraceToZipkinConverter converter;
- private ExecutorService service;
- private HTraceConfiguration conf;
- private String collectorHostname;
- private int collectorPort;
-
- public ZipkinSpanReceiver() {
- this.queue = new ArrayBlockingQueue<Span>(1000);
- this.protocolFactory = new TBinaryProtocol.Factory();
-
- tf = new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("zipkinSpanReceiver-%d")
- .build();
- }
-
- @Override
- public void configure(HTraceConfiguration conf) {
- this.conf = conf;
-
- this.collectorHostname = conf.get("zipkin.collector-hostname",
- DEFAULT_COLLECTOR_HOSTNAME);
- this.collectorPort = conf.getInt("zipkin.collector-port",
- DEFAULT_COLLECTOR_PORT);
-
- // initialize the endpoint. This endpoint is used while writing the Span.
- initConverter();
-
- int numThreads = conf.getInt("zipkin.num-threads", 1);
-
- // If there are already threads runnnig tear them down.
- if (this.service != null) {
- this.service.shutdownNow();
- this.service = null;
- }
-
- this.service = Executors.newFixedThreadPool(numThreads, tf);
-
- for (int i = 0; i < numThreads; i++) {
- this.service.submit(new WriteSpanRunnable());
- }
- }
-
- /**
- * Set up the HTrace to Zipkin converter.
- */
- private void initConverter() {
- InetAddress tracedServiceHostname = null;
- // Try and get the hostname. If it's not configured try and get the local hostname.
- try {
- String host = conf.get("zipkin.traced-service-hostname",
- InetAddress.getLocalHost().getHostAddress());
-
- tracedServiceHostname = InetAddress.getByName(host);
- } catch (UnknownHostException e) {
- LOG.error("Couldn't get the localHost address", e);
- }
- short tracedServicePort = (short) conf.getInt("zipkin.traced-service-port", -1);
- byte[] address = tracedServiceHostname != null
- ? tracedServiceHostname.getAddress() : DEFAULT_COLLECTOR_HOSTNAME.getBytes();
- int ipv4 = ByteBuffer.wrap(address).getInt();
- this.converter = new HTraceToZipkinConverter(ipv4, tracedServicePort);
- }
-
-
- private class WriteSpanRunnable implements Runnable {
- /**
- * scribe client to push zipkin spans
- */
- private Scribe.Client scribeClient = null;
- private final ByteArrayOutputStream baos;
- private final TProtocol streamProtocol;
-
- public WriteSpanRunnable() {
- baos = new ByteArrayOutputStream();
- streamProtocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
- }
-
- /**
- * This runnable converts a HTrace span to a Zipkin span and sends it across the zipkin
- * collector as a thrift object. The scribe client which is used for rpc writes a list of
- * LogEntry objects, so the span objects are first transformed into LogEntry objects before
- * sending to the zipkin-collector.
- * <p/>
- * Here is a little ascii art which shows the above transformation:
- * <pre>
- * +------------+ +------------+ +------------+ +-----------------+
- * | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector|
- * +------------+ +------------+ +------------+ (Scribe rpc) +-----------------+
- * </pre>
- */
- @Override
- public void run() {
-
- List<Span> dequeuedSpans = new ArrayList<Span>(MAX_SPAN_BATCH_SIZE);
-
- long errorCount = 0;
-
- while (running.get() || queue.size() > 0) {
- Span firstSpan = null;
- try {
- // Block for up to a second. to try and get a span.
- // We only block for a little bit in order to notice if the running value has changed
- firstSpan = queue.poll(1, TimeUnit.SECONDS);
-
- // If the poll was successful then it's possible that there
- // will be other spans to get. Try and get them.
- if (firstSpan != null) {
- // Add the first one that we got
- dequeuedSpans.add(firstSpan);
- // Try and get up to 100 queues
- queue.drainTo(dequeuedSpans, MAX_SPAN_BATCH_SIZE - 1);
- }
-
- } catch (InterruptedException ie) {
- // Ignored.
- }
-
- if (dequeuedSpans.isEmpty()) continue;
-
- // If this is the first time through or there was an error re-connect
- if (scribeClient == null) {
- startClient();
- }
- // Create a new list every time through so that the list doesn't change underneath
- // thrift as it's sending.
- List<LogEntry> entries = new ArrayList<LogEntry>(dequeuedSpans.size());
- try {
- // Convert every de-queued span
- for (Span htraceSpan : dequeuedSpans) {
- // convert the HTrace span to Zipkin span
- com.twitter.zipkin.gen.Span zipkinSpan = converter.convert(htraceSpan);
- // Clear any old data.
- baos.reset();
- // Write the span to a BAOS
- zipkinSpan.write(streamProtocol);
-
- // Do Base64 encoding and put the string into a log entry.
- LogEntry logEntry =
- new LogEntry(CATEGORY, Base64.encodeBase64String(baos.toByteArray()));
- entries.add(logEntry);
- }
-
- // Send the entries
- scribeClient.Log(entries);
- // clear the list for the next time through.
- dequeuedSpans.clear();
- // reset the error counter.
- errorCount = 0;
- } catch (Exception e) {
- LOG.error("Error when writing to the zipkin collector: " +
- collectorHostname + ":" + collectorPort, e);
-
- errorCount += 1;
- // If there have been ten errors in a row start dropping things.
- if (errorCount < MAX_ERRORS) {
- try {
- queue.addAll(dequeuedSpans);
- } catch (IllegalStateException ex) {
- LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full");
- }
- }
-
- closeClient();
- try {
- // Since there was an error sleep just a little bit to try and allow the
- // zipkin collector some time to recover.
- Thread.sleep(500);
- } catch (InterruptedException e1) {
- // Ignored
- }
- }
- }
- closeClient();
- }
-
- /**
- * Close out the connection.
- */
- private void closeClient() {
- // close out the transport.
- if (scribeClient != null) {
- scribeClient.getInputProtocol().getTransport().close();
- scribeClient = null;
- }
- }
-
- /**
- * Re-connect to Zipkin.
- */
- private void startClient() {
- if (this.scribeClient == null) {
- TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort));
- try {
- transport.open();
- } catch (TTransportException e) {
- e.printStackTrace();
- }
- TProtocol protocol = protocolFactory.getProtocol(transport);
- this.scribeClient = new Scribe.Client(protocol);
- }
- }
- }
-
- /**
- * Close the receiver.
- * <p/>
- * This tries to shut
- *
- * @throws IOException
- */
- @Override
- public void close() throws IOException {
- running.set(false);
- service.shutdown();
- try {
- if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
- LOG.error("Was not able to process all remaining spans to write upon closing in: " +
- SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS + ". There could be un-sent spans still left." +
- " They have been dropped.");
- }
- } catch (InterruptedException e1) {
- LOG.warn("Thread interrupted when terminating executor.", e1);
- }
- }
-
- @Override
- public void receiveSpan(Span span) {
- if (running.get()) {
- try {
- this.queue.add(span);
- } catch (IllegalStateException e) {
- LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue."
- + " Blocking Queue was full.");
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a27cd4da/htrace-zipkin/src/main/java/org/htrace/zipkin/HTraceToZipkinConverter.java
----------------------------------------------------------------------
diff --git a/htrace-zipkin/src/main/java/org/htrace/zipkin/HTraceToZipkinConverter.java b/htrace-zipkin/src/main/java/org/htrace/zipkin/HTraceToZipkinConverter.java
deleted file mode 100644
index 0a3a60a..0000000
--- a/htrace-zipkin/src/main/java/org/htrace/zipkin/HTraceToZipkinConverter.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.htrace.zipkin;
-
-import com.twitter.zipkin.gen.Annotation;
-import com.twitter.zipkin.gen.AnnotationType;
-import com.twitter.zipkin.gen.BinaryAnnotation;
-import com.twitter.zipkin.gen.Endpoint;
-import com.twitter.zipkin.gen.Span;
-import com.twitter.zipkin.gen.zipkinCoreConstants;
-import org.htrace.TimelineAnnotation;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class is responsible for converting a HTrace.Span to a Zipkin.Span object. To use the Zipkin
- * infrastructure (collector, front end), we need to store the Span information in a zipkin specific
- * format. This class transforms a HTrace:Span object to a Zipkin:Span object.
- * <p/>
- * This is how both Span objects are related:
- * <table>
- * <col width="50%"/> <col width="50%"/> <thead>
- * <tr>
- * <th>HTrace:Span</th>
- * <th>Zipkin:Span</th>
- * </tr>
- * <thead> <tbody>
- * <tr>
- * <td>TraceId</td>
- * <td>TraceId</td>
- * </tr>
- * <tr>
- * <td>ParentId</td>
- * <td>ParentId</td>
- * </tr>
- * <tr>
- * <td>SpanId</td>
- * <td>id</td>
- * </tr>
- * <tr>
- * <td>Description</td>
- * <td>Name</td>
- * </tr>
- * <tr>
- * <td>startTime, stopTime</td>
- * <td>Annotations (cs, cr, sr, ss)</td>
- * </tr>
- * <tr>
- * <td>Other annotations</td>
- * <td>Annotations</td>
- * </tr>
- * </tbody>
- * </table>
- * <p/>
- */
-public class HTraceToZipkinConverter {
-
- private final int ipv4Address;
- private final short port;
-
-
- private static final Map<String, Integer> DEFAULT_PORTS = new HashMap<String, Integer>();
-
- static {
- DEFAULT_PORTS.put("hmaster", 60000);
- DEFAULT_PORTS.put("hregionserver", 60020);
- DEFAULT_PORTS.put("namenode", 8020);
- DEFAULT_PORTS.put("datanode", 50010);
- }
-
- public HTraceToZipkinConverter(int ipv4Address, short port) {
- this.ipv4Address = ipv4Address;
- this.port = port;
- }
-
- /**
- * Converts a given HTrace span to a Zipkin Span.
- * <ul>
- * <li>First set the start annotation. [CS, SR], depending whether it is a client service or not.
- * <li>Set other id's, etc [TraceId's etc]
- * <li>Create binary annotations based on data from HTrace Span object.
- * <li>Set the last annotation. [SS, CR]
- * </ul>
- */
- public Span convert(org.htrace.Span hTraceSpan) {
- Span zipkinSpan = new Span();
- String serviceName = hTraceSpan.getProcessId().toLowerCase();
- Endpoint ep = new Endpoint(ipv4Address, (short) getPort(serviceName), serviceName);
- List<Annotation> annotationList = createZipkinAnnotations(hTraceSpan, ep);
- List<BinaryAnnotation> binaryAnnotationList = createZipkinBinaryAnnotations(hTraceSpan, ep);
- zipkinSpan.setTrace_id(hTraceSpan.getTraceId());
- if (hTraceSpan.getParentId() != org.htrace.Span.ROOT_SPAN_ID) {
- zipkinSpan.setParent_id(hTraceSpan.getParentId());
- }
- zipkinSpan.setId(hTraceSpan.getSpanId());
- zipkinSpan.setName(hTraceSpan.getDescription());
- zipkinSpan.setAnnotations(annotationList);
- zipkinSpan.setBinary_annotations(binaryAnnotationList);
- return zipkinSpan;
- }
-
- /**
- * Add annotations from the htrace Span.
- */
- private List<Annotation> createZipkinAnnotations(org.htrace.Span hTraceSpan,
- Endpoint ep) {
- List<Annotation> annotationList = new ArrayList<Annotation>();
-
- // add first zipkin annotation.
- annotationList.add(createZipkinAnnotation(zipkinCoreConstants.CLIENT_SEND, hTraceSpan.getStartTimeMillis(), ep, true));
- annotationList.add(createZipkinAnnotation(zipkinCoreConstants.SERVER_RECV, hTraceSpan.getStartTimeMillis(), ep, true));
- // add HTrace time annotation
- for (TimelineAnnotation ta : hTraceSpan.getTimelineAnnotations()) {
- annotationList.add(createZipkinAnnotation(ta.getMessage(), ta.getTime(), ep, true));
- }
- // add last zipkin annotation
- annotationList.add(createZipkinAnnotation(zipkinCoreConstants.SERVER_SEND, hTraceSpan.getStopTimeMillis(), ep, false));
- annotationList.add(createZipkinAnnotation(zipkinCoreConstants.CLIENT_RECV, hTraceSpan.getStopTimeMillis(), ep, false));
- return annotationList;
- }
-
- /**
- * Creates a list of Annotations that are present in HTrace Span object.
- *
- * @return list of Annotations that could be added to Zipkin Span.
- */
- private List<BinaryAnnotation> createZipkinBinaryAnnotations(org.htrace.Span span,
- Endpoint ep) {
- List<BinaryAnnotation> l = new ArrayList<BinaryAnnotation>();
- for (Map.Entry<byte[], byte[]> e : span.getKVAnnotations().entrySet()) {
- BinaryAnnotation binaryAnn = new BinaryAnnotation();
- binaryAnn.setAnnotation_type(AnnotationType.BYTES);
- binaryAnn.setKey(new String(e.getKey()));
- binaryAnn.setValue(e.getValue());
- binaryAnn.setHost(ep);
- l.add(binaryAnn);
- }
- return l;
- }
-
- /**
- * Create an annotation with the correct times and endpoint.
- *
- * @param value Annotation value
- * @param time timestamp will be extracted
- * @param ep the endopint this annotation will be associated with.
- * @param sendRequest use the first or last timestamp.
- */
- private static Annotation createZipkinAnnotation(String value, long time,
- Endpoint ep, boolean sendRequest) {
- Annotation annotation = new Annotation();
- annotation.setHost(ep);
-
- // Zipkin is in microseconds
- if (sendRequest) {
- annotation.setTimestamp(time * 1000);
- } else {
- annotation.setTimestamp(time * 1000);
- }
-
- annotation.setDuration(1);
- annotation.setValue(value);
- return annotation;
- }
-
- private int getPort(String serviceName) {
- if (port != -1) {
- return port;
- }
-
- Integer p = DEFAULT_PORTS.get(serviceName);
- if (p != null) {
- return p;
- }
- return 80;
- }
-}