You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/12 02:42:56 UTC
[incubator-zipkin] 01/01: Consolidates buffers and generally
improves string decoding
This is an automated email from the ASF dual-hosted git repository.
adriancole pushed a commit to branch more-efficient-strings
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git
commit a6031815427a391613ca792a29b2af685a6fc845
Author: Adrian Cole <ac...@pivotal.io>
AuthorDate: Sun May 12 10:42:39 2019 +0800
Consolidates buffers and generally improves string decoding
---
.../src/main/java/zipkin2/SpanBenchmarks.java | 30 ++++++
.../main/java/zipkin2/codec/CodecBenchmarks.java | 105 ++++++++-------------
.../java/zipkin2/codec/ProtobufSpanDecoder.java | 2 +-
.../main/java/zipkin2/codec/WireSpanDecoder.java | 2 +-
.../storage/cassandra/v1/CassandraUtil.java | 25 ++---
.../zipkin2/storage/cassandra/CassandraUtil.java | 30 +++---
.../elasticsearch/ElasticsearchSpanConsumer.java | 4 +-
.../elasticsearch/VersionSpecificTemplates.java | 4 +-
.../elasticsearch/internal/BulkCallBuilder.java | 1 -
.../elasticsearch/internal/BulkIndexWriter.java | 6 +-
zipkin/src/main/java/zipkin2/Endpoint.java | 16 +---
zipkin/src/main/java/zipkin2/Span.java | 6 +-
.../src/main/java/zipkin2/internal/Platform.java | 24 ++---
.../main/java/zipkin2/internal/Proto3Codec.java | 10 +-
.../main/java/zipkin2/internal/Proto3Fields.java | 11 ++-
.../java/zipkin2/internal/Proto3ZipkinFields.java | 4 +-
.../main/java/zipkin2/internal/ThriftCodec.java | 49 ++++++++--
.../java/zipkin2/internal/ThriftEndpointCodec.java | 3 +
.../main/java/zipkin2/internal/UnsafeBuffer.java | 18 +++-
.../java/zipkin2/internal/V1ThriftSpanReader.java | 21 +++--
.../java/zipkin2/codec/SpanBytesDecoderTest.java | 28 ++++--
.../java/zipkin2/codec/V1SpanBytesDecoderTest.java | 27 +++++-
.../java/zipkin2/internal/Proto3FieldsTest.java | 2 +-
23 files changed, 262 insertions(+), 166 deletions(-)
diff --git a/benchmarks/src/main/java/zipkin2/SpanBenchmarks.java b/benchmarks/src/main/java/zipkin2/SpanBenchmarks.java
index 40eed40..a30a1d2 100644
--- a/benchmarks/src/main/java/zipkin2/SpanBenchmarks.java
+++ b/benchmarks/src/main/java/zipkin2/SpanBenchmarks.java
@@ -16,6 +16,10 @@
*/
package zipkin2;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -46,6 +50,7 @@ public class SpanBenchmarks {
Endpoint.newBuilder().serviceName("frontend").ip("127.0.0.1").build();
static final Endpoint BACKEND =
Endpoint.newBuilder().serviceName("backend").ip("192.168.99.101").port(9000).build();
+ static final Span clientSpan = buildClientSpan(Span.newBuilder());
final Span.Builder sharedBuilder;
@@ -113,6 +118,31 @@ public class SpanBenchmarks {
return sharedBuilder.clone().build();
}
+ static final Kryo kryo = new Kryo();
+ static final byte[] clientSpanSerialized;
+
+ static {
+ kryo.register(Span.class, new JavaSerializer());
+ Output output = new Output(4096);
+ kryo.writeObject(output, clientSpan);
+ output.flush();
+ clientSpanSerialized = output.toBytes(); // only the bytes written, not the 4096-byte buffer
+ }
+
+ /** Java serialization of Span is manually implemented with json so not as slow as normal java */
+ @Benchmark
+ public byte[] serialize_kryo() {
+ Output output = new Output(clientSpanSerialized.length);
+ kryo.writeObject(output, clientSpan);
+ output.flush();
+ return output.getBuffer();
+ }
+
+ @Benchmark
+ public Span deserialize_kryo() {
+ return kryo.readObject(new Input(clientSpanSerialized), Span.class);
+ }
+
// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
diff --git a/benchmarks/src/main/java/zipkin2/codec/CodecBenchmarks.java b/benchmarks/src/main/java/zipkin2/codec/CodecBenchmarks.java
index 7591b88..b808ffc 100644
--- a/benchmarks/src/main/java/zipkin2/codec/CodecBenchmarks.java
+++ b/benchmarks/src/main/java/zipkin2/codec/CodecBenchmarks.java
@@ -16,10 +16,6 @@
*/
package zipkin2.codec;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.util.Collections;
@@ -61,137 +57,112 @@ import zipkin2.Span;
public class CodecBenchmarks {
static final byte[] clientSpanJsonV2 = read("/zipkin2-client.json");
static final Span clientSpan = SpanBytesDecoder.JSON_V2.decodeOne(clientSpanJsonV2);
+ static final byte[] clientSpanJsonV1 = SpanBytesEncoder.JSON_V1.encode(clientSpan);
static final byte[] clientSpanProto3 = SpanBytesEncoder.PROTO3.encode(clientSpan);
- static final zipkin2.proto3.Span clientSpan_wire;
+ static final byte[] clientSpanThrift = SpanBytesEncoder.THRIFT.encode(clientSpan);
static final List<Span> tenClientSpans = Collections.nCopies(10, clientSpan);
static final byte[] tenClientSpansJsonV2 = SpanBytesEncoder.JSON_V2.encodeList(tenClientSpans);
- static final Kryo kryo = new Kryo();
- static final byte[] clientSpanSerialized;
-
- static {
- kryo.register(Span.class, new JavaSerializer());
- Output output = new Output(4096);
- kryo.writeObject(output, clientSpan);
- output.flush();
- clientSpanSerialized = output.getBuffer();
- try {
- clientSpan_wire = zipkin2.proto3.Span.ADAPTER.decode(clientSpanProto3);
- } catch (IOException e) {
- throw new AssertionError(e);
- }
- }
-
- /** manually implemented with json so not as slow as normal java */
- @Benchmark
- public Span readClientSpan_kryo() {
- return kryo.readObject(new Input(clientSpanSerialized), Span.class);
- }
@Benchmark
- public byte[] writeClientSpan_kryo() {
- Output output = new Output(clientSpanSerialized.length);
- kryo.writeObject(output, clientSpan);
- output.flush();
- return output.getBuffer();
+ public Span readClientSpan_JSON_V1() {
+ return SpanBytesDecoder.JSON_V1.decodeOne(clientSpanJsonV1);
}
@Benchmark
- public Span readClientSpan_json() {
+ public Span readClientSpan_JSON_V2() {
return SpanBytesDecoder.JSON_V2.decodeOne(clientSpanJsonV2);
}
@Benchmark
- public Span readClientSpan_proto3() {
+ public Span readClientSpan_PROTO3() {
return SpanBytesDecoder.PROTO3.decodeOne(clientSpanProto3);
}
@Benchmark
- public zipkin2.proto3.Span readClientSpan_proto3_wire() throws Exception {
- return zipkin2.proto3.Span.ADAPTER.decode(clientSpanProto3);
+ public Span readClientSpan_THRIFT() {
+ return SpanBytesDecoder.THRIFT.decodeOne(clientSpanThrift);
}
@Benchmark
- public List<Span> readTenClientSpans_json() {
- return SpanBytesDecoder.JSON_V2.decodeList(tenClientSpansJsonV2);
- }
-
- @Benchmark
- public byte[] writeClientSpan_json() {
+ public byte[] writeClientSpan_JSON_V2() {
return SpanBytesEncoder.JSON_V2.encode(clientSpan);
}
@Benchmark
- public byte[] writeTenClientSpans_json() {
- return SpanBytesEncoder.JSON_V2.encodeList(tenClientSpans);
+ public byte[] writeClientSpan_JSON_V1() {
+ return SpanBytesEncoder.JSON_V1.encode(clientSpan);
}
@Benchmark
- public byte[] writeClientSpan_json_v1() {
- return SpanBytesEncoder.JSON_V1.encode(clientSpan);
+ public byte[] writeClientSpan_PROTO3() {
+ return SpanBytesEncoder.PROTO3.encode(clientSpan);
}
@Benchmark
- public byte[] writeTenClientSpans_json_v1() {
- return SpanBytesEncoder.JSON_V1.encodeList(tenClientSpans);
+ public byte[] writeClientSpan_THRIFT() {
+ return SpanBytesEncoder.THRIFT.encode(clientSpan);
}
@Benchmark
- public byte[] writeClientSpan_proto3() {
- return SpanBytesEncoder.PROTO3.encode(clientSpan);
+ public List<Span> readTenClientSpans_JSON_V2() {
+ return SpanBytesDecoder.JSON_V2.decodeList(tenClientSpansJsonV2);
}
@Benchmark
- public byte[] writeClientSpan_proto3_wire() {
- return clientSpan_wire.encode();
+ public byte[] writeTenClientSpans_JSON_V2() {
+ return SpanBytesEncoder.JSON_V2.encodeList(tenClientSpans);
}
static final byte[] chineseSpanJsonV2 = read("/zipkin2-chinese.json");
static final Span chineseSpan = SpanBytesDecoder.JSON_V2.decodeOne(chineseSpanJsonV2);
- static final zipkin2.proto3.Span chineseSpan_wire;
static final byte[] chineseSpanProto3 = SpanBytesEncoder.PROTO3.encode(chineseSpan);
+ static final byte[] chineseSpanJsonV1 = SpanBytesEncoder.JSON_V1.encode(chineseSpan);
+ static final byte[] chineseSpanThrift = SpanBytesEncoder.THRIFT.encode(chineseSpan);
- static {
- try {
- chineseSpan_wire = zipkin2.proto3.Span.ADAPTER.decode(chineseSpanProto3);
- } catch (IOException e) {
- throw new AssertionError(e);
- }
+ @Benchmark
+ public Span readChineseSpan_JSON_V1() {
+ return SpanBytesDecoder.JSON_V1.decodeOne(chineseSpanJsonV1);
}
@Benchmark
- public Span readChineseSpan_json() {
+ public Span readChineseSpan_JSON_V2() {
return SpanBytesDecoder.JSON_V2.decodeOne(chineseSpanJsonV2);
}
@Benchmark
- public Span readChineseSpan_proto3() {
+ public Span readChineseSpan_PROTO3() {
return SpanBytesDecoder.PROTO3.decodeOne(chineseSpanProto3);
}
@Benchmark
- public zipkin2.proto3.Span readChineseSpan_proto3_wire() throws Exception {
- return zipkin2.proto3.Span.ADAPTER.decode(chineseSpanProto3);
+ public Span readChineseSpan_THRIFT() {
+ return SpanBytesDecoder.THRIFT.decodeOne(chineseSpanThrift);
}
@Benchmark
- public byte[] writeChineseSpan_json() {
+ public byte[] writeChineseSpan_JSON_V2() {
return SpanBytesEncoder.JSON_V2.encode(chineseSpan);
}
@Benchmark
- public byte[] writeChineseSpan_proto3() {
+ public byte[] writeChineseSpan_JSON_V1() {
+ return SpanBytesEncoder.JSON_V1.encode(chineseSpan);
+ }
+
+ @Benchmark
+ public byte[] writeChineseSpan_PROTO3() {
return SpanBytesEncoder.PROTO3.encode(chineseSpan);
}
@Benchmark
- public byte[] writeChineseSpan_proto3_wire() {
- return chineseSpan_wire.encode();
+ public byte[] writeChineseSpan_THRIFT() {
+ return SpanBytesEncoder.THRIFT.encode(chineseSpan);
}
// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
- .include(".*" + CodecBenchmarks.class.getSimpleName())
+ .include(".*" + CodecBenchmarks.class.getSimpleName() +".*read.*Span_.*")
.addProfiler("gc")
.build();
diff --git a/benchmarks/src/main/java/zipkin2/codec/ProtobufSpanDecoder.java b/benchmarks/src/main/java/zipkin2/codec/ProtobufSpanDecoder.java
index 0df0d03..e53b8f8 100644
--- a/benchmarks/src/main/java/zipkin2/codec/ProtobufSpanDecoder.java
+++ b/benchmarks/src/main/java/zipkin2/codec/ProtobufSpanDecoder.java
@@ -279,7 +279,7 @@ public class ProtobufSpanDecoder {
throw new AssertionError("hex field greater than 32 chars long: " + length);
}
- char[] result = Platform.get().idBuffer();
+ char[] result = Platform.shortStringBuffer();
for (int i = 0; i < length; i += 2) {
byte b = input.readRawByte();
diff --git a/benchmarks/src/main/java/zipkin2/codec/WireSpanDecoder.java b/benchmarks/src/main/java/zipkin2/codec/WireSpanDecoder.java
index d5b1f84..52a5f8f 100644
--- a/benchmarks/src/main/java/zipkin2/codec/WireSpanDecoder.java
+++ b/benchmarks/src/main/java/zipkin2/codec/WireSpanDecoder.java
@@ -294,7 +294,7 @@ public class WireSpanDecoder {
throw new AssertionError("hex field greater than 32 chars long: " + length);
}
- char[] result = Platform.get().idBuffer();
+ char[] result = Platform.shortStringBuffer();
for (int i = 0; i < bytes.size(); i ++) {
byte b = bytes.getByte(i);
diff --git a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraUtil.java b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraUtil.java
index d051f63..4510e71 100644
--- a/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraUtil.java
+++ b/zipkin-storage/cassandra-v1/src/main/java/zipkin2/storage/cassandra/v1/CassandraUtil.java
@@ -37,9 +37,12 @@ import zipkin2.Annotation;
import zipkin2.Call;
import zipkin2.Span;
import zipkin2.internal.Nullable;
+import zipkin2.internal.Platform;
+import zipkin2.internal.UnsafeBuffer;
import zipkin2.storage.QueryRequest;
import static com.google.common.base.Preconditions.checkArgument;
+import static zipkin2.internal.Platform.SHORT_STRING_LENGTH;
final class CassandraUtil {
static final Charset UTF_8 = Charset.forName("UTF-8");
@@ -47,13 +50,6 @@ final class CassandraUtil {
static final List<String> CORE_ANNOTATIONS =
ImmutableList.of("cs", "cr", "ss", "sr", "ms", "mr", "ws", "wr");
- /**
- * Zipkin's {@link QueryRequest#annotationQuery()} are equals match. Not all tags are lookup keys.
- * For example, sql query isn't something that is likely to be looked up by value and indexing
- * that could add a potentially kilobyte partition key on {@link Tables#ANNOTATIONS_INDEX}
- */
- static final int LONGEST_VALUE_TO_INDEX = 256;
-
private static final ThreadLocal<CharsetEncoder> UTF8_ENCODER =
new ThreadLocal<CharsetEncoder>() {
@Override
@@ -73,6 +69,11 @@ final class CassandraUtil {
/**
* Returns keys that concatenate the serviceName associated with an annotation or tag.
*
+ * <p>Values over {@link Platform#SHORT_STRING_LENGTH} are not considered. Zipkin's {@link
+ * QueryRequest#annotationQuery()} are equals match. Not all values are lookup values. For
+ * example, {@code sql.query} isn't something that is likely to be looked up by value and indexing
+ * that could add a potentially kilobyte partition key on {@link Tables#ANNOTATIONS_INDEX}
+ *
* @see QueryRequest#annotationQuery()
*/
static Set<String> annotationKeys(Span span) {
@@ -80,15 +81,17 @@ final class CassandraUtil {
String localServiceName = span.localServiceName();
if (localServiceName == null) return Collections.emptySet();
for (Annotation a : span.annotations()) {
+ if (a.value().length() > SHORT_STRING_LENGTH) continue;
+
// don't index core annotations as they aren't queryable
if (CORE_ANNOTATIONS.contains(a.value())) continue;
annotationKeys.add(localServiceName + ":" + a.value());
}
for (Map.Entry<String, String> e : span.tags().entrySet()) {
- if (e.getValue().length() <= LONGEST_VALUE_TO_INDEX) {
- annotationKeys.add(localServiceName + ":" + e.getKey());
- annotationKeys.add(localServiceName + ":" + e.getKey() + ":" + e.getValue());
- }
+ if (e.getValue().length() > SHORT_STRING_LENGTH) continue;
+
+ annotationKeys.add(localServiceName + ":" + e.getKey());
+ annotationKeys.add(localServiceName + ":" + e.getKey() + ":" + e.getValue());
}
return annotationKeys;
}
diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraUtil.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraUtil.java
index af8b19c..71de159 100644
--- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraUtil.java
+++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraUtil.java
@@ -33,23 +33,18 @@ import zipkin2.Call;
import zipkin2.Span;
import zipkin2.internal.DateUtil;
import zipkin2.internal.Nullable;
+import zipkin2.internal.Platform;
import zipkin2.storage.QueryRequest;
-final class CassandraUtil {
- /**
- * Zipkin's {@link QueryRequest#annotationQuery()} are equals match. Not all tag serviceSpanKeys
- * are lookup serviceSpanKeys. For example, {@code sql.query} isn't something that is likely to be
- * looked up by value and indexing that could add a potentially kilobyte partition key on {@link
- * Schema#TABLE_SPAN}
- */
- static final int LONGEST_VALUE_TO_INDEX = 256;
+import static zipkin2.internal.Platform.SHORT_STRING_LENGTH;
+final class CassandraUtil {
/**
* Time window covered by a single bucket of the {@link Schema#TABLE_TRACE_BY_SERVICE_SPAN} and
* {@link Schema#TABLE_TRACE_BY_SERVICE_REMOTE_SERVICE}, in seconds. Default: 1 day
*/
private static final long DURATION_INDEX_BUCKET_WINDOW_SECONDS =
- Long.getLong("zipkin.store.cassandra.internal.durationIndexBucket", 24 * 60 * 60);
+ Long.getLong("zipkin.store.cassandra.internal.durationIndexBucket", 24 * 60 * 60);
public static int durationIndexBucket(long ts_micro) {
// if the window constant has microsecond precision, the division produces negative getValues
@@ -59,6 +54,11 @@ final class CassandraUtil {
/**
* Returns a set of annotation getValues and tags joined on equals, delimited by ░
*
+ * <p>Values over {@link Platform#SHORT_STRING_LENGTH} are not considered. Zipkin's {@link
+ * QueryRequest#annotationQuery()} are equals match. Not all values are lookup values. For
+ * example, {@code sql.query} isn't something that is likely to be looked up by value and indexing
+ * that could add a potentially kilobyte partition key on {@link Schema#TABLE_SPAN}
+ *
* @see QueryRequest#annotationQuery()
*/
static @Nullable String annotationQuery(Span span) {
@@ -67,16 +67,16 @@ final class CassandraUtil {
char delimiter = '░'; // as very unlikely to be in the query
StringBuilder result = new StringBuilder().append(delimiter);
for (Annotation a : span.annotations()) {
- if (a.value().length() > LONGEST_VALUE_TO_INDEX) continue;
+ if (a.value().length() > SHORT_STRING_LENGTH) continue;
result.append(a.value()).append(delimiter);
}
for (Map.Entry<String, String> tag : span.tags().entrySet()) {
- if (tag.getValue().length() > LONGEST_VALUE_TO_INDEX) continue;
+ if (tag.getValue().length() > SHORT_STRING_LENGTH) continue;
result.append(tag.getKey()).append(delimiter); // search is possible by key alone
- result.append(tag.getKey() + "=" + tag.getValue()).append(delimiter);
+ result.append(tag.getKey()).append('=').append(tag.getValue()).append(delimiter);
}
return result.length() == 1 ? null : result.toString();
}
@@ -107,9 +107,9 @@ final class CassandraUtil {
SortedMap<BigInteger, String> sorted = new TreeMap<>(Collections.reverseOrder());
for (Map.Entry<String, Long> entry : map.entrySet()) {
BigInteger uncollided =
- BigInteger.valueOf(entry.getValue())
- .multiply(OFFSET)
- .add(BigInteger.valueOf(RAND.nextInt() & Integer.MAX_VALUE));
+ BigInteger.valueOf(entry.getValue())
+ .multiply(OFFSET)
+ .add(BigInteger.valueOf(RAND.nextInt() & Integer.MAX_VALUE));
sorted.put(uncollided, entry.getKey());
}
return new LinkedHashSet<>(sorted.values());
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java
index cf2db5d..4667327 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java
@@ -31,7 +31,7 @@ import zipkin2.storage.SpanConsumer;
import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN;
-import static zipkin2.elasticsearch.internal.BulkCallBuilder.INDEX_CHARS_LIMIT;
+import static zipkin2.internal.Platform.SHORT_STRING_LENGTH;
class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testing
@@ -107,7 +107,7 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
String idx = consumer.formatTypeAndTimestampForInsert(AUTOCOMPLETE, indexTimestamp);
for (Map.Entry<String, String> tag : span.tags().entrySet()) {
int length = tag.getKey().length() + tag.getValue().length() + 1;
- if (length > INDEX_CHARS_LIMIT) continue;
+ if (length > SHORT_STRING_LENGTH) continue;
// If the autocomplete whitelist doesn't contain the key, skip storing its value
if (!consumer.autocompleteKeys.contains(tag.getKey())) continue;
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/VersionSpecificTemplates.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/VersionSpecificTemplates.java
index f9e6bc9..0e401dd 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/VersionSpecificTemplates.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/VersionSpecificTemplates.java
@@ -26,6 +26,7 @@ import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
import static zipkin2.elasticsearch.ElasticsearchSpanStore.DEPENDENCY;
import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN;
import static zipkin2.elasticsearch.internal.JsonReaders.enterPath;
+import static zipkin2.internal.Platform.SHORT_STRING_LENGTH;
/** Returns a version-specific span and dependency index template */
final class VersionSpecificTemplates {
@@ -109,7 +110,8 @@ final class VersionSpecificTemplates {
+ " {\n"
+ " \"strings\": {\n"
+ " \"mapping\": {\n"
- + " \"type\": \"keyword\",\"norms\": false, \"ignore_above\": 256\n"
+ + " \"type\": \"keyword\",\"norms\": false,"
+ + " \"ignore_above\": " + SHORT_STRING_LENGTH + "\n"
+ " },\n"
+ " \"match_mapping_type\": \"string\",\n"
+ " \"match\": \"*\"\n"
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
index 2361907..6c6e03e 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
@@ -35,7 +35,6 @@ import zipkin2.elasticsearch.internal.client.HttpCall;
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
// exposed to re-use for testing writes of dependency links
public final class BulkCallBuilder {
- public static final int INDEX_CHARS_LIMIT = 256;
static final MediaType APPLICATION_JSON = MediaType.parse("application/json");
final String tag;
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexWriter.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexWriter.java
index e9fd409..d037ef1 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexWriter.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexWriter.java
@@ -28,7 +28,7 @@ import zipkin2.Annotation;
import zipkin2.Endpoint;
import zipkin2.Span;
-import static zipkin2.elasticsearch.internal.BulkCallBuilder.INDEX_CHARS_LIMIT;
+import static zipkin2.internal.Platform.SHORT_STRING_LENGTH;
public abstract class BulkIndexWriter<T> {
@@ -163,12 +163,12 @@ public abstract class BulkIndexWriter<T> {
writer.name("_q");
writer.beginArray();
for (Annotation a : span.annotations()) {
- if (a.value().length() > INDEX_CHARS_LIMIT) continue;
+ if (a.value().length() > SHORT_STRING_LENGTH) continue;
writer.value(a.value());
}
for (Map.Entry<String, String> tag : span.tags().entrySet()) {
int length = tag.getKey().length() + tag.getValue().length() + 1;
- if (length > INDEX_CHARS_LIMIT) continue;
+ if (length > SHORT_STRING_LENGTH) continue;
writer.value(tag.getKey()); // search is possible by key alone
writer.value(tag.getKey() + "=" + tag.getValue());
}
diff --git a/zipkin/src/main/java/zipkin2/Endpoint.java b/zipkin/src/main/java/zipkin2/Endpoint.java
index be6a214..9684541 100644
--- a/zipkin/src/main/java/zipkin2/Endpoint.java
+++ b/zipkin/src/main/java/zipkin2/Endpoint.java
@@ -25,6 +25,7 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Locale;
import zipkin2.internal.Nullable;
+import zipkin2.internal.Platform;
import static zipkin2.internal.UnsafeBuffer.HEX_DIGITS;
@@ -198,7 +199,7 @@ public final class Endpoint implements Serializable { // for Spark and Flink job
}
static String writeIpV4(byte[] ipBytes) {
- char[] buf = ipBuffer();
+ char[] buf = Platform.shortStringBuffer();
int pos = 0;
pos = writeBackwards(ipBytes[0] & 0xff, pos, buf);
buf[pos++] = '.';
@@ -368,20 +369,9 @@ public final class Endpoint implements Serializable { // for Spark and Flink job
return (c < '0' || c > '9') && (c < 'a' || c > 'f') && (c < 'A' || c > 'F');
}
- static final ThreadLocal<char[]> IP_BUFFER = new ThreadLocal<>();
-
- static char[] ipBuffer() {
- char[] ipBuffer = IP_BUFFER.get();
- if (ipBuffer == null) {
- ipBuffer = new char[39]; // maximum length of encoded ipv6
- IP_BUFFER.set(ipBuffer);
- }
- return ipBuffer;
- }
-
static String writeIpV6(byte[] ipv6) {
int pos = 0;
- char[] buf = ipBuffer();
+ char[] buf = Platform.shortStringBuffer();
// Compress the longest string of zeros
int zeroCompressionIndex = -1;
diff --git a/zipkin/src/main/java/zipkin2/Span.java b/zipkin/src/main/java/zipkin2/Span.java
index 1f07e63..6ea685d 100644
--- a/zipkin/src/main/java/zipkin2/Span.java
+++ b/zipkin/src/main/java/zipkin2/Span.java
@@ -416,7 +416,7 @@ public final class Span implements Serializable { // for Spark and Flink jobs
*/
public Builder traceId(long high, long low) {
if (high == 0L && low == 0L) throw new IllegalArgumentException("empty trace ID");
- char[] data = Platform.get().idBuffer();
+ char[] data = Platform.shortStringBuffer();
int pos = 0;
if (high != 0L) {
writeHexLong(data, pos, high);
@@ -660,7 +660,7 @@ public final class Span implements Serializable { // for Spark and Flink jobs
}
static String padLeft(String id, int desiredLength) {
- char[] data = Platform.get().idBuffer();
+ char[] data = Platform.shortStringBuffer();
int i = 0, length = id.length(), offset = desiredLength - length;
for (; i < offset; i++) data[i] = '0';
for (int j = 0; j < length; j++) data[i++] = id.charAt(j);
@@ -668,7 +668,7 @@ public final class Span implements Serializable { // for Spark and Flink jobs
}
static String toLowerHex(long v) {
- char[] data = Platform.get().idBuffer();
+ char[] data = Platform.shortStringBuffer();
writeHexLong(data, 0, v);
return new String(data, 0, 16);
}
diff --git a/zipkin/src/main/java/zipkin2/internal/Platform.java b/zipkin/src/main/java/zipkin2/internal/Platform.java
index 6e10fb8..c64faa0 100644
--- a/zipkin/src/main/java/zipkin2/internal/Platform.java
+++ b/zipkin/src/main/java/zipkin2/internal/Platform.java
@@ -27,23 +27,25 @@ import org.jvnet.animal_sniffer.IgnoreJRERequirement;
public abstract class Platform {
private static final Platform PLATFORM = findPlatform();
- private static final ThreadLocal<char[]> ID_BUFFER = new ThreadLocal<>();
-
Platform() {
}
+ static final ThreadLocal<char[]> SHORT_STRING_BUFFER = new ThreadLocal<>();
+ /** Maximum character length constraint of most names, IP literals and IDs. */
+ public static final int SHORT_STRING_LENGTH = 256;
+
/**
- * Returns a {@link ThreadLocal} reused {@code char[]} for use when decoding bytes into a hex
- * string. The buffer should be immediately copied into a {@link String} after decoding within the
- * same method.
+ * Returns a {@link ThreadLocal} reused {@code char[]} for use when decoding bytes into hex, IP
+ * literals, or {@link #SHORT_STRING_LENGTH short strings}. The buffer must never be leaked
+ * outside the method. Most will {@link String#String(char[], int, int) copy it into a string}.
*/
- public char[] idBuffer() {
- char[] idBuffer = ID_BUFFER.get();
- if (idBuffer == null) {
- idBuffer = new char[32];
- ID_BUFFER.set(idBuffer);
+ public static char[] shortStringBuffer() {
+ char[] shortStringBuffer = SHORT_STRING_BUFFER.get();
+ if (shortStringBuffer == null) {
+ shortStringBuffer = new char[SHORT_STRING_LENGTH];
+ SHORT_STRING_BUFFER.set(shortStringBuffer);
}
- return idBuffer;
+ return shortStringBuffer;
}
public RuntimeException uncheckedIOException(IOException e) {
diff --git a/zipkin/src/main/java/zipkin2/internal/Proto3Codec.java b/zipkin/src/main/java/zipkin2/internal/Proto3Codec.java
index c5ae215..52d646e 100644
--- a/zipkin/src/main/java/zipkin2/internal/Proto3Codec.java
+++ b/zipkin/src/main/java/zipkin2/internal/Proto3Codec.java
@@ -52,13 +52,17 @@ public final class Proto3Codec {
if (span == null) return false;
out.add(span);
return true;
- } catch (Exception e) {
+ } catch (RuntimeException e) {
throw exceptionReading("Span", e);
}
}
public static @Nullable Span readOne(byte[] bytes) {
- return SPAN.read(UnsafeBuffer.wrap(bytes, 0));
+ try {
+ return SPAN.read(UnsafeBuffer.wrap(bytes, 0));
+ } catch (RuntimeException e) {
+ throw exceptionReading("Span", e);
+ }
}
public static boolean readList(byte[] bytes, Collection<Span> out) {
@@ -71,7 +75,7 @@ public final class Proto3Codec {
if (span == null) return false;
out.add(span);
}
- } catch (Exception e) {
+ } catch (RuntimeException e) {
throw exceptionReading("List<Span>", e);
}
return true;
diff --git a/zipkin/src/main/java/zipkin2/internal/Proto3Fields.java b/zipkin/src/main/java/zipkin2/internal/Proto3Fields.java
index 0761795..e580e46 100644
--- a/zipkin/src/main/java/zipkin2/internal/Proto3Fields.java
+++ b/zipkin/src/main/java/zipkin2/internal/Proto3Fields.java
@@ -124,13 +124,18 @@ final class Proto3Fields {
* is returned when the length prefix is zero.
*/
final T readLengthPrefixAndValue(UnsafeBuffer b) {
- int length = readLengthPrefix(b);
+ int length = guardLength(b);
if (length == 0) return null;
return readValue(b, length);
}
- final int readLengthPrefix(UnsafeBuffer b) {
- return b.readVarint32();
+ final int guardLength(UnsafeBuffer buffer) {
+ int length = buffer.readVarint32();
+ if (length < 0 || length > buffer.remaining()) { // NOTE(review): negative => malformed varint
+ throw new IllegalArgumentException(length < 0 ? "Malformed: negative length " + length
+ : "Truncated: length " + length + " > bytes remaining " + buffer.remaining());
+ }
+ return length;
+ }
abstract int sizeOfValue(T value);
diff --git a/zipkin/src/main/java/zipkin2/internal/Proto3ZipkinFields.java b/zipkin/src/main/java/zipkin2/internal/Proto3ZipkinFields.java
index 7bc63a9..36a0bb9 100644
--- a/zipkin/src/main/java/zipkin2/internal/Proto3ZipkinFields.java
+++ b/zipkin/src/main/java/zipkin2/internal/Proto3ZipkinFields.java
@@ -139,7 +139,7 @@ final class Proto3ZipkinFields {
}
@Override boolean readLengthPrefixAndValue(UnsafeBuffer b, Span.Builder builder) {
- int length = readLengthPrefix(b);
+ int length = guardLength(b);
if (length == 0) return false;
int endPos = b.pos() + length;
@@ -187,7 +187,7 @@ final class Proto3ZipkinFields {
}
@Override boolean readLengthPrefixAndValue(UnsafeBuffer b, Span.Builder builder) {
- int length = readLengthPrefix(b);
+ int length = guardLength(b);
if (length == 0) return false;
int endPos = b.pos() + length;
diff --git a/zipkin/src/main/java/zipkin2/internal/ThriftCodec.java b/zipkin/src/main/java/zipkin2/internal/ThriftCodec.java
index 2355b4d..258c5a5 100644
--- a/zipkin/src/main/java/zipkin2/internal/ThriftCodec.java
+++ b/zipkin/src/main/java/zipkin2/internal/ThriftCodec.java
@@ -127,8 +127,9 @@ public final class ThriftCodec {
static IllegalArgumentException exceptionReading(String type, Exception e) {
String cause = e.getMessage() == null ? "Error" : e.getMessage();
if (e instanceof EOFException) cause = "EOF";
- if (e instanceof IllegalStateException || e instanceof BufferUnderflowException)
+ if (e instanceof IllegalStateException || e instanceof BufferUnderflowException) {
cause = "Malformed";
+ }
String message = String.format("%s reading %s from TBinary", cause, type);
throw new IllegalArgumentException(message, e);
}
@@ -187,29 +188,59 @@ public final class ThriftCodec {
static void skip(ByteBuffer bytes, int count) {
// avoid java.lang.NoSuchMethodError: java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;
// bytes.position(bytes.position() + count);
- for (int i = 0; i< count && bytes.hasRemaining(); i++) {
+ for (int i = 0; i < count && bytes.hasRemaining(); i++) {
bytes.get();
}
}
static byte[] readByteArray(ByteBuffer bytes) {
- byte[] result = new byte[guardLength(bytes)];
- bytes.get(result);
- return result;
+ return readByteArray(bytes, guardLength(bytes));
+ }
+
+ /** Cached one-character string for the byte 0x01; see the special case in readUtf8. */
+ static final String ONE = Character.toString((char) 1);
+
+ /**
+ * Copies {@code length} bytes from the current position into a new array, advancing the
+ * position. Callers are expected to have checked {@code length} with {@link #guardLength}.
+ *
+ * <p>Bulk {@link ByteBuffer#get(byte[])} handles heap and direct buffers alike and advances the
+ * position itself, so we never call {@code ByteBuffer.position(int)} (see the note in skip).
+ */
+ static byte[] readByteArray(ByteBuffer bytes, int length) {
+ byte[] copy = new byte[length];
+ bytes.get(copy);
+ return copy;
+ }
static String readUtf8(ByteBuffer bytes) {
- // TODO: optimize out the array copy here
- return new String(readByteArray(bytes), UTF_8);
+ int length = guardLength(bytes);
+ if (length == 0) return ""; // ex empty name
+ if (length == 1) {
+ byte single = bytes.get();
+ if (single == 1) return ONE; // special case for address annotations
+ return Character.toString((char) single);
+ }
+
+ if (!bytes.hasArray()) return new String(readByteArray(bytes, length), UTF_8);
+
+ int offset = bytes.arrayOffset() + bytes.position();
+ String result = UnsafeBuffer.wrap(bytes.array(), offset).readUtf8(length);
+ bytes.position(bytes.position() + length);
+ return result;
}
static int guardLength(ByteBuffer buffer) {
int length = buffer.getInt();
+ guardLength(buffer, length);
+ return length;
+ }
+
+ static void guardLength(ByteBuffer buffer, int length) {
if (length > buffer.remaining()) {
throw new IllegalArgumentException(
- "Truncated: length " + length + " > bytes remaining " + buffer.remaining());
+ "Truncated: length " + length + " > bytes remaining " + buffer.remaining());
}
- return length;
}
static void writeListBegin(UnsafeBuffer buffer, int size) {
diff --git a/zipkin/src/main/java/zipkin2/internal/ThriftEndpointCodec.java b/zipkin/src/main/java/zipkin2/internal/ThriftEndpointCodec.java
index 9c0eb93..fd646ca 100644
--- a/zipkin/src/main/java/zipkin2/internal/ThriftEndpointCodec.java
+++ b/zipkin/src/main/java/zipkin2/internal/ThriftEndpointCodec.java
@@ -19,6 +19,7 @@ package zipkin2.internal;
import java.nio.ByteBuffer;
import zipkin2.Endpoint;
+import static zipkin2.internal.ThriftCodec.guardLength;
import static zipkin2.internal.ThriftCodec.skip;
import static zipkin2.internal.ThriftField.TYPE_I16;
import static zipkin2.internal.ThriftField.TYPE_I32;
@@ -41,6 +42,7 @@ final class ThriftEndpointCodec {
if (thriftField.type == TYPE_STOP) break;
if (thriftField.isEqualTo(IPV4)) {
+ guardLength(bytes, 4);
int ipv4 = bytes.getInt();
if (ipv4 != 0) {
result.parseIp( // allocation is ok here as Endpoint.ipv4Bytes would anyway
@@ -52,6 +54,7 @@ final class ThriftEndpointCodec {
});
}
} else if (thriftField.isEqualTo(PORT)) {
+ guardLength(bytes, 2);
result.port(bytes.getShort() & 0xFFFF);
} else if (thriftField.isEqualTo(SERVICE_NAME)) {
result.serviceName(ThriftCodec.readUtf8(bytes));
diff --git a/zipkin/src/main/java/zipkin2/internal/UnsafeBuffer.java b/zipkin/src/main/java/zipkin2/internal/UnsafeBuffer.java
index 5089b35..2b1bc71 100644
--- a/zipkin/src/main/java/zipkin2/internal/UnsafeBuffer.java
+++ b/zipkin/src/main/java/zipkin2/internal/UnsafeBuffer.java
@@ -98,11 +98,29 @@ public final class UnsafeBuffer {
String readUtf8(int length) {
require(length);
- String result = new String(buf, pos, length, UTF_8);
+ String result = maybeDecodeShortAsciiString(buf, pos, length);
+ if (result == null) result = new String(buf, pos, length, UTF_8); // non-ASCII or too long
pos += length;
return result;
}
+ /**
+ * Speculatively decodes {@code length} bytes as 7-bit ASCII, which is common in normal tags and
+ * names. Returns null when {@code length} exceeds {@code Platform.SHORT_STRING_LENGTH} or any byte
+ * has its high bit set, in which case the caller must fall back to a full UTF-8 decode.
+ */
+ static String maybeDecodeShortAsciiString(byte[] buf, int offset, int length) {
+ if (length == 0) return ""; // ex error tag with no value
+ if (length > Platform.SHORT_STRING_LENGTH) return null;
+ char[] buffer = Platform.shortStringBuffer();
+ for (int i = 0; i < length; i++) {
+ byte b = buf[offset + i];
+ if ((b & 0x80) != 0) return null; // Not 7-bit ASCII character
+ buffer[i] = (char) b;
+ }
+ return new String(buffer, 0, length);
+ }
+
String readBytesAsHex(int length) {
// All our hex fields are at most 32 characters.
if (length > 32) {
@@ -110,7 +124,7 @@ public final class UnsafeBuffer {
}
require(length);
- char[] result = Platform.get().idBuffer();
+ char[] result = Platform.get().shortStringBuffer();
int hexLength = length * 2;
for (int i = 0; i < hexLength; i += 2) {
diff --git a/zipkin/src/main/java/zipkin2/internal/V1ThriftSpanReader.java b/zipkin/src/main/java/zipkin2/internal/V1ThriftSpanReader.java
index ac5ee35..541e0cc 100644
--- a/zipkin/src/main/java/zipkin2/internal/V1ThriftSpanReader.java
+++ b/zipkin/src/main/java/zipkin2/internal/V1ThriftSpanReader.java
@@ -20,8 +20,8 @@ import java.nio.ByteBuffer;
import zipkin2.Endpoint;
import zipkin2.v1.V1Span;
-import static zipkin2.internal.ThriftCodec.UTF_8;
-import static zipkin2.internal.ThriftCodec.readByteArray;
+import static zipkin2.internal.ThriftCodec.ONE;
+import static zipkin2.internal.ThriftCodec.guardLength;
import static zipkin2.internal.ThriftCodec.readListLength;
import static zipkin2.internal.ThriftCodec.readUtf8;
import static zipkin2.internal.ThriftCodec.skip;
@@ -62,14 +62,18 @@ public final class V1ThriftSpanReader {
if (thriftField.type == TYPE_STOP) break;
if (thriftField.isEqualTo(TRACE_ID_HIGH)) {
+ guardLength(bytes, 8);
builder.traceIdHigh(bytes.getLong());
} else if (thriftField.isEqualTo(TRACE_ID)) {
+ guardLength(bytes, 8);
builder.traceId(bytes.getLong());
} else if (thriftField.isEqualTo(NAME)) {
builder.name(readUtf8(bytes));
} else if (thriftField.isEqualTo(ID)) {
+ guardLength(bytes, 8);
builder.id(bytes.getLong());
} else if (thriftField.isEqualTo(PARENT_ID)) {
+ guardLength(bytes, 8);
builder.parentId(bytes.getLong());
} else if (thriftField.isEqualTo(ANNOTATIONS)) {
int length = readListLength(bytes);
@@ -82,10 +86,13 @@ public final class V1ThriftSpanReader {
BinaryAnnotationReader.read(bytes, builder);
}
} else if (thriftField.isEqualTo(DEBUG)) {
+ guardLength(bytes, 1);
builder.debug(bytes.get() == 1);
} else if (thriftField.isEqualTo(TIMESTAMP)) {
+ guardLength(bytes, 8);
builder.timestamp(bytes.getLong());
} else if (thriftField.isEqualTo(DURATION)) {
+ guardLength(bytes, 8);
builder.duration(bytes.getLong());
} else {
skip(bytes, thriftField.type);
@@ -111,6 +118,7 @@ public final class V1ThriftSpanReader {
if (thriftField.type == TYPE_STOP) break;
if (thriftField.isEqualTo(TIMESTAMP)) {
+ guardLength(bytes, 8);
timestamp = bytes.getLong();
} else if (thriftField.isEqualTo(VALUE)) {
value = readUtf8(bytes);
@@ -134,7 +142,7 @@ public final class V1ThriftSpanReader {
static void read(ByteBuffer bytes, V1Span.Builder builder) {
String key = null;
- byte[] value = null;
+ String value = null;
Endpoint endpoint = null;
boolean isBoolean = false;
boolean isString = false;
@@ -145,8 +153,9 @@ public final class V1ThriftSpanReader {
if (thriftField.isEqualTo(KEY)) {
key = readUtf8(bytes);
} else if (thriftField.isEqualTo(VALUE)) {
- value = readByteArray(bytes);
+ value = readUtf8(bytes);
} else if (thriftField.isEqualTo(TYPE)) {
+ guardLength(bytes, 4);
switch (bytes.getInt()) {
case 0:
isBoolean = true;
@@ -163,8 +172,8 @@ public final class V1ThriftSpanReader {
}
if (key == null || value == null) return;
if (isString) {
- builder.addBinaryAnnotation(key, new String(value, UTF_8), endpoint);
- } else if (isBoolean && value.length == 1 && value[0] == 1 && endpoint != null) {
+ builder.addBinaryAnnotation(key, value, endpoint);
+ } else if (isBoolean && ONE.equals(value) && endpoint != null) {
if (key.equals("sa") || key.equals("ca") || key.equals("ma")) {
builder.addBinaryAnnotation(key, endpoint);
}
diff --git a/zipkin/src/test/java/zipkin2/codec/SpanBytesDecoderTest.java b/zipkin/src/test/java/zipkin2/codec/SpanBytesDecoderTest.java
index f2875ef..679c3e6 100644
--- a/zipkin/src/test/java/zipkin2/codec/SpanBytesDecoderTest.java
+++ b/zipkin/src/test/java/zipkin2/codec/SpanBytesDecoderTest.java
@@ -17,6 +17,7 @@
package zipkin2.codec;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Rule;
@@ -40,6 +41,22 @@ public class SpanBytesDecoderTest {
@Rule public ExpectedException thrown = ExpectedException.none();
+ @Test public void niceErrorOnTruncatedSpans_PROTO3() { // truncated input must report "Truncated", not a generic error
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Truncated: length 66 > bytes remaining 8 reading List<Span> from proto3");
+
+ byte[] encoded = SpanBytesEncoder.PROTO3.encodeList(TRACE);
+ SpanBytesDecoder.PROTO3.decodeList(Arrays.copyOfRange(encoded, 0, 10)); // keep 10 bytes so a length prefix overruns the input
+ }
+
+ @Test public void niceErrorOnTruncatedSpan_PROTO3() { // same as above, for a single span
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Truncated: length 179 > bytes remaining 7 reading Span from proto3");
+
+ byte[] encoded = SpanBytesEncoder.PROTO3.encode(SPAN);
+ SpanBytesDecoder.PROTO3.decodeOne(Arrays.copyOfRange(encoded, 0, 10)); // keep 10 bytes so a length prefix overruns the input
+ }
+
@Test public void emptyListOk_JSON_V1() {
assertThat(SpanBytesDecoder.JSON_V1.decodeList(new byte[0]))
.isEmpty(); // instead of throwing an exception
@@ -59,15 +76,6 @@ public class SpanBytesDecoderTest {
.isEmpty(); // instead of throwing an exception
}
- @Test public void emptyListOk_THRIFT() {
- assertThat(SpanBytesDecoder.THRIFT.decodeList(new byte[0]))
- .isEmpty(); // instead of throwing an exception
-
- byte[] emptyListLiteral = {12 /* TYPE_STRUCT */, 0, 0, 0, 0 /* zero length */};
- assertThat(SpanBytesDecoder.THRIFT.decodeList(emptyListLiteral))
- .isEmpty(); // instead of throwing an exception
- }
-
@Test public void spanRoundTrip_JSON_V2() {
assertThat(SpanBytesDecoder.JSON_V2.decodeOne(SpanBytesEncoder.JSON_V2.encode(span)))
.isEqualTo(span);
@@ -164,7 +172,7 @@ public class SpanBytesDecoderTest {
@Test public void niceErrorOnMalformed_inputSpans_PROTO3() {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Malformed reading List<Span> from proto3");
+ thrown.expectMessage("Truncated: length 101 > bytes remaining 3 reading List<Span> from proto3"); // 'e' (101) is taken as a length with only "llo" (3 bytes) left
SpanBytesDecoder.PROTO3.decodeList(new byte[] {'h', 'e', 'l', 'l', 'o'});
}
diff --git a/zipkin/src/test/java/zipkin2/codec/V1SpanBytesDecoderTest.java b/zipkin/src/test/java/zipkin2/codec/V1SpanBytesDecoderTest.java
index 5408781..4d66efa 100644
--- a/zipkin/src/test/java/zipkin2/codec/V1SpanBytesDecoderTest.java
+++ b/zipkin/src/test/java/zipkin2/codec/V1SpanBytesDecoderTest.java
@@ -17,6 +17,7 @@
package zipkin2.codec;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Ignore;
@@ -25,7 +26,6 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import zipkin2.Endpoint;
import zipkin2.Span;
-import zipkin2.TestObjects;
import static org.assertj.core.api.Assertions.assertThat;
import static zipkin2.TestObjects.BACKEND;
@@ -42,6 +42,31 @@ public class V1SpanBytesDecoderTest {
@Rule public ExpectedException thrown = ExpectedException.none();
+ @Test public void niceErrorOnTruncatedSpans_THRIFT() { // truncated input must report "Truncated", not a generic error
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Truncated: length 8 > bytes remaining 2 reading List<Span> from TBinary");
+
+ byte[] encoded = SpanBytesEncoder.THRIFT.encodeList(TRACE);
+ SpanBytesDecoder.THRIFT.decodeList(Arrays.copyOfRange(encoded, 0, 10)); // keep 10 bytes so an 8-byte read overruns the input
+ }
+
+ @Test public void niceErrorOnTruncatedSpan_THRIFT() { // same as above, for a single span
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Truncated: length 8 > bytes remaining 7 reading Span from TBinary");
+
+ byte[] encoded = SpanBytesEncoder.THRIFT.encode(SPAN);
+ SpanBytesDecoder.THRIFT.decodeOne(Arrays.copyOfRange(encoded, 0, 10)); // keep 10 bytes so an 8-byte read overruns the input
+ }
+
+ @Test public void emptyListOk_THRIFT() { // moved here from SpanBytesDecoderTest
+ assertThat(SpanBytesDecoder.THRIFT.decodeList(new byte[0]))
+ .isEmpty(); // instead of throwing an exception
+
+ byte[] emptyListLiteral = {12 /* TYPE_STRUCT */, 0, 0, 0, 0 /* zero length */};
+ assertThat(SpanBytesDecoder.THRIFT.decodeList(emptyListLiteral))
+ .isEmpty(); // instead of throwing an exception
+ }
+
@Test
public void spanRoundTrip_JSON_V1() {
assertThat(SpanBytesDecoder.JSON_V1.decodeOne(SpanBytesEncoder.JSON_V1.encode(span)))
diff --git a/zipkin/src/test/java/zipkin2/internal/Proto3FieldsTest.java b/zipkin/src/test/java/zipkin2/internal/Proto3FieldsTest.java
index 5e1cef9..388faad 100644
--- a/zipkin/src/test/java/zipkin2/internal/Proto3FieldsTest.java
+++ b/zipkin/src/test/java/zipkin2/internal/Proto3FieldsTest.java
@@ -179,7 +179,7 @@ public class Proto3FieldsTest {
buf.reset();
buf.skip(1); // skip the key
- assertThat(field.readLengthPrefix(buf))
+ assertThat(field.guardLength(buf))
.isEqualTo(10);
}