You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/15 12:44:33 UTC

[incubator-zipkin] branch master updated: Decode ByteBuffer directly where possible. (#2595)

This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git


The following commit(s) were added to refs/heads/master by this push:
     new a4d145d  Decode ByteBuffer directly where possible. (#2595)
a4d145d is described below

commit a4d145dfcb27e763d4593f6f5cb5a6d7df83c1be
Author: Anuraag Agrawal <an...@gmail.com>
AuthorDate: Wed May 15 21:44:27 2019 +0900

    Decode ByteBuffer directly where possible. (#2595)
    
    * Decode ByteBuffer directly where possible.
    
    * Move the empty check after the empty-JSON massaging, and don't count a dropped message twice (once before returning an exception and again when handling that exception).
    
    * Fix ByteBuffer wrapping.
---
 .../server/internal/ZipkinHttpCollector.java       | 137 +++++++++++++--------
 .../java/zipkin2/SpanBytesDecoderDetector.java     |  32 +++--
 2 files changed, 107 insertions(+), 62 deletions(-)

diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
index 897f63e..e894104 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
@@ -25,18 +25,20 @@ import com.linecorp.armeria.common.HttpResponse;
 import com.linecorp.armeria.common.HttpStatus;
 import com.linecorp.armeria.common.MediaType;
 import com.linecorp.armeria.common.RequestContext;
+import com.linecorp.armeria.common.ResponseHeaders;
 import com.linecorp.armeria.server.ServiceRequestContext;
 import com.linecorp.armeria.server.annotation.Consumes;
 import com.linecorp.armeria.server.annotation.ConsumesJson;
 import com.linecorp.armeria.server.annotation.ExceptionHandler;
 import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
 import com.linecorp.armeria.server.annotation.Post;
-import com.linecorp.armeria.server.annotation.RequestConverter;
-import com.linecorp.armeria.server.annotation.RequestConverterFunction;
+import io.netty.buffer.ByteBufHolder;
+import io.netty.util.ReferenceCountUtil;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -58,7 +60,6 @@ import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
 import static zipkin2.server.internal.BodyIsExceptionMessage.testForUnexpectedFormat;
 
 @ConditionalOnProperty(name = "zipkin.collector.http.enabled", matchIfMissing = true)
-@RequestConverter(UnzippingBytesRequestConverter.class)
 @ExceptionHandler(BodyIsExceptionMessage.class)
 public class ZipkinHttpCollector {
   static final Logger LOGGER = LogManager.getLogger();
@@ -74,66 +75,101 @@ public class ZipkinHttpCollector {
   }
 
   @Post("/api/v2/spans")
-  public HttpResponse uploadSpans(byte[] serializedSpans) {
-    return validateAndStoreSpans(SpanBytesDecoder.JSON_V2, serializedSpans);
+  public HttpResponse uploadSpans(ServiceRequestContext ctx, HttpRequest req) {
+    return validateAndStoreSpans(SpanBytesDecoder.JSON_V2, ctx, req);
   }
 
   @Post("/api/v2/spans")
   @ConsumesJson
-  public HttpResponse uploadSpansJson(byte[] serializedSpans) {
-    return validateAndStoreSpans(SpanBytesDecoder.JSON_V2, serializedSpans);
+  public HttpResponse uploadSpansJson(ServiceRequestContext ctx, HttpRequest req) {
+    return validateAndStoreSpans(SpanBytesDecoder.JSON_V2, ctx, req);
   }
 
   @Post("/api/v2/spans")
   @ConsumesProtobuf
-  public HttpResponse uploadSpansProtobuf(byte[] serializedSpans) {
-    return validateAndStoreSpans(SpanBytesDecoder.PROTO3, serializedSpans);
+  public HttpResponse uploadSpansProtobuf(ServiceRequestContext ctx, HttpRequest req) {
+    return validateAndStoreSpans(SpanBytesDecoder.PROTO3, ctx, req);
   }
 
   @Post("/api/v1/spans")
-  public HttpResponse uploadSpansV1(byte[] serializedSpans) {
-    return validateAndStoreSpans(SpanBytesDecoder.JSON_V1, serializedSpans);
+  public HttpResponse uploadSpansV1(ServiceRequestContext ctx, HttpRequest req) {
+    return validateAndStoreSpans(SpanBytesDecoder.JSON_V1, ctx, req);
   }
 
   @Post("/api/v1/spans")
   @ConsumesJson
-  public HttpResponse uploadSpansV1Json(byte[] serializedSpans) {
-    return validateAndStoreSpans(SpanBytesDecoder.JSON_V1, serializedSpans);
+  public HttpResponse uploadSpansV1Json(ServiceRequestContext ctx, HttpRequest req) {
+    return validateAndStoreSpans(SpanBytesDecoder.JSON_V1, ctx, req);
   }
 
   @Post("/api/v1/spans")
   @ConsumesThrift
-  public HttpResponse uploadSpansV1Thrift(byte[] serializedSpans) {
-    return validateAndStoreSpans(SpanBytesDecoder.THRIFT, serializedSpans);
+  public HttpResponse uploadSpansV1Thrift(ServiceRequestContext ctx, HttpRequest req) {
+    return validateAndStoreSpans(SpanBytesDecoder.THRIFT, ctx, req);
   }
 
   /** This synchronously decodes the message so that users can see data errors. */
-  HttpResponse validateAndStoreSpans(SpanBytesDecoder decoder, byte[] serializedSpans) {
-    // logging already handled upstream in UnzippingBytesRequestConverter where request context exists
-    if (serializedSpans.length == 0) return HttpResponse.of(HttpStatus.ACCEPTED);
-    try {
-      SpanBytesDecoderDetector.decoderForListMessage(serializedSpans);
-    } catch (IllegalArgumentException e) {
-      metrics.incrementMessagesDropped();
-      return HttpResponse.of(
-        BAD_REQUEST, MediaType.PLAIN_TEXT_UTF_8, "Expected a " + decoder + " encoded list\n");
-    }
+  HttpResponse validateAndStoreSpans(SpanBytesDecoder decoder, ServiceRequestContext ctx, HttpRequest req) {
+    CompletableCallback result = new CompletableCallback();
 
-    SpanBytesDecoder unexpectedDecoder = testForUnexpectedFormat(decoder, serializedSpans);
-    if (unexpectedDecoder != null) {
-      metrics.incrementMessagesDropped();
-      return HttpResponse.of(
-        BAD_REQUEST, MediaType.PLAIN_TEXT_UTF_8,
-        "Expected a " + decoder + " encoded list, but received: " + unexpectedDecoder + "\n");
-    }
+    req.aggregateWithPooledObjects(ctx.eventLoop(), ctx.alloc()).handle((msg, t) -> {
+      if (t != null) {
+        result.onError(t);
+        return null;
+      }
+
+      try {
+        final HttpData content;
+        try {
+          content = UnzippingBytesRequestConverter.convertRequest(ctx, msg);
+        } catch (IllegalArgumentException e) {
+          result.onError(e);
+          return null;
+        }
+
+        // logging already handled upstream in UnzippingBytesRequestConverter where request context exists
+        if (content.isEmpty()) {
+          result.onSuccess(null);
+          return null;
+        }
+
+        final ByteBuffer nioBuffer;
+        if (content instanceof ByteBufHolder) {
+          nioBuffer = ((ByteBufHolder) content).content().nioBuffer();
+        } else {
+          // Currently this will happen for gzip spans. Need to fix armeria's gzip decoder to allow
+          // returning pooled buffers on request.
+          nioBuffer = ByteBuffer.wrap(content.array(), content.offset(), content.length());
+        }
+
+        try {
+          SpanBytesDecoderDetector.decoderForListMessage(nioBuffer);
+        } catch (IllegalArgumentException e) {
+          result.onError(new IllegalArgumentException("Expected a " + decoder + " encoded list\n"));
+          return null;
+        }
+
+        SpanBytesDecoder unexpectedDecoder = testForUnexpectedFormat(decoder, nioBuffer);
+        if (unexpectedDecoder != null) {
+          result.onError(new IllegalArgumentException(
+            "Expected a " + decoder + " encoded list, but received: " + unexpectedDecoder + "\n"));
+          return null;
+        }
+
+        List<Span> spans = new ArrayList<>();
+        if (!decoder.decodeList(nioBuffer, spans)) {
+          result.onError(new IllegalArgumentException("Empty " + decoder.name() + " message"));
+          return null;
+        }
+        // UnzippingBytesRequestConverter handles incrementing message and bytes
+        collector.accept(spans, result);
+      } finally {
+        ReferenceCountUtil.release(msg.content());
+      }
+
+      return null;
+    });
 
-    CompletableCallback result = new CompletableCallback();
-    List<Span> spans = new ArrayList<>();
-    if (!decoder.decodeList(serializedSpans, spans)) {
-      throw new IllegalArgumentException("Empty " + decoder.name() + " message");
-    }
-    // UnzippingBytesRequestConverter handles incrementing message and bytes
-    collector.accept(spans, result);
     return HttpResponse.from(result);
   }
 
@@ -158,8 +194,10 @@ public class ZipkinHttpCollector {
 final class CompletableCallback extends CompletableFuture<HttpResponse>
   implements Callback<Void> {
 
+  static final ResponseHeaders ACCEPTED_RESPONSE = ResponseHeaders.of(HttpStatus.ACCEPTED);
+
   @Override public void onSuccess(Void value) {
-    complete(HttpResponse.of(HttpStatus.ACCEPTED));
+    complete(HttpResponse.of(ACCEPTED_RESPONSE));
   }
 
   @Override public void onError(Throwable t) {
@@ -167,11 +205,10 @@ final class CompletableCallback extends CompletableFuture<HttpResponse>
   }
 }
 
-final class UnzippingBytesRequestConverter implements RequestConverterFunction {
+final class UnzippingBytesRequestConverter {
   static final GzipStreamDecoderFactory GZIP_DECODER_FACTORY = new GzipStreamDecoderFactory();
 
-  @Override public Object convertRequest(ServiceRequestContext ctx, AggregatedHttpMessage request,
-    Class<?> expectedResultType) {
+  static HttpData convertRequest(ServiceRequestContext ctx, AggregatedHttpMessage request) {
     ZipkinHttpCollector.metrics.incrementMessages();
     String encoding = request.headers().get(HttpHeaderNames.CONTENT_ENCODING);
     HttpData content = request.content();
@@ -187,12 +224,12 @@ final class UnzippingBytesRequestConverter implements RequestConverterFunction {
     if (content.isEmpty()) ZipkinHttpCollector.maybeLog("Empty POST body", ctx, request);
     if (content.length() == 2 && "[]".equals(content.toStringAscii())) {
       ZipkinHttpCollector.maybeLog("Empty JSON list POST body", ctx, request);
+      ReferenceCountUtil.release(content);
       content = HttpData.EMPTY_DATA;
     }
 
-    byte[] result = content.array();
-    ZipkinHttpCollector.metrics.incrementBytes(result.length);
-    return result;
+    ZipkinHttpCollector.metrics.incrementBytes(content.length());
+    return content;
   }
 }
 
@@ -212,7 +249,7 @@ final class BodyIsExceptionMessage implements ExceptionHandlerFunction {
    * Some formats clash on partial data. For example, a v1 and v2 span is identical if only the span
    * name is sent. This looks for unexpected data format.
    */
-  static SpanBytesDecoder testForUnexpectedFormat(BytesDecoder<Span> decoder, byte[] body) {
+  static SpanBytesDecoder testForUnexpectedFormat(BytesDecoder<Span> decoder, ByteBuffer body) {
     if (decoder == SpanBytesDecoder.JSON_V2) {
       if (contains(body, BINARY_ANNOTATION_FIELD_SUFFIX)) {
         return SpanBytesDecoder.JSON_V1;
@@ -231,11 +268,11 @@ final class BodyIsExceptionMessage implements ExceptionHandlerFunction {
   static final byte[] ENDPOINT_FIELD_SUFFIX = {'E', 'n', 'd', 'p', 'o', 'i', 'n', 't', '"'};
   static final byte[] TAGS_FIELD = {'"', 't', 'a', 'g', 's', '"'};
 
-  static boolean contains(byte[] bytes, byte[] subsequence) {
+  static boolean contains(ByteBuffer bytes, byte[] subsequence) {
     bytes:
-    for (int i = 0; i < bytes.length - subsequence.length + 1; i++) {
+    for (int i = 0; i < bytes.remaining() - subsequence.length + 1; i++) {
       for (int j = 0; j < subsequence.length; j++) {
-        if (bytes[i + j] != subsequence[j]) {
+        if (bytes.get(bytes.position() + i + j) != subsequence[j]) {
           continue bytes;
         }
       }
diff --git a/zipkin/src/main/java/zipkin2/SpanBytesDecoderDetector.java b/zipkin/src/main/java/zipkin2/SpanBytesDecoderDetector.java
index 7a451a4..42e2aa3 100644
--- a/zipkin/src/main/java/zipkin2/SpanBytesDecoderDetector.java
+++ b/zipkin/src/main/java/zipkin2/SpanBytesDecoderDetector.java
@@ -16,6 +16,7 @@
  */
 package zipkin2;
 
+import java.nio.ByteBuffer;
 import zipkin2.codec.BytesDecoder;
 import zipkin2.codec.SpanBytesDecoder;
 
@@ -59,7 +60,7 @@ public final class SpanBytesDecoderDetector {
 
   /** @throws IllegalArgumentException if the input isn't a v1 json or thrift single-span message */
   public static BytesDecoder<Span> decoderForMessage(byte[] span) {
-    BytesDecoder<Span> decoder = detectDecoder(span);
+    BytesDecoder<Span> decoder = detectDecoder(ByteBuffer.wrap(span));
     if (span[0] == 12 /* List[ThriftSpan] */ || span[0] == '[') {
       throw new IllegalArgumentException("Expected json or thrift object, not list encoding");
     }
@@ -71,21 +72,27 @@ public final class SpanBytesDecoderDetector {
 
   /** @throws IllegalArgumentException if the input isn't a json, proto3 or thrift list message. */
   public static BytesDecoder<Span> decoderForListMessage(byte[] spans) {
+    return decoderForListMessage(ByteBuffer.wrap(spans));
+  }
+
+  public static BytesDecoder<Span> decoderForListMessage(ByteBuffer spans) {
     BytesDecoder<Span> decoder = detectDecoder(spans);
-    if (spans[0] != 12 /* List[ThriftSpan] */
-      && spans[0] != 11 /* apache/incubator-zipkin-reporter-java#133 */
-      && !protobuf3(spans) && spans[0] != '[') {
+    byte first = spans.get(spans.position());
+    if (first != 12 /* List[ThriftSpan] */
+      && first != 11 /* apache/incubator-zipkin-reporter-java#133 */
+      && !protobuf3(spans) && first != '[') {
       throw new IllegalArgumentException("Expected json, proto3 or thrift list encoding");
     }
     return decoder;
   }
 
   /** @throws IllegalArgumentException if the input isn't a json or thrift list or object. */
-  static BytesDecoder<Span> detectDecoder(byte[] bytes) {
-    if (bytes[0] <= 16) { // binary format
+  static BytesDecoder<Span> detectDecoder(ByteBuffer bytes) {
+    byte first = bytes.get(bytes.position());
+    if (first <= 16) { // binary format
       if (protobuf3(bytes)) return SpanBytesDecoder.PROTO3;
       return SpanBytesDecoder.THRIFT; /* the first byte is the TType, in a range 0-16 */
-    } else if (bytes[0] != '[' && bytes[0] != '{') {
+    } else if (first != '[' && first != '{') {
       throw new IllegalArgumentException("Could not detect the span format");
     }
     if (contains(bytes, ENDPOINT_FIELD_SUFFIX)) return SpanBytesDecoder.JSON_V2;
@@ -93,11 +100,11 @@ public final class SpanBytesDecoderDetector {
     return SpanBytesDecoder.JSON_V1;
   }
 
-  static boolean contains(byte[] bytes, byte[] subsequence) {
+  static boolean contains(ByteBuffer bytes, byte[] subsequence) {
     bytes:
-    for (int i = 0; i < bytes.length - subsequence.length + 1; i++) {
+    for (int i = 0; i < bytes.remaining() - subsequence.length + 1; i++) {
       for (int j = 0; j < subsequence.length; j++) {
-        if (bytes[i + j] != subsequence[j]) {
+        if (bytes.get(bytes.position() + i + j) != subsequence[j]) {
           continue bytes;
         }
       }
@@ -107,8 +114,9 @@ public final class SpanBytesDecoderDetector {
   }
 
   /* span key or trace ID key */
-  static boolean protobuf3(byte[] bytes) {
-    return bytes[0] == 10 && bytes[1] != 0; // varint follows and won't be zero
+  static boolean protobuf3(ByteBuffer bytes) {
+    // varint follows and won't be zero
+    return bytes.get(bytes.position()) == 10 && bytes.get(bytes.position() + 1) != 0;
   }
 
   SpanBytesDecoderDetector() {}