You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by unknown <""...@apache.org> on 2019/05/11 06:01:33 UTC

[incubator-zipkin] branch master updated: Benchmark bytes / bytebuffer, protobuf vs zipkin vs wire. (#2579)

This is an automated email from the ASF dual-hosted git repository.

(unknown) pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f6cb32  Benchmark bytes / bytebuffer, protobuf vs zipkin vs wire. (#2579)
6f6cb32 is described below

commit 6f6cb328107ba0e77494c80ebca51165d97caa9c
Author: Anuraag Agrawal <an...@gmail.com>
AuthorDate: Sat May 11 15:01:28 2019 +0900

    Benchmark bytes / bytebuffer, protobuf vs zipkin vs wire. (#2579)
    
    * Benchmark bytes / bytebuffer, protobuf vs zipkin vs wire.
    
    * Add wire-based decoder too.
    
    * polish
    
    * changes to more realistic max message size
---
 benchmarks/pom.xml                                 |   8 +
 .../java/zipkin2/codec/ProtoCodecBenchmarks.java   | 121 +++++++++
 .../java/zipkin2/codec/ProtobufSpanDecoder.java    | 295 +++++++++++++++++++++
 .../main/java/zipkin2/codec/WireSpanDecoder.java   | 295 +++++++++++++++++++++
 4 files changed, 719 insertions(+)

diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 02b84e1..b4b79f9 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -70,6 +70,12 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>3.7.1</version>
+    </dependency>
+
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>zipkin-server</artifactId>
       <version>${project.version}</version>
@@ -144,6 +150,8 @@
             <configuration combine.self="override">
               <!-- instead of javac-with-errorprone -->
               <compilerId>javac</compilerId>
+              <source>1.8</source>
+              <target>1.8</target>
               <!-- scrub errorprone compiler args -->
               <compilerArgs />
             </configuration>
diff --git a/benchmarks/src/main/java/zipkin2/codec/ProtoCodecBenchmarks.java b/benchmarks/src/main/java/zipkin2/codec/ProtoCodecBenchmarks.java
new file mode 100644
index 0000000..c2c8602
--- /dev/null
+++ b/benchmarks/src/main/java/zipkin2/codec/ProtoCodecBenchmarks.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.codec;
+
+import com.google.common.io.Resources;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import zipkin2.Span;
+
+@Measurement(iterations = 5, time = 1)
+@Warmup(iterations = 10, time = 1)
+@Fork(3)
+@BenchmarkMode(Mode.SampleTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@State(Scope.Thread)
+@Threads(1)
+public class ProtoCodecBenchmarks {
+
+  static final byte[] clientSpanJsonV2 = read("/zipkin2-client.json");
+  static final Span clientSpan = SpanBytesDecoder.JSON_V2.decodeOne(clientSpanJsonV2);
+
+  // Assume a message is 1000 spans (which is a high number for as this is per-node-second)
+  static final List<Span> spans = Collections.nCopies(1000, clientSpan);
+  static final byte[] encodedBytes = SpanBytesEncoder.PROTO3.encodeList(spans);
+
+  private ByteBuf encodedBuf;
+
+  @Setup
+  public void setup() {
+    encodedBuf = PooledByteBufAllocator.DEFAULT.buffer(encodedBytes.length);
+    encodedBuf.writeBytes(encodedBytes);
+  }
+
+  @TearDown
+  public void tearDown() {
+    encodedBuf.release();
+  }
+
+  @Benchmark
+  public List<Span> bytes_zipkinDecoder() {
+    return SpanBytesDecoder.PROTO3.decodeList(encodedBytes);
+  }
+
+  @Benchmark
+  public List<Span> bytes_protobufDecoder() {
+    return ProtobufSpanDecoder.decodeList(encodedBytes);
+  }
+
+  @Benchmark
+  public List<Span> bytes_wireDecoder() {
+    return WireSpanDecoder.decodeList(encodedBytes);
+  }
+
+  @Benchmark
+  public List<Span> bytebuffer_zipkinDecoder() {
+    return SpanBytesDecoder.PROTO3.decodeList(ByteBufUtil.getBytes(encodedBuf));
+  }
+
+  @Benchmark
+  public List<Span> bytebuffer_protobufDecoder() {
+    return ProtobufSpanDecoder.decodeList(encodedBuf.nioBuffer());
+  }
+
+  @Benchmark
+  public List<Span> bytebuffer_wireDecoder() {
+    return WireSpanDecoder.decodeList(encodedBuf.nioBuffer());
+  }
+
+  // Convenience main entry-point
+  public static void main(String[] args) throws Exception {
+    Options opt = new OptionsBuilder()
+      .include(".*" + ProtoCodecBenchmarks.class.getSimpleName() + ".*bytebuffer_")
+      .addProfiler("gc")
+      .build();
+
+    new Runner(opt).run();
+  }
+
+  static byte[] read(String resource) {
+    try {
+      return Resources.toByteArray(Resources.getResource(CodecBenchmarks.class, resource));
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}
diff --git a/benchmarks/src/main/java/zipkin2/codec/ProtobufSpanDecoder.java b/benchmarks/src/main/java/zipkin2/codec/ProtobufSpanDecoder.java
new file mode 100644
index 0000000..f63d62c
--- /dev/null
+++ b/benchmarks/src/main/java/zipkin2/codec/ProtobufSpanDecoder.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.codec;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.WireFormat;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+import zipkin2.Endpoint;
+import zipkin2.Span;
+
+public class ProtobufSpanDecoder {
+  static final Logger LOG = Logger.getLogger(ProtobufSpanDecoder.class.getName());
+  static final boolean DEBUG = false;
+
+  // map<string,string> in proto is a special field with key, value
+  static final int MAP_KEY_KEY = (1 << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
+  static final int MAP_VALUE_KEY = (2 << 3) | WireFormat.WIRETYPE_LENGTH_DELIMITED;
+
+  static boolean decodeTag(CodedInputStream input, Span.Builder span) throws IOException {
+    // now, we are in the tag fields
+    String key = null, value = ""; // empty tags allowed
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.readTag();
+      switch (tag) {
+        case 0:
+          done = true;
+          break;
+        case MAP_KEY_KEY: {
+          key = input.readStringRequireUtf8();
+          break;
+        }
+        case MAP_VALUE_KEY: {
+          value = input.readStringRequireUtf8();
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    if (key == null) return false;
+    span.putTag(key, value);
+    return true;
+  }
+
+  static boolean decodeAnnotation(CodedInputStream input, Span.Builder span) throws IOException {
+    long timestamp = 0L;
+    String value = null;
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.readTag();
+      switch (tag) {
+        case 0:
+          done = true;
+          break;
+        case 9: {
+          timestamp = input.readFixed64();
+          break;
+        }
+        case 18: {
+          value = input.readStringRequireUtf8();
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    if (timestamp == 0L || value == null) return false;
+    span.addAnnotation(timestamp, value);
+    return true;
+  }
+
+  private static Endpoint decodeEndpoint(CodedInputStream input) throws IOException {
+    Endpoint.Builder endpoint = Endpoint.newBuilder();
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.readTag();
+      switch (tag) {
+        case 0:
+          done = true;
+          break;
+        case 10: {
+          endpoint.serviceName(input.readStringRequireUtf8());
+          break;
+        }
+        case 18:
+        case 26: {
+          endpoint.parseIp(input.readByteArray());
+          break;
+        }
+        case 32: {
+          endpoint.port(input.readInt32());
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+    return endpoint.build();
+  }
+
+  public static Span decodeOne(CodedInputStream input) throws IOException {
+    Span.Builder span = Span.newBuilder();
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.readTag();
+      switch (tag) {
+        case 0:
+          done = true;
+          break;
+        case 10: {
+          span.traceId(readHexString(input));
+          break;
+        }
+        case 18: {
+          span.parentId(readHexString(input));
+          break;
+        }
+        case 26: {
+          span.id(readHexString(input));
+          break;
+        }
+        case 32: {
+          int kind = input.readEnum();
+          if (kind == 0) break;
+          if (kind > Span.Kind.values().length) break;
+          span.kind(Span.Kind.values()[kind - 1]);
+          break;
+        }
+        case 42: {
+          span.name(input.readStringRequireUtf8());
+          break;
+        }
+        case 49: {
+          span.timestamp(input.readFixed64());
+          break;
+        }
+        case 56: {
+          span.duration(input.readUInt64());
+          break;
+        }
+        case 66: {
+          int length = input.readRawVarint32();
+          int oldLimit = input.pushLimit(length);
+
+          span.localEndpoint(decodeEndpoint(input));
+
+          input.checkLastTagWas(0);
+          input.popLimit(oldLimit);
+          break;
+        }
+        case 74: {
+          int length = input.readRawVarint32();
+          int oldLimit = input.pushLimit(length);
+
+          span.remoteEndpoint(decodeEndpoint(input));
+
+          input.checkLastTagWas(0);
+          input.popLimit(oldLimit);
+          break;
+        }
+        case 82: {
+          int length = input.readRawVarint32();
+          int oldLimit = input.pushLimit(length);
+
+          decodeAnnotation(input, span);
+
+          input.checkLastTagWas(0);
+          input.popLimit(oldLimit);
+          break;
+        }
+        case 90: {
+          int length = input.readRawVarint32();
+          int oldLimit = input.pushLimit(length);
+
+          decodeTag(input, span);
+
+          input.checkLastTagWas(0);
+          input.popLimit(oldLimit);
+          break;
+        }
+        case 96: {
+          span.debug(input.readBool());
+          break;
+        }
+        case 104: {
+          span.shared(input.readBool());
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    return span.build();
+  }
+
+  public static List<Span> decodeList(byte[] spans) {
+    return decodeList(CodedInputStream.newInstance(spans));
+  }
+
+  public static List<Span> decodeList(ByteBuffer spans) {
+    return decodeList(CodedInputStream.newInstance(spans));
+  }
+
+  public static List<Span> decodeList(CodedInputStream input) {
+    ArrayList<Span> spans = new ArrayList<>();
+
+    try {
+      boolean done = false;
+      while (!done) {
+        int tag = input.readTag();
+        switch (tag) {
+          case 0:
+            done = true;
+            break;
+          case 10:
+            int length = input.readRawVarint32();
+            int oldLimit = input.pushLimit(length);
+            spans.add(decodeOne(input));
+            input.checkLastTagWas(0);
+            input.popLimit(oldLimit);
+            break;
+          default: {
+            logAndSkip(input, tag);
+            break;
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return spans;
+  }
+
+  static final char[] HEX_DIGITS =
+    {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+
+  private static String readHexString(CodedInputStream input) throws IOException {
+    int size = input.readRawVarint32();
+
+    char[] result = new char[size * 2];
+
+    for (int i = 0; i < result.length; i += 2) {
+      byte b = input.readRawByte();
+      result[i] = HEX_DIGITS[(b >> 4) & 0xf];
+      result[i + 1] = HEX_DIGITS[b & 0xf];
+    }
+
+    return new String(result);
+  }
+
+  static void logAndSkip(CodedInputStream input, int tag) throws IOException {
+    if (DEBUG) { // avoiding volatile reads as we don't log on skip in our normal codec
+      int nextWireType = WireFormat.getTagWireType(tag);
+      int nextFieldNumber = WireFormat.getTagFieldNumber(tag);
+      LOG.fine(String.format("Skipping field: byte=%s, fieldNumber=%s, wireType=%s",
+        input.getTotalBytesRead(), nextFieldNumber, nextWireType));
+    }
+    input.skipField(tag);
+  }
+}
diff --git a/benchmarks/src/main/java/zipkin2/codec/WireSpanDecoder.java b/benchmarks/src/main/java/zipkin2/codec/WireSpanDecoder.java
new file mode 100644
index 0000000..a7e7a24
--- /dev/null
+++ b/benchmarks/src/main/java/zipkin2/codec/WireSpanDecoder.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.codec;
+
+import com.google.protobuf.WireFormat;
+import com.squareup.wire.ProtoAdapter;
+import com.squareup.wire.ProtoReader;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+import okio.Buffer;
+import zipkin2.Endpoint;
+import zipkin2.Span;
+
+public class WireSpanDecoder {
+  static final Logger LOG = Logger.getLogger(WireSpanDecoder.class.getName());
+  static final boolean DEBUG = false;
+
+  static boolean decodeTag(ProtoReader input, Span.Builder span) throws IOException {
+    // now, we are in the tag fields
+    String key = null, value = ""; // empty tags allowed
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.nextTag();
+      switch (tag) {
+        case -1:
+          done = true;
+          break;
+        case 1: {
+          key = input.readString();
+          break;
+        }
+        case 2: {
+          value = input.readString();
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    if (key == null) return false;
+    span.putTag(key, value);
+    return true;
+  }
+
+  static boolean decodeAnnotation(ProtoReader input, Span.Builder span) throws IOException {
+    long timestamp = 0L;
+    String value = null;
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.nextTag();
+      switch (tag) {
+        case -1:
+          done = true;
+          break;
+        case 1: {
+          timestamp = input.readFixed64();
+          break;
+        }
+        case 2: {
+          value = input.readString();
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    if (timestamp == 0L || value == null) return false;
+    span.addAnnotation(timestamp, value);
+    return true;
+  }
+
+  private static Endpoint decodeEndpoint(ProtoReader input) throws IOException {
+    Endpoint.Builder endpoint = Endpoint.newBuilder();
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.nextTag();
+      switch (tag) {
+        case -1:
+          done = true;
+          break;
+        case 1: {
+          String s = input.readString();
+          endpoint.serviceName(s);
+          break;
+        }
+        case 2:
+        case 3: {
+          endpoint.parseIp(input.readBytes().toByteArray());
+          break;
+        }
+        case 4: {
+          endpoint.port(input.readVarint32());
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+    return endpoint.build();
+  }
+
+  public static Span decodeOne(ProtoReader input) throws IOException {
+    Span.Builder span = Span.newBuilder();
+
+    boolean done = false;
+    while (!done) {
+      int tag = input.nextTag();
+      switch (tag) {
+        case -1:
+          done = true;
+          break;
+        case 1: {
+          span.traceId(readHexString(input));
+          break;
+        }
+        case 2: {
+          span.parentId(readHexString(input));
+          break;
+        }
+        case 3: {
+          span.id(readHexString(input));
+          break;
+        }
+        case 4: {
+          int kind = input.readVarint32();
+          if (kind == 0) break;
+          if (kind > Span.Kind.values().length) break;
+          span.kind(Span.Kind.values()[kind - 1]);
+          break;
+        }
+        case 5: {
+          String name = input.readString();
+          span.name(name);
+          break;
+        }
+        case 6: {
+          span.timestamp(input.readFixed64());
+          break;
+        }
+        case 7: {
+          span.duration(input.readVarint64());
+          break;
+        }
+        case 8: {
+          long token = input.beginMessage();
+
+          span.localEndpoint(decodeEndpoint(input));
+
+          input.endMessage(token);
+          break;
+        }
+        case 9: {
+          long token = input.beginMessage();
+
+          span.remoteEndpoint(decodeEndpoint(input));
+
+          input.endMessage(token);
+          break;
+        }
+        case 10: {
+          long token = input.beginMessage();
+
+          decodeAnnotation(input, span);
+
+          input.endMessage(token);
+          break;
+        }
+        case 11: {
+          long token = input.beginMessage();
+
+          decodeTag(input, span);
+
+          input.endMessage(token);
+          break;
+        }
+        case 12: {
+          span.debug(ProtoAdapter.BOOL.decode(input));
+          break;
+        }
+        case 13: {
+          span.shared(ProtoAdapter.BOOL.decode(input));
+          break;
+        }
+        default: {
+          logAndSkip(input, tag);
+          break;
+        }
+      }
+    }
+
+    return span.build();
+  }
+
+  public static List<Span> decodeList(byte[] spans) {
+    return decodeList(new ProtoReader(new Buffer().write(spans)));
+  }
+
+  public static List<Span> decodeList(ByteBuffer spans) {
+    Buffer buffer = new Buffer();
+    try {
+      buffer.write(spans);
+    } catch (IOException e) {
+      throw new AssertionError(e); // no I/O
+    }
+    return decodeList(new ProtoReader(buffer));
+  }
+
+  public static List<Span> decodeList(ProtoReader input) {
+    ArrayList<Span> spans = new ArrayList<>();
+
+    final long token;
+    try {
+      token = input.beginMessage();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    try {
+      boolean done = false;
+      while (!done) {
+        int tag = input.nextTag();
+        switch (tag) {
+          case -1:
+            done = true;
+            break;
+          case 1: {
+            long subToken = input.beginMessage();
+
+            spans.add(decodeOne(input));
+
+            input.endMessage(subToken);
+            break;
+          }
+          default: {
+            logAndSkip(input, tag);
+            break;
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    try {
+      input.endMessage(token);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return spans;
+  }
+
+  private static String readHexString(ProtoReader input) throws IOException {
+    return input.readBytes().hex();
+  }
+
+  static void logAndSkip(ProtoReader input, int tag) throws IOException {
+    if (DEBUG) { // avoiding volatile reads as we don't log on skip in our normal codec
+      int nextWireType = WireFormat.getTagWireType(tag);
+      int nextFieldNumber = WireFormat.getTagFieldNumber(tag);
+      LOG.fine(String.format("Skipping field: byte=%s, fieldNumber=%s, wireType=%s",
+        0, nextFieldNumber, nextWireType));
+    }
+    input.skip();
+  }
+}