You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by unknown <""...@apache.org> on 2019/05/11 06:01:33 UTC
[incubator-zipkin] branch master updated: Benchmark bytes / bytebuffer, protobuf vs zipkin vs wire. (#2579)
This is an automated email from the ASF dual-hosted git repository.
(unknown) pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git
The following commit(s) were added to refs/heads/master by this push:
new 6f6cb32 Benchmark bytes / bytebuffer, protobuf vs zipkin vs wire. (#2579)
6f6cb32 is described below
commit 6f6cb328107ba0e77494c80ebca51165d97caa9c
Author: Anuraag Agrawal <an...@gmail.com>
AuthorDate: Sat May 11 15:01:28 2019 +0900
Benchmark bytes / bytebuffer, protobuf vs zipkin vs wire. (#2579)
* Benchmark bytes / bytebuffer, protobuf vs zipkin vs wire.
* Add wire-based decoder too.
* polish
* changes to more realistic max message size
---
benchmarks/pom.xml | 8 +
.../java/zipkin2/codec/ProtoCodecBenchmarks.java | 121 +++++++++
.../java/zipkin2/codec/ProtobufSpanDecoder.java | 295 +++++++++++++++++++++
.../main/java/zipkin2/codec/WireSpanDecoder.java | 295 +++++++++++++++++++++
4 files changed, 719 insertions(+)
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 02b84e1..b4b79f9 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -70,6 +70,12 @@
</dependency>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>3.7.1</version>
+ </dependency>
+
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-server</artifactId>
<version>${project.version}</version>
@@ -144,6 +150,8 @@
<configuration combine.self="override">
<!-- instead of javac-with-errorprone -->
<compilerId>javac</compilerId>
+ <source>1.8</source>
+ <target>1.8</target>
<!-- scrub errorprone compiler args -->
<compilerArgs />
</configuration>
diff --git a/benchmarks/src/main/java/zipkin2/codec/ProtoCodecBenchmarks.java b/benchmarks/src/main/java/zipkin2/codec/ProtoCodecBenchmarks.java
new file mode 100644
index 0000000..c2c8602
--- /dev/null
+++ b/benchmarks/src/main/java/zipkin2/codec/ProtoCodecBenchmarks.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.codec;
+
+import com.google.common.io.Resources;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import zipkin2.Span;
+
+@Measurement(iterations = 5, time = 1)
+@Warmup(iterations = 10, time = 1)
+@Fork(3)
+@BenchmarkMode(Mode.SampleTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@State(Scope.Thread)
+@Threads(1)
+public class ProtoCodecBenchmarks {
+
+ static final byte[] clientSpanJsonV2 = read("/zipkin2-client.json");
+ static final Span clientSpan = SpanBytesDecoder.JSON_V2.decodeOne(clientSpanJsonV2);
+
+ // Assume a message is 1000 spans (which is a high number for as this is per-node-second)
+ static final List<Span> spans = Collections.nCopies(1000, clientSpan);
+ static final byte[] encodedBytes = SpanBytesEncoder.PROTO3.encodeList(spans);
+
+ private ByteBuf encodedBuf;
+
+ @Setup
+ public void setup() {
+ encodedBuf = PooledByteBufAllocator.DEFAULT.buffer(encodedBytes.length);
+ encodedBuf.writeBytes(encodedBytes);
+ }
+
+ @TearDown
+ public void tearDown() {
+ encodedBuf.release();
+ }
+
+ @Benchmark
+ public List<Span> bytes_zipkinDecoder() {
+ return SpanBytesDecoder.PROTO3.decodeList(encodedBytes);
+ }
+
+ @Benchmark
+ public List<Span> bytes_protobufDecoder() {
+ return ProtobufSpanDecoder.decodeList(encodedBytes);
+ }
+
+ @Benchmark
+ public List<Span> bytes_wireDecoder() {
+ return WireSpanDecoder.decodeList(encodedBytes);
+ }
+
+ @Benchmark
+ public List<Span> bytebuffer_zipkinDecoder() {
+ return SpanBytesDecoder.PROTO3.decodeList(ByteBufUtil.getBytes(encodedBuf));
+ }
+
+ @Benchmark
+ public List<Span> bytebuffer_protobufDecoder() {
+ return ProtobufSpanDecoder.decodeList(encodedBuf.nioBuffer());
+ }
+
+ @Benchmark
+ public List<Span> bytebuffer_wireDecoder() {
+ return WireSpanDecoder.decodeList(encodedBuf.nioBuffer());
+ }
+
+ // Convenience main entry-point
+ public static void main(String[] args) throws Exception {
+ Options opt = new OptionsBuilder()
+ .include(".*" + ProtoCodecBenchmarks.class.getSimpleName() + ".*bytebuffer_")
+ .addProfiler("gc")
+ .build();
+
+ new Runner(opt).run();
+ }
+
+ static byte[] read(String resource) {
+ try {
+ return Resources.toByteArray(Resources.getResource(CodecBenchmarks.class, resource));
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
diff --git a/benchmarks/src/main/java/zipkin2/codec/ProtobufSpanDecoder.java b/benchmarks/src/main/java/zipkin2/codec/ProtobufSpanDecoder.java
new file mode 100644
index 0000000..f63d62c
--- /dev/null
+++ b/benchmarks/src/main/java/zipkin2/codec/ProtobufSpanDecoder.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.codec;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.WireFormat;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+import zipkin2.Endpoint;
+import zipkin2.Span;
+
+/**
+ * Decodes zipkin's proto3 span encoding by reading fields directly from protobuf-java's
+ * {@link CodedInputStream}.
+ *
+ * <p>Each method switches on the raw tag returned by {@link CodedInputStream#readTag()}, i.e.
+ * {@code (fieldNumber << 3) | wireType}; a tag of {@code 0} ends the current message. Fields that
+ * are not recognized are skipped via {@link #logAndSkip(CodedInputStream, int)}.
+ */
+public class ProtobufSpanDecoder {
+  static final Logger LOG = Logger.getLogger(ProtobufSpanDecoder.class.getName());
+  // Compile-time switch: skipped fields are only logged when this is true.
+  static final boolean DEBUG = false;
+
+  // map<string,string> in proto is a special field with key, value
+  static final int MAP_KEY_KEY = (1 << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
+  static final int MAP_VALUE_KEY = (2 << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
+
+  /**
+   * Reads one tag (a map entry of key and value) and adds it to the span.
+   *
+   * @param input positioned at the first field of the entry; reading stops when readTag yields 0
+   * @param span builder that receives the tag
+   * @return false when the entry carried no key, in which case nothing is added
+   * @throws IOException if the input cannot be read
+   */
+  static boolean decodeTag(CodedInputStream input, Span.Builder span) throws IOException {
+    // now, we are in the tag fields
+    String key = null, value = ""; // empty tags allowed
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.readTag();
+      switch (tag) {
+        case 0:
+          done = true;
+          break;
+        case MAP_KEY_KEY: {
+          key = input.readStringRequireUtf8();
+          break;
+        }
+        case MAP_VALUE_KEY: {
+          value = input.readStringRequireUtf8();
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    if (key == null) return false;
+    span.putTag(key, value);
+    return true;
+  }
+
+  /**
+   * Reads one annotation (timestamp and value) and adds it to the span.
+   *
+   * @param input positioned at the first field of the annotation
+   * @param span builder that receives the annotation
+   * @return false when the timestamp is zero or the value is absent; nothing is added then
+   * @throws IOException if the input cannot be read
+   */
+  static boolean decodeAnnotation(CodedInputStream input, Span.Builder span) throws IOException {
+    long timestamp = 0L;
+    String value = null;
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.readTag();
+      switch (tag) {
+        case 0:
+          done = true;
+          break;
+        case 9: { // field 1, fixed64
+          timestamp = input.readFixed64();
+          break;
+        }
+        case 18: { // field 2, length-delimited
+          value = input.readStringRequireUtf8();
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    if (timestamp == 0L || value == null) return false;
+    span.addAnnotation(timestamp, value);
+    return true;
+  }
+
+  /**
+   * Reads an endpoint message: service name, IP bytes and port.
+   *
+   * @param input positioned at the first field of the endpoint
+   * @return the endpoint built from the fields that were present
+   * @throws IOException if the input cannot be read
+   */
+  private static Endpoint decodeEndpoint(CodedInputStream input) throws IOException {
+    Endpoint.Builder endpoint = Endpoint.newBuilder();
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.readTag();
+      switch (tag) {
+        case 0:
+          done = true;
+          break;
+        case 10: {
+          endpoint.serviceName(input.readStringRequireUtf8());
+          break;
+        }
+        // Fields 2 and 3 are both handed to parseIp (presumably ipv4 and ipv6 -- confirm in proto)
+        case 18:
+        case 26: {
+          endpoint.parseIp(input.readByteArray());
+          break;
+        }
+        case 32: {
+          endpoint.port(input.readInt32());
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+    return endpoint.build();
+  }
+
+  /**
+   * Reads the fields of a single span until {@code readTag()} returns 0.
+   *
+   * <p>Nested messages (endpoints, annotations, tags) are bounded by pushing a limit of their
+   * declared length before decoding and popping it afterwards.
+   *
+   * @param input positioned at the first field of the span
+   * @return the decoded span
+   * @throws IOException if the input cannot be read or a nested message does not end cleanly
+   */
+  public static Span decodeOne(CodedInputStream input) throws IOException {
+    Span.Builder span = Span.newBuilder();
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.readTag();
+      switch (tag) {
+        case 0:
+          done = true;
+          break;
+        case 10: {
+          span.traceId(readHexString(input));
+          break;
+        }
+        case 18: {
+          span.parentId(readHexString(input));
+          break;
+        }
+        case 26: {
+          span.id(readHexString(input));
+          break;
+        }
+        case 32: {
+          int kind = input.readEnum();
+          // The wire value is one-based relative to Span.Kind. Zero, negative and too-large
+          // values are ignored instead of indexing outside the array.
+          Span.Kind[] kinds = Span.Kind.values();
+          if (kind < 1 || kind > kinds.length) break;
+          span.kind(kinds[kind - 1]);
+          break;
+        }
+        case 42: {
+          span.name(input.readStringRequireUtf8());
+          break;
+        }
+        case 49: {
+          span.timestamp(input.readFixed64());
+          break;
+        }
+        case 56: {
+          span.duration(input.readUInt64());
+          break;
+        }
+        case 66: {
+          int length = input.readRawVarint32();
+          int oldLimit = input.pushLimit(length);
+
+          span.localEndpoint(decodeEndpoint(input));
+
+          input.checkLastTagWas(0);
+          input.popLimit(oldLimit);
+          break;
+        }
+        case 74: {
+          int length = input.readRawVarint32();
+          int oldLimit = input.pushLimit(length);
+
+          span.remoteEndpoint(decodeEndpoint(input));
+
+          input.checkLastTagWas(0);
+          input.popLimit(oldLimit);
+          break;
+        }
+        case 82: {
+          int length = input.readRawVarint32();
+          int oldLimit = input.pushLimit(length);
+
+          decodeAnnotation(input, span);
+
+          input.checkLastTagWas(0);
+          input.popLimit(oldLimit);
+          break;
+        }
+        case 90: {
+          int length = input.readRawVarint32();
+          int oldLimit = input.pushLimit(length);
+
+          decodeTag(input, span);
+
+          input.checkLastTagWas(0);
+          input.popLimit(oldLimit);
+          break;
+        }
+        case 96: {
+          span.debug(input.readBool());
+          break;
+        }
+        case 104: {
+          span.shared(input.readBool());
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    return span.build();
+  }
+
+  /** Decodes a list of spans from a byte array. See {@link #decodeList(CodedInputStream)}. */
+  public static List<Span> decodeList(byte[] spans) {
+    return decodeList(CodedInputStream.newInstance(spans));
+  }
+
+  /** Decodes a list of spans from a byte buffer. See {@link #decodeList(CodedInputStream)}. */
+  public static List<Span> decodeList(ByteBuffer spans) {
+    return decodeList(CodedInputStream.newInstance(spans));
+  }
+
+  /**
+   * Decodes every span found in field 1 of the top-level message; other fields are skipped.
+   *
+   * @param input positioned at the start of the list message
+   * @return the decoded spans, in encounter order
+   * @throws java.io.UncheckedIOException (a RuntimeException) wrapping any IOException from input
+   */
+  public static List<Span> decodeList(CodedInputStream input) {
+    List<Span> spans = new ArrayList<>();
+
+    try {
+      boolean done = false;
+      while (!done) {
+        int tag = input.readTag();
+        switch (tag) {
+          case 0:
+            done = true;
+            break;
+          case 10: {
+            int length = input.readRawVarint32();
+            int oldLimit = input.pushLimit(length);
+            spans.add(decodeOne(input));
+            input.checkLastTagWas(0);
+            input.popLimit(oldLimit);
+            break;
+          }
+          default: {
+            logAndSkip(input, tag);
+            break;
+          }
+        }
+      }
+    } catch (IOException e) {
+      // Fully-qualified to avoid touching the import block; still a RuntimeException for callers.
+      throw new java.io.UncheckedIOException(e);
+    }
+
+    return spans;
+  }
+
+  static final char[] HEX_DIGITS =
+    {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+
+  /**
+   * Reads a length-prefixed bytes field and renders it as lower-case hex, two characters per
+   * byte.
+   */
+  private static String readHexString(CodedInputStream input) throws IOException {
+    int size = input.readRawVarint32();
+
+    char[] result = new char[size * 2];
+
+    for (int i = 0; i < result.length; i += 2) {
+      byte b = input.readRawByte();
+      result[i] = HEX_DIGITS[(b >> 4) & 0xf];
+      result[i + 1] = HEX_DIGITS[b & 0xf];
+    }
+
+    return new String(result);
+  }
+
+  /** Skips the field identified by {@code tag}, logging it first when {@link #DEBUG} is set. */
+  static void logAndSkip(CodedInputStream input, int tag) throws IOException {
+    if (DEBUG) { // avoiding volatile reads as we don't log on skip in our normal codec
+      int nextWireType = WireFormat.getTagWireType(tag);
+      int nextFieldNumber = WireFormat.getTagFieldNumber(tag);
+      LOG.fine(String.format("Skipping field: byte=%s, fieldNumber=%s, wireType=%s",
+        input.getTotalBytesRead(), nextFieldNumber, nextWireType));
+    }
+    input.skipField(tag);
+  }
+}
diff --git a/benchmarks/src/main/java/zipkin2/codec/WireSpanDecoder.java b/benchmarks/src/main/java/zipkin2/codec/WireSpanDecoder.java
new file mode 100644
index 0000000..a7e7a24
--- /dev/null
+++ b/benchmarks/src/main/java/zipkin2/codec/WireSpanDecoder.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.codec;
+
+import com.google.protobuf.WireFormat;
+import com.squareup.wire.ProtoAdapter;
+import com.squareup.wire.ProtoReader;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+import okio.Buffer;
+import zipkin2.Endpoint;
+import zipkin2.Span;
+
+/**
+ * Decodes zipkin's proto3 span encoding by reading fields directly from Square Wire's
+ * {@link ProtoReader}.
+ *
+ * <p>Each method switches on the value returned by {@link ProtoReader#nextTag()}, which the
+ * cases here treat as the plain field number; {@code -1} ends the current message. Fields that
+ * are not recognized are skipped via {@link #logAndSkip(ProtoReader, int)}.
+ */
+public class WireSpanDecoder {
+  static final Logger LOG = Logger.getLogger(WireSpanDecoder.class.getName());
+  // Compile-time switch: skipped fields are only logged when this is true.
+  static final boolean DEBUG = false;
+
+  /**
+   * Reads one tag (a map entry of key and value) and adds it to the span.
+   *
+   * @param input reader inside the entry's message
+   * @param span builder that receives the tag
+   * @return false when the entry carried no key, in which case nothing is added
+   * @throws IOException if the input cannot be read
+   */
+  static boolean decodeTag(ProtoReader input, Span.Builder span) throws IOException {
+    // now, we are in the tag fields
+    String key = null, value = ""; // empty tags allowed
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.nextTag();
+      switch (tag) {
+        case -1:
+          done = true;
+          break;
+        case 1: {
+          key = input.readString();
+          break;
+        }
+        case 2: {
+          value = input.readString();
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    if (key == null) return false;
+    span.putTag(key, value);
+    return true;
+  }
+
+  /**
+   * Reads one annotation (timestamp and value) and adds it to the span.
+   *
+   * @param input reader inside the annotation's message
+   * @param span builder that receives the annotation
+   * @return false when the timestamp is zero or the value is absent; nothing is added then
+   * @throws IOException if the input cannot be read
+   */
+  static boolean decodeAnnotation(ProtoReader input, Span.Builder span) throws IOException {
+    long timestamp = 0L;
+    String value = null;
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.nextTag();
+      switch (tag) {
+        case -1:
+          done = true;
+          break;
+        case 1: {
+          timestamp = input.readFixed64();
+          break;
+        }
+        case 2: {
+          value = input.readString();
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    if (timestamp == 0L || value == null) return false;
+    span.addAnnotation(timestamp, value);
+    return true;
+  }
+
+  /**
+   * Reads an endpoint message: service name, IP bytes and port.
+   *
+   * @param input reader inside the endpoint's message
+   * @return the endpoint built from the fields that were present
+   * @throws IOException if the input cannot be read
+   */
+  private static Endpoint decodeEndpoint(ProtoReader input) throws IOException {
+    Endpoint.Builder endpoint = Endpoint.newBuilder();
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.nextTag();
+      switch (tag) {
+        case -1:
+          done = true;
+          break;
+        case 1: {
+          String s = input.readString();
+          endpoint.serviceName(s);
+          break;
+        }
+        // Fields 2 and 3 are both handed to parseIp (presumably ipv4 and ipv6 -- confirm in proto)
+        case 2:
+        case 3: {
+          endpoint.parseIp(input.readBytes().toByteArray());
+          break;
+        }
+        case 4: {
+          endpoint.port(input.readVarint32());
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+    return endpoint.build();
+  }
+
+  /**
+   * Reads the fields of a single span until {@code nextTag()} returns -1.
+   *
+   * <p>Nested messages (endpoints, annotations, tags) are wrapped in
+   * {@code beginMessage()} / {@code endMessage(token)} calls.
+   *
+   * @param input reader inside the span's message
+   * @return the decoded span
+   * @throws IOException if the input cannot be read
+   */
+  public static Span decodeOne(ProtoReader input) throws IOException {
+    Span.Builder span = Span.newBuilder();
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.nextTag();
+      switch (tag) {
+        case -1:
+          done = true;
+          break;
+        case 1: {
+          span.traceId(readHexString(input));
+          break;
+        }
+        case 2: {
+          span.parentId(readHexString(input));
+          break;
+        }
+        case 3: {
+          span.id(readHexString(input));
+          break;
+        }
+        case 4: {
+          int kind = input.readVarint32();
+          // The wire value is one-based relative to Span.Kind. Zero, negative and too-large
+          // values are ignored instead of indexing outside the array.
+          Span.Kind[] kinds = Span.Kind.values();
+          if (kind < 1 || kind > kinds.length) break;
+          span.kind(kinds[kind - 1]);
+          break;
+        }
+        case 5: {
+          String name = input.readString();
+          span.name(name);
+          break;
+        }
+        case 6: {
+          span.timestamp(input.readFixed64());
+          break;
+        }
+        case 7: {
+          span.duration(input.readVarint64());
+          break;
+        }
+        case 8: {
+          long token = input.beginMessage();
+
+          span.localEndpoint(decodeEndpoint(input));
+
+          input.endMessage(token);
+          break;
+        }
+        case 9: {
+          long token = input.beginMessage();
+
+          span.remoteEndpoint(decodeEndpoint(input));
+
+          input.endMessage(token);
+          break;
+        }
+        case 10: {
+          long token = input.beginMessage();
+
+          decodeAnnotation(input, span);
+
+          input.endMessage(token);
+          break;
+        }
+        case 11: {
+          long token = input.beginMessage();
+
+          decodeTag(input, span);
+
+          input.endMessage(token);
+          break;
+        }
+        case 12: {
+          span.debug(ProtoAdapter.BOOL.decode(input));
+          break;
+        }
+        case 13: {
+          span.shared(ProtoAdapter.BOOL.decode(input));
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    return span.build();
+  }
+
+  /** Decodes a list of spans from a byte array. See {@link #decodeList(ProtoReader)}. */
+  public static List<Span> decodeList(byte[] spans) {
+    return decodeList(new ProtoReader(new Buffer().write(spans)));
+  }
+
+  /**
+   * Decodes a list of spans from a byte buffer, after writing its remaining bytes into an okio
+   * {@link Buffer}. See {@link #decodeList(ProtoReader)}.
+   */
+  public static List<Span> decodeList(ByteBuffer spans) {
+    Buffer buffer = new Buffer();
+    try {
+      buffer.write(spans);
+    } catch (IOException e) {
+      throw new AssertionError(e); // no I/O
+    }
+    return decodeList(new ProtoReader(buffer));
+  }
+
+  /**
+   * Decodes every span found in field 1 of the top-level message; other fields are skipped.
+   *
+   * @param input reader positioned before the list message; this method begins and ends it
+   * @return the decoded spans, in encounter order
+   * @throws UncheckedIOException wrapping any IOException raised while reading
+   */
+  public static List<Span> decodeList(ProtoReader input) {
+    List<Span> spans = new ArrayList<>();
+
+    // One try block so every read failure is reported the same way.
+    try {
+      long token = input.beginMessage();
+
+      boolean done = false;
+      while (!done) {
+        int tag = input.nextTag();
+        switch (tag) {
+          case -1:
+            done = true;
+            break;
+          case 1: {
+            long subToken = input.beginMessage();
+
+            spans.add(decodeOne(input));
+
+            input.endMessage(subToken);
+            break;
+          }
+          default: {
+            logAndSkip(input, tag);
+            break;
+          }
+        }
+      }
+
+      input.endMessage(token);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return spans;
+  }
+
+  /** Reads a bytes field and renders it as hex via okio's {@code ByteString.hex()}. */
+  private static String readHexString(ProtoReader input) throws IOException {
+    return input.readBytes().hex();
+  }
+
+  /**
+   * Skips the current field, logging it first when {@link #DEBUG} is set.
+   *
+   * @param input reader whose {@code nextTag()} just returned {@code tag}
+   * @param tag the field number being skipped
+   */
+  static void logAndSkip(ProtoReader input, int tag) throws IOException {
+    if (DEBUG) { // avoiding volatile reads as we don't log on skip in our normal codec
+      // nextTag() already yields the field number, so it must not be unpacked like a raw
+      // protobuf tag; the encoding comes from the reader itself. The byte offset is logged as 0.
+      LOG.fine(String.format("Skipping field: byte=%s, fieldNumber=%s, wireType=%s",
+        0, tag, input.peekFieldEncoding()));
+    }
+    input.skip();
+  }
+}