You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/15 12:44:33 UTC
[incubator-zipkin] branch master updated: Decode ByteBuffer
directly where possible. (#2595)
This is an automated email from the ASF dual-hosted git repository.
adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git
The following commit(s) were added to refs/heads/master by this push:
new a4d145d Decode ByteBuffer directly where possible. (#2595)
a4d145d is described below
commit a4d145dfcb27e763d4593f6f5cb5a6d7df83c1be
Author: Anuraag Agrawal <an...@gmail.com>
AuthorDate: Wed May 15 21:44:27 2019 +0900
Decode ByteBuffer directly where possible. (#2595)
* Decode ByteBuffer directly where possible.
* Move empty check after empty JSON massaging, and don't increment dropped messages both before returning an exception and when handling the exception.
* Fix bytebuffer wrapping.
---
.../server/internal/ZipkinHttpCollector.java | 137 +++++++++++++--------
.../java/zipkin2/SpanBytesDecoderDetector.java | 32 +++--
2 files changed, 107 insertions(+), 62 deletions(-)
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
index 897f63e..e894104 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
@@ -25,18 +25,20 @@ import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.RequestContext;
+import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.Consumes;
import com.linecorp.armeria.server.annotation.ConsumesJson;
import com.linecorp.armeria.server.annotation.ExceptionHandler;
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
import com.linecorp.armeria.server.annotation.Post;
-import com.linecorp.armeria.server.annotation.RequestConverter;
-import com.linecorp.armeria.server.annotation.RequestConverterFunction;
+import io.netty.buffer.ByteBufHolder;
+import io.netty.util.ReferenceCountUtil;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -58,7 +60,6 @@ import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
import static zipkin2.server.internal.BodyIsExceptionMessage.testForUnexpectedFormat;
@ConditionalOnProperty(name = "zipkin.collector.http.enabled", matchIfMissing = true)
-@RequestConverter(UnzippingBytesRequestConverter.class)
@ExceptionHandler(BodyIsExceptionMessage.class)
public class ZipkinHttpCollector {
static final Logger LOGGER = LogManager.getLogger();
@@ -74,66 +75,101 @@ public class ZipkinHttpCollector {
}
@Post("/api/v2/spans")
- public HttpResponse uploadSpans(byte[] serializedSpans) {
- return validateAndStoreSpans(SpanBytesDecoder.JSON_V2, serializedSpans);
+ public HttpResponse uploadSpans(ServiceRequestContext ctx, HttpRequest req) {
+ return validateAndStoreSpans(SpanBytesDecoder.JSON_V2, ctx, req);
}
@Post("/api/v2/spans")
@ConsumesJson
- public HttpResponse uploadSpansJson(byte[] serializedSpans) {
- return validateAndStoreSpans(SpanBytesDecoder.JSON_V2, serializedSpans);
+ public HttpResponse uploadSpansJson(ServiceRequestContext ctx, HttpRequest req) {
+ return validateAndStoreSpans(SpanBytesDecoder.JSON_V2, ctx, req);
}
@Post("/api/v2/spans")
@ConsumesProtobuf
- public HttpResponse uploadSpansProtobuf(byte[] serializedSpans) {
- return validateAndStoreSpans(SpanBytesDecoder.PROTO3, serializedSpans);
+ public HttpResponse uploadSpansProtobuf(ServiceRequestContext ctx, HttpRequest req) {
+ return validateAndStoreSpans(SpanBytesDecoder.PROTO3, ctx, req);
}
@Post("/api/v1/spans")
- public HttpResponse uploadSpansV1(byte[] serializedSpans) {
- return validateAndStoreSpans(SpanBytesDecoder.JSON_V1, serializedSpans);
+ public HttpResponse uploadSpansV1(ServiceRequestContext ctx, HttpRequest req) {
+ return validateAndStoreSpans(SpanBytesDecoder.JSON_V1, ctx, req);
}
@Post("/api/v1/spans")
@ConsumesJson
- public HttpResponse uploadSpansV1Json(byte[] serializedSpans) {
- return validateAndStoreSpans(SpanBytesDecoder.JSON_V1, serializedSpans);
+ public HttpResponse uploadSpansV1Json(ServiceRequestContext ctx, HttpRequest req) {
+ return validateAndStoreSpans(SpanBytesDecoder.JSON_V1, ctx, req);
}
@Post("/api/v1/spans")
@ConsumesThrift
- public HttpResponse uploadSpansV1Thrift(byte[] serializedSpans) {
- return validateAndStoreSpans(SpanBytesDecoder.THRIFT, serializedSpans);
+ public HttpResponse uploadSpansV1Thrift(ServiceRequestContext ctx, HttpRequest req) {
+ return validateAndStoreSpans(SpanBytesDecoder.THRIFT, ctx, req);
}
/** This synchronously decodes the message so that users can see data errors. */
- HttpResponse validateAndStoreSpans(SpanBytesDecoder decoder, byte[] serializedSpans) {
- // logging already handled upstream in UnzippingBytesRequestConverter where request context exists
- if (serializedSpans.length == 0) return HttpResponse.of(HttpStatus.ACCEPTED);
- try {
- SpanBytesDecoderDetector.decoderForListMessage(serializedSpans);
- } catch (IllegalArgumentException e) {
- metrics.incrementMessagesDropped();
- return HttpResponse.of(
- BAD_REQUEST, MediaType.PLAIN_TEXT_UTF_8, "Expected a " + decoder + " encoded list\n");
- }
+ HttpResponse validateAndStoreSpans(SpanBytesDecoder decoder, ServiceRequestContext ctx, HttpRequest req) {
+ CompletableCallback result = new CompletableCallback();
- SpanBytesDecoder unexpectedDecoder = testForUnexpectedFormat(decoder, serializedSpans);
- if (unexpectedDecoder != null) {
- metrics.incrementMessagesDropped();
- return HttpResponse.of(
- BAD_REQUEST, MediaType.PLAIN_TEXT_UTF_8,
- "Expected a " + decoder + " encoded list, but received: " + unexpectedDecoder + "\n");
- }
+ req.aggregateWithPooledObjects(ctx.eventLoop(), ctx.alloc()).handle((msg, t) -> {
+ if (t != null) {
+ result.onError(t);
+ return null;
+ }
+
+ try {
+ final HttpData content;
+ try {
+ content = UnzippingBytesRequestConverter.convertRequest(ctx, msg);
+ } catch (IllegalArgumentException e) {
+ result.onError(e);
+ return null;
+ }
+
+ // logging already handled upstream in UnzippingBytesRequestConverter where request context exists
+ if (content.isEmpty()) {
+ result.onSuccess(null);
+ return null;
+ }
+
+ final ByteBuffer nioBuffer;
+ if (content instanceof ByteBufHolder) {
+ nioBuffer = ((ByteBufHolder) content).content().nioBuffer();
+ } else {
+ // Currently this will happen for gzip spans. Need to fix armeria's gzip decoder to allow
+ // returning pooled buffers on request.
+ nioBuffer = ByteBuffer.wrap(content.array(), content.offset(), content.length());
+ }
+
+ try {
+ SpanBytesDecoderDetector.decoderForListMessage(nioBuffer);
+ } catch (IllegalArgumentException e) {
+ result.onError(new IllegalArgumentException("Expected a " + decoder + " encoded list\n"));
+ return null;
+ }
+
+ SpanBytesDecoder unexpectedDecoder = testForUnexpectedFormat(decoder, nioBuffer);
+ if (unexpectedDecoder != null) {
+ result.onError(new IllegalArgumentException(
+ "Expected a " + decoder + " encoded list, but received: " + unexpectedDecoder + "\n"));
+ return null;
+ }
+
+ List<Span> spans = new ArrayList<>();
+ if (!decoder.decodeList(nioBuffer, spans)) {
+ result.onError(new IllegalArgumentException("Empty " + decoder.name() + " message"));
+ return null;
+ }
+ // UnzippingBytesRequestConverter handles incrementing message and bytes
+ collector.accept(spans, result);
+ } finally {
+ ReferenceCountUtil.release(msg.content());
+ }
+
+ return null;
+ });
- CompletableCallback result = new CompletableCallback();
- List<Span> spans = new ArrayList<>();
- if (!decoder.decodeList(serializedSpans, spans)) {
- throw new IllegalArgumentException("Empty " + decoder.name() + " message");
- }
- // UnzippingBytesRequestConverter handles incrementing message and bytes
- collector.accept(spans, result);
return HttpResponse.from(result);
}
@@ -158,8 +194,10 @@ public class ZipkinHttpCollector {
final class CompletableCallback extends CompletableFuture<HttpResponse>
implements Callback<Void> {
+ static final ResponseHeaders ACCEPTED_RESPONSE = ResponseHeaders.of(HttpStatus.ACCEPTED);
+
@Override public void onSuccess(Void value) {
- complete(HttpResponse.of(HttpStatus.ACCEPTED));
+ complete(HttpResponse.of(ACCEPTED_RESPONSE));
}
@Override public void onError(Throwable t) {
@@ -167,11 +205,10 @@ final class CompletableCallback extends CompletableFuture<HttpResponse>
}
}
-final class UnzippingBytesRequestConverter implements RequestConverterFunction {
+final class UnzippingBytesRequestConverter {
static final GzipStreamDecoderFactory GZIP_DECODER_FACTORY = new GzipStreamDecoderFactory();
- @Override public Object convertRequest(ServiceRequestContext ctx, AggregatedHttpMessage request,
- Class<?> expectedResultType) {
+ static HttpData convertRequest(ServiceRequestContext ctx, AggregatedHttpMessage request) {
ZipkinHttpCollector.metrics.incrementMessages();
String encoding = request.headers().get(HttpHeaderNames.CONTENT_ENCODING);
HttpData content = request.content();
@@ -187,12 +224,12 @@ final class UnzippingBytesRequestConverter implements RequestConverterFunction {
if (content.isEmpty()) ZipkinHttpCollector.maybeLog("Empty POST body", ctx, request);
if (content.length() == 2 && "[]".equals(content.toStringAscii())) {
ZipkinHttpCollector.maybeLog("Empty JSON list POST body", ctx, request);
+ ReferenceCountUtil.release(content);
content = HttpData.EMPTY_DATA;
}
- byte[] result = content.array();
- ZipkinHttpCollector.metrics.incrementBytes(result.length);
- return result;
+ ZipkinHttpCollector.metrics.incrementBytes(content.length());
+ return content;
}
}
@@ -212,7 +249,7 @@ final class BodyIsExceptionMessage implements ExceptionHandlerFunction {
* Some formats clash on partial data. For example, a v1 and v2 span is identical if only the span
* name is sent. This looks for unexpected data format.
*/
- static SpanBytesDecoder testForUnexpectedFormat(BytesDecoder<Span> decoder, byte[] body) {
+ static SpanBytesDecoder testForUnexpectedFormat(BytesDecoder<Span> decoder, ByteBuffer body) {
if (decoder == SpanBytesDecoder.JSON_V2) {
if (contains(body, BINARY_ANNOTATION_FIELD_SUFFIX)) {
return SpanBytesDecoder.JSON_V1;
@@ -231,11 +268,11 @@ final class BodyIsExceptionMessage implements ExceptionHandlerFunction {
static final byte[] ENDPOINT_FIELD_SUFFIX = {'E', 'n', 'd', 'p', 'o', 'i', 'n', 't', '"'};
static final byte[] TAGS_FIELD = {'"', 't', 'a', 'g', 's', '"'};
- static boolean contains(byte[] bytes, byte[] subsequence) {
+ static boolean contains(ByteBuffer bytes, byte[] subsequence) {
bytes:
- for (int i = 0; i < bytes.length - subsequence.length + 1; i++) {
+ for (int i = 0; i < bytes.remaining() - subsequence.length + 1; i++) {
for (int j = 0; j < subsequence.length; j++) {
- if (bytes[i + j] != subsequence[j]) {
+ if (bytes.get(bytes.position() + i + j) != subsequence[j]) {
continue bytes;
}
}
diff --git a/zipkin/src/main/java/zipkin2/SpanBytesDecoderDetector.java b/zipkin/src/main/java/zipkin2/SpanBytesDecoderDetector.java
index 7a451a4..42e2aa3 100644
--- a/zipkin/src/main/java/zipkin2/SpanBytesDecoderDetector.java
+++ b/zipkin/src/main/java/zipkin2/SpanBytesDecoderDetector.java
@@ -16,6 +16,7 @@
*/
package zipkin2;
+import java.nio.ByteBuffer;
import zipkin2.codec.BytesDecoder;
import zipkin2.codec.SpanBytesDecoder;
@@ -59,7 +60,7 @@ public final class SpanBytesDecoderDetector {
/** @throws IllegalArgumentException if the input isn't a v1 json or thrift single-span message */
public static BytesDecoder<Span> decoderForMessage(byte[] span) {
- BytesDecoder<Span> decoder = detectDecoder(span);
+ BytesDecoder<Span> decoder = detectDecoder(ByteBuffer.wrap(span));
if (span[0] == 12 /* List[ThriftSpan] */ || span[0] == '[') {
throw new IllegalArgumentException("Expected json or thrift object, not list encoding");
}
@@ -71,21 +72,27 @@ public final class SpanBytesDecoderDetector {
/** @throws IllegalArgumentException if the input isn't a json, proto3 or thrift list message. */
public static BytesDecoder<Span> decoderForListMessage(byte[] spans) {
+ return decoderForListMessage(ByteBuffer.wrap(spans));
+ }
+
+ public static BytesDecoder<Span> decoderForListMessage(ByteBuffer spans) {
BytesDecoder<Span> decoder = detectDecoder(spans);
- if (spans[0] != 12 /* List[ThriftSpan] */
- && spans[0] != 11 /* apache/incubator-zipkin-reporter-java#133 */
- && !protobuf3(spans) && spans[0] != '[') {
+ byte first = spans.get(spans.position());
+ if (first != 12 /* List[ThriftSpan] */
+ && first != 11 /* apache/incubator-zipkin-reporter-java#133 */
+ && !protobuf3(spans) && first != '[') {
throw new IllegalArgumentException("Expected json, proto3 or thrift list encoding");
}
return decoder;
}
/** @throws IllegalArgumentException if the input isn't a json or thrift list or object. */
- static BytesDecoder<Span> detectDecoder(byte[] bytes) {
- if (bytes[0] <= 16) { // binary format
+ static BytesDecoder<Span> detectDecoder(ByteBuffer bytes) {
+ byte first = bytes.get(bytes.position());
+ if (first <= 16) { // binary format
if (protobuf3(bytes)) return SpanBytesDecoder.PROTO3;
return SpanBytesDecoder.THRIFT; /* the first byte is the TType, in a range 0-16 */
- } else if (bytes[0] != '[' && bytes[0] != '{') {
+ } else if (first != '[' && first != '{') {
throw new IllegalArgumentException("Could not detect the span format");
}
if (contains(bytes, ENDPOINT_FIELD_SUFFIX)) return SpanBytesDecoder.JSON_V2;
@@ -93,11 +100,11 @@ public final class SpanBytesDecoderDetector {
return SpanBytesDecoder.JSON_V1;
}
- static boolean contains(byte[] bytes, byte[] subsequence) {
+ static boolean contains(ByteBuffer bytes, byte[] subsequence) {
bytes:
- for (int i = 0; i < bytes.length - subsequence.length + 1; i++) {
+ for (int i = 0; i < bytes.remaining() - subsequence.length + 1; i++) {
for (int j = 0; j < subsequence.length; j++) {
- if (bytes[i + j] != subsequence[j]) {
+ if (bytes.get(bytes.position() + i + j) != subsequence[j]) {
continue bytes;
}
}
@@ -107,8 +114,9 @@ public final class SpanBytesDecoderDetector {
}
/* span key or trace ID key */
- static boolean protobuf3(byte[] bytes) {
- return bytes[0] == 10 && bytes[1] != 0; // varint follows and won't be zero
+ static boolean protobuf3(ByteBuffer bytes) {
+ // varint follows and won't be zero
+ return bytes.get(bytes.position()) == 10 && bytes.get(bytes.position() + 1) != 0;
}
SpanBytesDecoderDetector() {}