You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/10 07:18:26 UTC

[incubator-zipkin] branch master updated: Implements deduplication in Elasticsearch (#2573)

This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e6c076  Implements deduplication in Elasticsearch (#2573)
7e6c076 is described below

commit 7e6c07650fbb7ca05764d48a820e148c87f2dd85
Author: Adrian Cole <ad...@users.noreply.github.com>
AuthorDate: Fri May 10 15:18:21 2019 +0800

    Implements deduplication in Elasticsearch (#2573)
    
    This sets the span index ID to `${traceID}-${MD5(json)}` to allow
    server-side deduplication, and uses okio-based libraries to write to
    Elasticsearch more efficiently.
---
 benchmarks/pom.xml                                 |   7 +-
 .../internal/BulkRequestBenchmarks.java            |  88 ++++++++++
 .../elasticsearch/ElasticsearchSpanConsumer.java   | 151 ++++-------------
 .../{HttpBulkIndexer.java => BulkCallBuilder.java} |  94 +++++++----
 .../elasticsearch/internal/BulkIndexWriter.java    | 178 +++++++++++++++++++++
 .../ElasticsearchSpanConsumerTest.java             |  99 +-----------
 .../zipkin2/elasticsearch/InternalForTests.java    |  32 +++-
 .../integration/ITElasticsearchStorageV6.java      |   3 -
 .../integration/ITElasticsearchStorageV7.java      |   5 -
 ...lkIndexerTest.java => BulkCallBuilderTest.java} |   4 +-
 .../internal/BulkIndexWriterTest.java              | 121 ++++++++++++++
 11 files changed, 516 insertions(+), 266 deletions(-)

diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index e2dfb5f..02b84e1 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -70,11 +70,16 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.zipkin</groupId>
+      <groupId>${project.groupId}</groupId>
       <artifactId>zipkin-server</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>${project.groupId}.zipkin2</groupId>
+      <artifactId>zipkin-storage-elasticsearch</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.squareup.wire</groupId>
       <artifactId>wire-runtime</artifactId>
       <version>${wire.version}</version>
diff --git a/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java b/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java
new file mode 100644
index 0000000..210edac
--- /dev/null
+++ b/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.elasticsearch.internal;
+
+import com.google.common.io.ByteStreams;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import okio.Okio;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import zipkin2.Span;
+import zipkin2.codec.CodecBenchmarks;
+import zipkin2.codec.SpanBytesDecoder;
+import zipkin2.elasticsearch.ElasticsearchStorage;
+
+/**
+ * Measures indexing spans with {@link BulkCallBuilder}, then writing the resulting bulk request
+ * body to a sink that discards it.
+ */
+@Measurement(iterations = 5, time = 1)
+@Warmup(iterations = 10, time = 1)
+@Fork(3)
+@BenchmarkMode(Mode.SampleTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@State(Scope.Thread)
+@Threads(2)
+public class BulkRequestBenchmarks {
+  static final Span CLIENT_SPAN = SpanBytesDecoder.JSON_V2.decodeOne(read("/zipkin2-client.json"));
+
+  final ElasticsearchStorage es = ElasticsearchStorage.newBuilder().build();
+  final BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
+
+  // Span timestamps are epoch microseconds; index names are formatted from epoch milliseconds.
+  final long indexTimestamp = CLIENT_SPAN.timestampAsLong() / 1000L;
+  final String spanIndex =
+    es.indexNameFormatter().formatTypeAndTimestampForInsert("span", '-', indexTimestamp);
+
+  /** Indexes one span, then writes the bulk request body. */
+  @Benchmark public void buildAndWriteRequest_singleSpan() throws IOException {
+    builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
+    builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
+  }
+
+  /** Indexes the same span ten times, then writes the bulk request body. */
+  @Benchmark public void buildAndWriteRequest_tenSpans() throws IOException {
+    for (int i = 0; i < 10; i++) {
+      builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
+    }
+    builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
+  }
+
+  /** Convenience entry point that runs these benchmarks with the GC profiler. */
+  public static void main(String[] args) throws RunnerException {
+    Options opt = new OptionsBuilder()
+      .addProfiler("gc")
+      .include(".*" + BulkRequestBenchmarks.class.getSimpleName() + ".*")
+      .build();
+
+    new Runner(opt).run();
+  }
+
+  /** Reads a classpath resource fully, closing its stream afterwards. */
+  static byte[] read(String resource) {
+    try (InputStream in = CodecBenchmarks.class.getResourceAsStream(resource)) {
+      if (in == null) throw new IllegalArgumentException("resource not found: " + resource);
+      return ByteStreams.toByteArray(in);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java
index 1bbf27a..cf2db5d 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java
@@ -16,37 +16,24 @@
  */
 package zipkin2.elasticsearch;
 
-import com.squareup.moshi.JsonWriter;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import okio.Buffer;
-import okio.ByteString;
-import zipkin2.Annotation;
 import zipkin2.Call;
 import zipkin2.Span;
-import zipkin2.codec.SpanBytesEncoder;
-import zipkin2.elasticsearch.internal.HttpBulkIndexer;
+import zipkin2.elasticsearch.internal.BulkCallBuilder;
+import zipkin2.elasticsearch.internal.BulkIndexWriter;
 import zipkin2.elasticsearch.internal.IndexNameFormatter;
 import zipkin2.internal.DelayLimiter;
-import zipkin2.internal.Nullable;
 import zipkin2.storage.SpanConsumer;
 
 import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
 import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN;
-import static zipkin2.internal.JsonEscaper.jsonEscape;
-import static zipkin2.internal.JsonEscaper.jsonEscapedSizeInBytes;
+import static zipkin2.elasticsearch.internal.BulkCallBuilder.INDEX_CHARS_LIMIT;
 
 class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testing
-  static final Logger LOG = Logger.getLogger(ElasticsearchSpanConsumer.class.getName());
-  static final int INDEX_CHARS_LIMIT = 256;
-  static final ByteString EMPTY_JSON = ByteString.of(new byte[] {'{', '}'});
 
   final ElasticsearchStorage es;
   final Set<String> autocompleteKeys;
@@ -80,19 +67,17 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
 
   void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
     for (Span span : spans) {
-      long spanTimestamp = span.timestampAsLong();
-      long indexTimestamp = 0L; // which index to store this span into
-      if (spanTimestamp != 0L) {
-        indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(spanTimestamp);
-      } else {
+      final long indexTimestamp; // which index to store this span into
+      if (span.timestampAsLong() != 0L) {
+        indexTimestamp = span.timestampAsLong() / 1000L;
+      } else if (!span.annotations().isEmpty()) {
         // guessTimestamp is made for determining the span's authoritative timestamp. When choosing
         // the index bucket, any annotation is better than using current time.
-        if (!span.annotations().isEmpty()) {
-          indexTimestamp = span.annotations().get(0).timestamp() / 1000;
-        }
-        if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
+        indexTimestamp = span.annotations().get(0).timestamp() / 1000L;
+      } else {
+        indexTimestamp = System.currentTimeMillis();
       }
-      indexer.add(indexTimestamp, span, spanTimestamp);
+      indexer.add(indexTimestamp, span);
       if (searchEnabled && !span.tags().isEmpty()) {
         indexer.addAutocompleteValues(indexTimestamp, span);
       }
@@ -101,21 +86,21 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
 
   /** Mutable type used for each call to store spans */
   static final class BulkSpanIndexer {
-    final HttpBulkIndexer indexer;
+    final BulkCallBuilder bulkCallBuilder;
     final ElasticsearchSpanConsumer consumer;
     final List<AutocompleteContext> pendingAutocompleteContexts = new ArrayList<>();
+    final BulkIndexWriter<Span> spanWriter;
 
     BulkSpanIndexer(ElasticsearchSpanConsumer consumer) {
-      this.indexer = new HttpBulkIndexer("index-span", consumer.es);
+      this.bulkCallBuilder = new BulkCallBuilder(consumer.es, consumer.es.version(), "index-span");
       this.consumer = consumer;
+      this.spanWriter =
+        consumer.searchEnabled ? BulkIndexWriter.SPAN : BulkIndexWriter.SPAN_SEARCH_DISABLED;
     }
 
-    void add(long indexTimestamp, Span span, long timestampMillis) {
+    void add(long indexTimestamp, Span span) {
       String index = consumer.formatTypeAndTimestampForInsert(SPAN, indexTimestamp);
-      byte[] document = consumer.searchEnabled
-        ? prefixWithTimestampMillisAndQuery(span, timestampMillis)
-        : SpanBytesEncoder.JSON_V2.encode(span);
-      indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
+      bulkCallBuilder.index(index, SPAN, span, spanWriter);
     }
 
     void addAutocompleteValues(long indexTimestamp, Span span) {
@@ -127,29 +112,17 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
         // If the autocomplete whitelist doesn't contain the key, skip storing its value
         if (!consumer.autocompleteKeys.contains(tag.getKey())) continue;
 
-        // Id is used to dedupe server side as necessary. Arbitrarily same format as _q value.
-        String id = tag.getKey() + "=" + tag.getValue();
-        AutocompleteContext context = new AutocompleteContext(indexTimestamp, id);
+        AutocompleteContext context =
+          new AutocompleteContext(indexTimestamp, tag.getKey(), tag.getValue());
         if (!consumer.delayLimiter.shouldInvoke(context)) continue;
         pendingAutocompleteContexts.add(context);
 
-        // encode using zipkin's internal buffer so we don't have to catch exceptions etc
-        int sizeInBytes = 27; // {"tagKey":"","tagValue":""}
-        sizeInBytes += jsonEscapedSizeInBytes(tag.getKey());
-        sizeInBytes += jsonEscapedSizeInBytes(tag.getValue());
-        zipkin2.internal.Buffer b = zipkin2.internal.Buffer.allocate(sizeInBytes);
-        b.writeAscii("{\"tagKey\":\"");
-        b.writeUtf8(jsonEscape(tag.getKey()));
-        b.writeAscii("\",\"tagValue\":\"");
-        b.writeUtf8(jsonEscape(tag.getValue()));
-        b.writeAscii("\"}");
-        byte[] document = b.toByteArray();
-        indexer.add(idx, AUTOCOMPLETE, document, id);
+        bulkCallBuilder.index(idx, AUTOCOMPLETE, tag, BulkIndexWriter.AUTOCOMPLETE);
       }
     }
 
     Call<Void> newCall() {
-      Call<Void> storeCall = indexer.newCall();
+      Call<Void> storeCall = bulkCallBuilder.build();
       if (pendingAutocompleteContexts.isEmpty()) return storeCall;
       return storeCall.handleError((error, callback) -> {
         for (AutocompleteContext context : pendingAutocompleteContexts) {
@@ -160,89 +133,31 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
     }
   }
 
-  /**
-   * In order to allow systems like Kibana to search by timestamp, we add a field "timestamp_millis"
-   * when storing. The cheapest way to do this without changing the codec is prefixing it to the
-   * json. For example. {"traceId":"... becomes {"timestamp_millis":12345,"traceId":"...
-   *
-   * <p>Tags are stored as a dictionary. Since some tag names will include inconsistent number of
-   * dots (ex "error" and perhaps "error.message"), we cannot index them naturally with
-   * elasticsearch. Instead, we add an index-only (non-source) field of {@code _q} which includes
-   * valid search queries. For example, the tag {@code error -> 500} results in {@code
-   * "_q":["error", "error=500"]}. This matches the input query syntax, and can be checked manually
-   * with curl.
-   *
-   * <p>Ex {@code curl -s localhost:9200/zipkin:span-2017-08-11/_search?q=_q:error=500}
-   */
-  static byte[] prefixWithTimestampMillisAndQuery(Span span, long timestampMillis) {
-    Buffer prefix = new Buffer();
-    JsonWriter writer = JsonWriter.of(prefix);
-    try {
-      writer.beginObject();
-
-      if (timestampMillis != 0L) writer.name("timestamp_millis").value(timestampMillis);
-      if (!span.tags().isEmpty() || !span.annotations().isEmpty()) {
-        writer.name("_q");
-        writer.beginArray();
-        for (Annotation a : span.annotations()) {
-          if (a.value().length() > INDEX_CHARS_LIMIT) continue;
-          writer.value(a.value());
-        }
-        for (Map.Entry<String, String> tag : span.tags().entrySet()) {
-          int length = tag.getKey().length() + tag.getValue().length() + 1;
-          if (length > INDEX_CHARS_LIMIT) continue;
-          writer.value(tag.getKey()); // search is possible by key alone
-          writer.value(tag.getKey() + "=" + tag.getValue());
-        }
-        writer.endArray();
-      }
-      writer.endObject();
-    } catch (IOException e) {
-      // very unexpected to have an IOE for an in-memory write
-      assert false : "Error indexing query for span: " + span;
-      if (LOG.isLoggable(Level.FINE)) {
-        LOG.log(Level.FINE, "Error indexing query for span: " + span, e);
-      }
-      return SpanBytesEncoder.JSON_V2.encode(span);
-    }
-    byte[] document = SpanBytesEncoder.JSON_V2.encode(span);
-    if (prefix.rangeEquals(0L, EMPTY_JSON)) return document;
-    return mergeJson(prefix.readByteArray(), document);
-  }
-
-  static byte[] mergeJson(byte[] prefix, byte[] suffix) {
-    byte[] newSpanBytes = new byte[prefix.length + suffix.length - 1];
-    int pos = 0;
-    System.arraycopy(prefix, 0, newSpanBytes, pos, prefix.length);
-    pos += prefix.length;
-    newSpanBytes[pos - 1] = ',';
-    // starting at position 1 discards the old head of '{'
-    System.arraycopy(suffix, 1, newSpanBytes, pos, suffix.length - 1);
-    return newSpanBytes;
-  }
-
   static final class AutocompleteContext {
-    final long indexTimestamp;
-    final String autocompleteId;
+    final long timestamp;
+    final String key, value;
 
-    AutocompleteContext(long indexTimestamp, String autocompleteId) {
-      this.indexTimestamp = indexTimestamp;
-      this.autocompleteId = autocompleteId;
+    AutocompleteContext(long timestamp, String key, String value) {
+      this.timestamp = timestamp;
+      this.key = key;
+      this.value = value;
     }
 
     @Override public boolean equals(Object o) {
       if (o == this) return true;
       if (!(o instanceof AutocompleteContext)) return false;
       AutocompleteContext that = (AutocompleteContext) o;
-      return indexTimestamp == that.indexTimestamp && autocompleteId.equals(that.autocompleteId);
+      return timestamp == that.timestamp && key.equals(that.key) && value.equals(that.value);
     }
 
     @Override public int hashCode() {
       int h$ = 1;
       h$ *= 1000003;
-      h$ ^= (int) (h$ ^ ((indexTimestamp >>> 32) ^ indexTimestamp));
+      h$ ^= (int) (h$ ^ ((timestamp >>> 32) ^ timestamp));
+      h$ *= 1000003;
+      h$ ^= key.hashCode();
       h$ *= 1000003;
-      h$ ^= autocompleteId.hashCode();
+      h$ ^= value.hashCode();
       return h$;
     }
   }
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
similarity index 51%
rename from zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java
rename to zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
index 2eeffea..e5d7c6f 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
@@ -16,6 +16,7 @@
  */
 package zipkin2.elasticsearch.internal;
 
+import com.squareup.moshi.JsonWriter;
 import java.io.IOException;
 import java.util.concurrent.RejectedExecutionException;
 import okhttp3.HttpUrl;
@@ -23,16 +24,15 @@ import okhttp3.MediaType;
 import okhttp3.Request;
 import okhttp3.RequestBody;
 import okio.Buffer;
+import okio.BufferedSink;
 import okio.BufferedSource;
 import zipkin2.elasticsearch.ElasticsearchStorage;
 import zipkin2.elasticsearch.internal.client.HttpCall;
-import zipkin2.internal.Nullable;
-
-import static zipkin2.internal.JsonEscaper.jsonEscape;
 
 // See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 // exposed to re-use for testing writes of dependency links
-public final class HttpBulkIndexer {
+public final class BulkCallBuilder {
+  public static final int INDEX_CHARS_LIMIT = 256;
   static final MediaType APPLICATION_JSON = MediaType.parse("application/json");
 
   final String tag;
@@ -41,12 +41,12 @@ public final class HttpBulkIndexer {
   final String pipeline;
   final boolean waitForRefresh;
 
-  // Mutated for each call to add
-  final Buffer body = new Buffer();
+  // Mutated for each call to index
+  final Buffer buffer = new Buffer();
 
-  public HttpBulkIndexer(String tag, ElasticsearchStorage es) {
+  public BulkCallBuilder(ElasticsearchStorage es, float esVersion, String tag) {
     this.tag = tag;
-    shouldAddType = es.version() < 7.0f;
+    shouldAddType = esVersion < 7.0f;
     http = es.http();
     pipeline = es.pipeline();
     waitForRefresh = es.flushOnWrites();
@@ -55,52 +55,80 @@ public final class HttpBulkIndexer {
   enum CheckForErrors implements HttpCall.BodyConverter<Void> {
     INSTANCE;
 
-    @Override
-    public Void convert(BufferedSource b) throws IOException {
+    @Override public Void convert(BufferedSource b) throws IOException {
       String content = b.readUtf8();
       if (content.contains("\"status\":429")) throw new RejectedExecutionException(content);
       if (content.contains("\"errors\":true")) throw new IllegalStateException(content);
       return null;
     }
 
-    @Override
-    public String toString() {
+    @Override public String toString() {
       return "CheckForErrors";
     }
   }
 
-  public void add(String index, String typeName, byte[] document, @Nullable String id) {
-    writeIndexMetadata(index, typeName, id);
-    writeDocument(document);
+  public <T> void index(String index, String typeName, T input, BulkIndexWriter<T> writer) {
+    Buffer document = new Buffer();
+    String id = writer.writeDocument(input, document);
+    writeIndexMetadata(buffer, index, typeName, id);
+    buffer.writeByte('\n');
+    buffer.write(document, document.size());
+    buffer.writeByte('\n');
   }
 
-  void writeIndexMetadata(String index, String typeName, @Nullable String id) {
-    body.writeUtf8("{\"index\":{\"_index\":\"").writeUtf8(index).writeByte('"');
-    // the _type parameter is needed for Elasticsearch < 6.x
-    if (shouldAddType) body.writeUtf8(",\"_type\":\"").writeUtf8(typeName).writeByte('"');
-    if (id != null) {
-      body.writeUtf8(",\"_id\":\"").writeUtf8(jsonEscape(id).toString()).writeByte('"');
+  void writeIndexMetadata(Buffer indexBuffer, String index, String typeName, String id) {
+    JsonWriter jsonWriter = JsonWriter.of(indexBuffer);
+    try {
+      jsonWriter.beginObject();
+      jsonWriter.name("index");
+      jsonWriter.beginObject();
+      jsonWriter.name("_index").value(index);
+      // the _type parameter is needed for Elasticsearch < 6.x
+      if (shouldAddType) jsonWriter.name("_type").value(typeName);
+      jsonWriter.name("_id").value(id);
+      jsonWriter.endObject();
+      jsonWriter.endObject();
+    } catch (IOException e) {
+      throw new AssertionError(e); // No I/O writing to a Buffer.
     }
-    body.writeUtf8("}}\n");
-  }
-
-  void writeDocument(byte[] document) {
-    body.write(document);
-    body.writeByte('\n');
   }
 
   /** Creates a bulk request when there is more than one object to store */
-  public HttpCall<Void> newCall() {
+  public HttpCall<Void> build() {
     HttpUrl.Builder urlBuilder = http.baseUrl.newBuilder("_bulk");
     if (pipeline != null) urlBuilder.addQueryParameter("pipeline", pipeline);
     if (waitForRefresh) urlBuilder.addQueryParameter("refresh", "wait_for");
 
-    Request request = new Request.Builder()
-      .url(urlBuilder.build())
-      .tag(tag)
-      .post(RequestBody.create(APPLICATION_JSON, body.readByteString()))
-      .build();
+    RequestBody body = new BufferRequestBody(buffer);
 
+    Request request = new Request.Builder().url(urlBuilder.build()).tag(tag).post(body).build();
     return http.newCall(request, CheckForErrors.INSTANCE);
   }
+
+  /** Streams the bulk payload from an okio Buffer, avoiding a copy into a large byte array. */
+  static final class BufferRequestBody extends RequestBody {
+    final long contentLength; // captured at construction, before writeTo drains the buffer
+    final Buffer buffer;
+
+    BufferRequestBody(Buffer buffer) {
+      this.contentLength = buffer.size();
+      this.buffer = buffer;
+    }
+
+    @Override public MediaType contentType() {
+      return APPLICATION_JSON;
+    }
+
+    @Override public long contentLength() {
+      return contentLength;
+    }
+
+    @Override public boolean isOneShot() {
+      return true; // writeTo moves bytes out of the buffer, so the body cannot be sent twice
+    }
+
+    @Override public void writeTo(BufferedSink sink) throws IOException {
+      sink.write(buffer, contentLength); // moves (not copies) contentLength bytes into sink
+    }
+  }
 }
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexWriter.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexWriter.java
new file mode 100644
index 0000000..e9fd409
--- /dev/null
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexWriter.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.elasticsearch.internal;
+
+import com.squareup.moshi.JsonWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.Map;
+import okio.Buffer;
+import okio.BufferedSink;
+import okio.HashingSink;
+import okio.Okio;
+import zipkin2.Annotation;
+import zipkin2.Endpoint;
+import zipkin2.Span;
+
+import static zipkin2.elasticsearch.internal.BulkCallBuilder.INDEX_CHARS_LIMIT;
+
+/**
+ * Writes one document of an Elasticsearch bulk index request and chooses its {@code _id}. Stable
+ * IDs let Elasticsearch deduplicate documents that are indexed more than once.
+ *
+ * @param <T> the input converted into a document
+ */
+public abstract class BulkIndexWriter<T> {
+
+  /**
+   * Writes a complete JSON document for {@code input} to {@code sink} according to the index
+   * strategy, and returns the value to use as the document's {@code _id}.
+   *
+   * @throws UncheckedIOException if writing to {@code sink} fails
+   */
+  public abstract String writeDocument(T input, BufferedSink sink);
+
+  /** Writes spans along with the search-only fields {@code timestamp_millis} and {@code _q}. */
+  public static final BulkIndexWriter<Span> SPAN = new BulkIndexWriter<Span>() {
+    @Override public String writeDocument(Span input, BufferedSink sink) {
+      return write(input, true, sink);
+    }
+  };
+
+  /** Writes spans without the search-only fields, for use when search is disabled. */
+  public static final BulkIndexWriter<Span>
+    SPAN_SEARCH_DISABLED = new BulkIndexWriter<Span>() {
+    @Override public String writeDocument(Span input, BufferedSink sink) {
+      return write(input, false, sink);
+    }
+  };
+
+  /**
+   * Writes a tag as an autocomplete document. Its ID is {@code key=value}, so a repeated
+   * key/value pair is deduplicated server side.
+   */
+  public static final BulkIndexWriter<Map.Entry<String, String>> AUTOCOMPLETE =
+    new BulkIndexWriter<Map.Entry<String, String>>() {
+      @Override public String writeDocument(Map.Entry<String, String> input, BufferedSink sink) {
+        writeAutocompleteEntry(input.getKey(), input.getValue(), JsonWriter.of(sink));
+        // Id is used to dedupe server side as necessary. Arbitrarily same format as _q value.
+        return input.getKey() + "=" + input.getValue();
+      }
+    };
+
+  /** Endpoints equal to this carry no data, so they are omitted from the document. */
+  static final Endpoint EMPTY_ENDPOINT = Endpoint.newBuilder().build();
+
+  /**
+   * In order to allow systems like Kibana to search by timestamp, we add a field "timestamp_millis"
+   * when storing. The cheapest way to do this without changing the codec is prefixing it to the
+   * json. For example, {"traceId":"... becomes {"timestamp_millis":12345,"traceId":"...
+   *
+   * <p>Tags are stored as a dictionary. Since some tag names will include inconsistent number of
+   * dots (ex "error" and perhaps "error.message"), we cannot index them naturally with
+   * elasticsearch. Instead, we add an index-only (non-source) field of {@code _q} which includes
+   * valid search queries. For example, the tag {@code error -> 500} results in {@code
+   * "_q":["error", "error=500"]}. This matches the input query syntax, and can be checked manually
+   * with curl.
+   *
+   * <p>Ex {@code curl -s localhost:9200/zipkin:span-2017-08-11/_search?q=_q:error=500}
+   *
+   * @param span the span to write
+   * @param searchEnabled whether to also write timestamp_millis and _q (each only when non-empty)
+   * @param sink receives the JSON document
+   * @return the document ID: the trace ID, a dash, then the hex MD5 of the bytes written to
+   * {@code sink}. Identical documents get identical IDs, enabling server-side deduplication.
+   * @throws UncheckedIOException if writing to {@code sink} fails
+   */
+  static String write(Span span, boolean searchEnabled, BufferedSink sink) {
+    // Every document byte passes through the MD5 hash on its way to the sink.
+    HashingSink hashingSink = HashingSink.md5(sink);
+    JsonWriter writer = JsonWriter.of(Okio.buffer(hashingSink));
+    try {
+      writer.beginObject();
+      if (searchEnabled) addSearchFields(span, writer);
+      writer.name("traceId").value(span.traceId());
+      if (span.parentId() != null) writer.name("parentId").value(span.parentId());
+      writer.name("id").value(span.id());
+      if (span.kind() != null) writer.name("kind").value(span.kind().toString());
+      if (span.name() != null) writer.name("name").value(span.name());
+      if (span.timestampAsLong() != 0L) writer.name("timestamp").value(span.timestampAsLong());
+      if (span.durationAsLong() != 0L) writer.name("duration").value(span.durationAsLong());
+      if (span.localEndpoint() != null && !EMPTY_ENDPOINT.equals(span.localEndpoint())) {
+        writer.name("localEndpoint");
+        write(span.localEndpoint(), writer);
+      }
+      if (span.remoteEndpoint() != null && !EMPTY_ENDPOINT.equals(span.remoteEndpoint())) {
+        writer.name("remoteEndpoint");
+        write(span.remoteEndpoint(), writer);
+      }
+      if (!span.annotations().isEmpty()) {
+        writer.name("annotations");
+        writer.beginArray();
+        for (int i = 0, length = span.annotations().size(); i < length; i++) {
+          write(span.annotations().get(i), writer);
+        }
+        writer.endArray();
+      }
+      if (!span.tags().isEmpty()) {
+        writer.name("tags");
+        writer.beginObject();
+        Iterator<Map.Entry<String, String>> tags = span.tags().entrySet().iterator();
+        while (tags.hasNext()) write(tags.next(), writer);
+        writer.endObject();
+      }
+      if (Boolean.TRUE.equals(span.debug())) writer.name("debug").value(true);
+      if (Boolean.TRUE.equals(span.shared())) writer.name("shared").value(true);
+      writer.endObject();
+      // Push buffered bytes through the hash; the ID below must cover the whole document.
+      writer.flush();
+      hashingSink.flush();
+    } catch (IOException e) {
+      // Not expected for the in-memory Buffer BulkCallBuilder uses, but other sinks can fail.
+      throw new UncheckedIOException(e);
+    }
+    return new Buffer()
+      .writeUtf8(span.traceId()).writeByte('-').writeUtf8(hashingSink.hash().hex())
+      .readUtf8();
+  }
+
+  /** Writes {@code {"tagKey":key,"tagValue":value}}. */
+  static void writeAutocompleteEntry(String key, String value, JsonWriter writer) {
+    try {
+      writer.beginObject();
+      writer.name("tagKey").value(key);
+      writer.name("tagValue").value(value);
+      writer.endObject();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /** Writes a tag as one field of the enclosing {@code tags} object. */
+  static void write(Map.Entry<String, String> tag, JsonWriter writer) throws IOException {
+    writer.name(tag.getKey()).value(tag.getValue());
+  }
+
+  /** Writes an annotation as {@code {"timestamp":...,"value":...}}. */
+  static void write(Annotation annotation, JsonWriter writer) throws IOException {
+    writer.beginObject();
+    writer.name("timestamp").value(annotation.timestamp());
+    writer.name("value").value(annotation.value());
+    writer.endObject();
+  }
+
+  /** Writes an endpoint object, omitting null fields and a zero port. */
+  static void write(Endpoint endpoint, JsonWriter writer) throws IOException {
+    writer.beginObject();
+    if (endpoint.serviceName() != null) writer.name("serviceName").value(endpoint.serviceName());
+    if (endpoint.ipv4() != null) writer.name("ipv4").value(endpoint.ipv4());
+    if (endpoint.ipv6() != null) writer.name("ipv6").value(endpoint.ipv6());
+    if (endpoint.portAsInt() != 0) writer.name("port").value(endpoint.portAsInt());
+    writer.endObject();
+  }
+
+  /**
+   * Writes {@code timestamp_millis} when its millisecond value is non-zero, and {@code _q} search
+   * terms when the span has annotations or tags; terms over INDEX_CHARS_LIMIT chars are skipped.
+   */
+  static void addSearchFields(Span span, JsonWriter writer) throws IOException {
+    long timestampMillis = span.timestampAsLong() / 1000L;
+    if (timestampMillis != 0L) writer.name("timestamp_millis").value(timestampMillis);
+    if (!span.tags().isEmpty() || !span.annotations().isEmpty()) {
+      writer.name("_q");
+      writer.beginArray();
+      for (Annotation a : span.annotations()) {
+        if (a.value().length() > INDEX_CHARS_LIMIT) continue;
+        writer.value(a.value());
+      }
+      for (Map.Entry<String, String> tag : span.tags().entrySet()) {
+        int length = tag.getKey().length() + tag.getValue().length() + 1;
+        if (length > INDEX_CHARS_LIMIT) continue;
+        writer.value(tag.getKey()); // search is possible by key alone
+        writer.value(tag.getKey() + "=" + tag.getValue());
+      }
+      writer.endArray();
+    }
+  }
+}
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java
index f74fae8..22284ca 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java
@@ -31,7 +31,6 @@ import zipkin2.Endpoint;
 import zipkin2.Span;
 import zipkin2.Span.Kind;
 import zipkin2.TestObjects;
-import zipkin2.codec.SpanBytesDecoder;
 import zipkin2.codec.SpanBytesEncoder;
 import zipkin2.internal.Nullable;
 import zipkin2.storage.SpanConsumer;
@@ -40,7 +39,6 @@ import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static zipkin2.TestObjects.TODAY;
-import static zipkin2.elasticsearch.ElasticsearchSpanConsumer.prefixWithTimestampMillisAndQuery;
 
 public class ElasticsearchSpanConsumerTest {
   static final Endpoint WEB_ENDPOINT = Endpoint.newBuilder().serviceName("web").build();
@@ -87,100 +85,7 @@ public class ElasticsearchSpanConsumerTest {
     accept(span);
 
     assertThat(es.takeRequest().getBody().readUtf8())
-      .contains("\n{\"timestamp_millis\":" + Long.toString(TODAY) + ",\"traceId\":");
-  }
-
-  @Test
-  public void prefixWithTimestampMillisAndQuery_skipsWhenNoData() throws Exception {
-    Span span =
-      Span.newBuilder()
-        .traceId("20")
-        .id("22")
-        .name("")
-        .parentId("21")
-        .timestamp(0L)
-        .localEndpoint(WEB_ENDPOINT)
-        .kind(Kind.CLIENT)
-        .build();
-
-    byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
-    assertThat(new String(result, "UTF-8")).startsWith("{\"traceId\":\"");
-  }
-
-  @Test
-  public void prefixWithTimestampMillisAndQuery_addsTimestampMillis() throws Exception {
-    Span span =
-      Span.newBuilder()
-        .traceId("20")
-        .id("22")
-        .name("")
-        .parentId("21")
-        .timestamp(1L)
-        .localEndpoint(WEB_ENDPOINT)
-        .kind(Kind.CLIENT)
-        .build();
-
-    byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
-    assertThat(new String(result, "UTF-8")).startsWith("{\"timestamp_millis\":1,\"traceId\":");
-  }
-
-  @Test
-  public void prefixWithTimestampMillisAndQuery_addsAnnotationQuery() throws Exception {
-    Span span =
-      Span.newBuilder()
-        .traceId("20")
-        .id("22")
-        .name("")
-        .parentId("21")
-        .localEndpoint(WEB_ENDPOINT)
-        .addAnnotation(1L, "\"foo")
-        .build();
-
-    byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
-    assertThat(new String(result, "UTF-8")).startsWith("{\"_q\":[\"\\\"foo\"],\"traceId");
-  }
-
-  @Test
-  public void prefixWithTimestampMillisAndQuery_addsAnnotationQueryTags() throws Exception {
-    Span span =
-      Span.newBuilder()
-        .traceId("20")
-        .id("22")
-        .name("")
-        .parentId("21")
-        .localEndpoint(WEB_ENDPOINT)
-        .putTag("\"foo", "\"bar")
-        .build();
-
-    byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
-    assertThat(new String(result, "UTF-8"))
-      .startsWith("{\"_q\":[\"\\\"foo\",\"\\\"foo=\\\"bar\"],\"traceId");
-  }
-
-  @Test
-  public void prefixWithTimestampMillisAndQuery_readable() {
-    Span span =
-      Span.newBuilder().traceId("20").id("20").name("get").timestamp(TODAY * 1000).build();
-
-    assertThat(
-      SpanBytesDecoder.JSON_V2.decodeOne(
-        prefixWithTimestampMillisAndQuery(span, span.timestamp())))
-      .isEqualTo(span); // ignores timestamp_millis field
-  }
-
-  @Test
-  public void doesntWriteDocumentId() throws Exception {
-    es.enqueue(new MockResponse());
-
-    accept(Span.newBuilder().traceId("1").id("1").name("foo").build());
-
-    RecordedRequest request = es.takeRequest();
-    assertThat(request.getBody().readByteString().utf8())
-      .doesNotContain("\"_type\":\"span\",\"_id\"");
+      .contains("\n{\"timestamp_millis\":" + TODAY + ",\"traceId\":");
   }
 
   @Test
@@ -310,7 +215,7 @@ public class ElasticsearchSpanConsumerTest {
 
     // index timestamp is the server timestamp, not current time!
     assertThat(es.takeRequest().getBody().readByteString().utf8())
-      .contains("{\"index\":{\"_index\":\"zipkin:span-1971-01-01\",\"_type\":\"span\"}}");
+      .startsWith("{\"index\":{\"_index\":\"zipkin:span-1971-01-01\",\"_type\":\"span\"");
   }
 
   /** Much simpler template which doesn't write the timestamp_millis field */
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
index c38b8eb..b221fa0 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
@@ -16,12 +16,14 @@
  */
 package zipkin2.elasticsearch;
 
+import com.squareup.moshi.JsonWriter;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import okio.BufferedSink;
 import zipkin2.DependencyLink;
-import zipkin2.codec.DependencyLinkBytesEncoder;
-import zipkin2.elasticsearch.internal.HttpBulkIndexer;
+import zipkin2.elasticsearch.internal.BulkIndexWriter;
+import zipkin2.elasticsearch.internal.BulkCallBuilder;
 
 /** Package accessor for integration tests */
 public class InternalForTests {
@@ -29,16 +31,32 @@ public class InternalForTests {
     long midnightUTC) {
     String index = ((ElasticsearchSpanConsumer) es.spanConsumer())
       .formatTypeAndTimestampForInsert("dependency", midnightUTC);
-    HttpBulkIndexer indexer = new HttpBulkIndexer("indexlinks", es);
+    BulkCallBuilder indexer = new BulkCallBuilder(es, es.version(), "indexlinks");
     for (DependencyLink link : links) {
-      byte[] document = DependencyLinkBytesEncoder.JSON_V1.encode(link);
-      indexer.add(index, "dependency", document,
-        link.parent() + "|" + link.child()); // Unique constraint
+      indexer.index(index, "dependency", link, DEPENDENCY_LINK_BULK_INDEX_SUPPORT);
     }
     try {
-      indexer.newCall().execute();
+      indexer.build().execute();
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
   }
+
+  static final BulkIndexWriter<DependencyLink> DEPENDENCY_LINK_BULK_INDEX_SUPPORT =
+    new BulkIndexWriter<DependencyLink>() {
+      @Override public String writeDocument(DependencyLink link, BufferedSink sink) {
+        JsonWriter writer = JsonWriter.of(sink);
+        try {
+          writer.beginObject();
+          writer.name("parent").value(link.parent());
+          writer.name("child").value(link.child());
+          writer.name("callCount").value(link.callCount());
+          if (link.errorCount() > 0) writer.name("errorCount").value(link.errorCount());
+          writer.endObject();
+        } catch (IOException e) {
+          throw new AssertionError(e); // No I/O writing to a Buffer.
+        }
+        return link.parent() + "|" + link.child();
+      }
+    };
 }
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV6.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV6.java
index 03731d5..a50ec5c 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV6.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV6.java
@@ -55,9 +55,6 @@ public class ITElasticsearchStorageV6 {
       return storage;
     }
 
-    @Override @Test @Ignore("No consumer-side span deduplication") public void deduplicates() {
-    }
-
     @Before @Override public void clear() throws IOException {
       storage.clear();
     }
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java
index c6a4367..fe0cd9c 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java
@@ -20,9 +20,7 @@ import java.io.IOException;
 import java.util.List;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
@@ -55,9 +53,6 @@ public class ITElasticsearchStorageV7 {
       return storage;
     }
 
-    @Override @Test @Ignore("No consumer-side span deduplication") public void deduplicates() {
-    }
-
     @Before @Override public void clear() throws IOException {
       storage.clear();
     }
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkCallBuilderTest.java
similarity index 95%
rename from zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java
rename to zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkCallBuilderTest.java
index 0f8589d..36e3ecd 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkCallBuilderTest.java
@@ -23,9 +23,9 @@ import okio.ByteString;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import zipkin2.elasticsearch.internal.HttpBulkIndexer.CheckForErrors;
+import zipkin2.elasticsearch.internal.BulkCallBuilder.CheckForErrors;
 
-public class HttpBulkIndexerTest {
+public class BulkCallBuilderTest {
   @Rule public ExpectedException expectedException = ExpectedException.none();
 
   @Test
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkIndexWriterTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkIndexWriterTest.java
new file mode 100644
index 0000000..7c7857d
--- /dev/null
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkIndexWriterTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.elasticsearch.internal;
+
+import okio.Buffer;
+import org.junit.Test;
+import zipkin2.Span;
+import zipkin2.Span.Kind;
+import zipkin2.codec.SpanBytesDecoder;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static zipkin2.TestObjects.CLIENT_SPAN;
+import static zipkin2.TestObjects.FRONTEND;
+import static zipkin2.TestObjects.TODAY;
+
+/** Checks the JSON documents and document IDs produced by the span {@link BulkIndexWriter}s. */
+public class BulkIndexWriterTest {
+  Buffer buffer = new Buffer();
+
+  /** Child span 22 of trace 20 (parent 21) on the frontend, carrying no other data yet. */
+  static Span.Builder childSpan() {
+    return Span.newBuilder().traceId("20").id("22").parentId("21").localEndpoint(FRONTEND);
+  }
+
+  /** Writes {@code span} with the search-enabled writer and returns the document as UTF-8. */
+  String writeSpan(Span span) {
+    BulkIndexWriter.SPAN.writeDocument(span, buffer);
+    return buffer.readUtf8();
+  }
+
+  /** Asserts the returned ID is the trace ID, a dash, then the MD5 hex of the written bytes. */
+  void assertIdIsTraceIdAndDocumentMd5(BulkIndexWriter<Span> writer) {
+    String id = writer.writeDocument(CLIENT_SPAN, buffer);
+    String documentMd5 = buffer.readByteString().md5().hex();
+    assertThat(id).isEqualTo(CLIENT_SPAN.traceId() + "-" + documentMd5);
+  }
+
+  @Test public void span_addsDocumentId() {
+    assertIdIsTraceIdAndDocumentMd5(BulkIndexWriter.SPAN);
+  }
+
+  @Test public void spanSearchDisabled_addsDocumentId() {
+    assertIdIsTraceIdAndDocumentMd5(BulkIndexWriter.SPAN_SEARCH_DISABLED);
+  }
+
+  @Test public void spanSearchFields_skipsWhenNoData() {
+    Span span = childSpan().timestamp(0L).kind(Kind.CLIENT).build();
+
+    assertThat(writeSpan(span)).startsWith("{\"traceId\":\"");
+  }
+
+  @Test public void spanSearchFields_addsTimestampFieldWhenNoTags() {
+    Span span = childSpan().name("").timestamp(1000L).kind(Kind.CLIENT).build();
+
+    assertThat(writeSpan(span)).startsWith("{\"timestamp_millis\":1,\"traceId\":");
+  }
+
+  @Test public void spanSearchFields_addsQueryFieldForAnnotations() {
+    Span span = childSpan().name("").addAnnotation(1L, "\"foo").build();
+
+    assertThat(writeSpan(span)).startsWith("{\"_q\":[\"\\\"foo\"],\"traceId");
+  }
+
+  @Test public void spanSearchFields_addsQueryFieldForTags() {
+    Span span = childSpan().putTag("\"foo", "\"bar").build();
+
+    assertThat(writeSpan(span)).startsWith("{\"_q\":[\"\\\"foo\",\"\\\"foo=\\\"bar\"],\"traceId");
+  }
+
+  @Test public void spanSearchFields_readableByNormalJsonCodec() {
+    Span span =
+      Span.newBuilder().traceId("20").id("20").name("get").timestamp(TODAY * 1000).build();
+
+    BulkIndexWriter.SPAN.writeDocument(span, buffer);
+    Span decoded = SpanBytesDecoder.JSON_V2.decodeOne(buffer.readByteArray());
+
+    assertThat(decoded).isEqualTo(span); // the decoder ignores the extra timestamp_millis field
+  }
+
+  @Test public void spanSearchDisabled_doesntAddQueryFields() {
+    BulkIndexWriter.SPAN_SEARCH_DISABLED.writeDocument(CLIENT_SPAN, buffer);
+
+    assertThat(buffer.readUtf8()).startsWith("{\"traceId\":\"");
+  }
+}