You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/09 03:09:11 UTC

[incubator-zipkin] 01/01: Uses okio/moshi natively in Elasticsearch everywhere

This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch use-okio-buffer
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git

commit 4139224ad7c45d5941183b4436b2fcf3a25730bf
Author: Adrian Cole <ac...@pivotal.io>
AuthorDate: Thu May 9 11:06:29 2019 +0800

    Uses okio/moshi natively in Elasticsearch everywhere
    
    Before, we used a combination of our buffer and Okio's Buffer, and a
    combination of our JSON encoder and Moshi's. This changes all the code
    in the Elasticsearch storage module to use Okio buffers, which reduces
    array allocations and allows us to compute cheap content hashes in the
    future.
    
    The content hashes will be used to avoid indexing duplicate documents,
    for example when reporters back off and retry with the same span. This
    will help simplify analysis.
---
 .../elasticsearch/ElasticsearchSpanConsumer.java   | 142 +++-------------
 .../elasticsearch/internal/BulkIndexSupport.java   | 187 +++++++++++++++++++++
 .../elasticsearch/internal/HttpBulkIndexer.java    |  40 +++--
 .../ElasticsearchSpanConsumerTest.java             |  97 +----------
 .../zipkin2/elasticsearch/InternalForTests.java    |  31 +++-
 .../internal/BulkIndexSupportTest.java             | 119 +++++++++++++
 6 files changed, 384 insertions(+), 232 deletions(-)

diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java
index 1bbf27a..3553fe4 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/ElasticsearchSpanConsumer.java
@@ -16,37 +16,24 @@
  */
 package zipkin2.elasticsearch;
 
-import com.squareup.moshi.JsonWriter;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import okio.Buffer;
-import okio.ByteString;
-import zipkin2.Annotation;
 import zipkin2.Call;
 import zipkin2.Span;
-import zipkin2.codec.SpanBytesEncoder;
+import zipkin2.elasticsearch.internal.BulkIndexSupport;
 import zipkin2.elasticsearch.internal.HttpBulkIndexer;
 import zipkin2.elasticsearch.internal.IndexNameFormatter;
 import zipkin2.internal.DelayLimiter;
-import zipkin2.internal.Nullable;
 import zipkin2.storage.SpanConsumer;
 
 import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
 import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN;
-import static zipkin2.internal.JsonEscaper.jsonEscape;
-import static zipkin2.internal.JsonEscaper.jsonEscapedSizeInBytes;
+import static zipkin2.elasticsearch.internal.HttpBulkIndexer.INDEX_CHARS_LIMIT;
 
 class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testing
-  static final Logger LOG = Logger.getLogger(ElasticsearchSpanConsumer.class.getName());
-  static final int INDEX_CHARS_LIMIT = 256;
-  static final ByteString EMPTY_JSON = ByteString.of(new byte[] {'{', '}'});
 
   final ElasticsearchStorage es;
   final Set<String> autocompleteKeys;
@@ -80,19 +67,17 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
 
   void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
     for (Span span : spans) {
-      long spanTimestamp = span.timestampAsLong();
-      long indexTimestamp = 0L; // which index to store this span into
-      if (spanTimestamp != 0L) {
-        indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(spanTimestamp);
-      } else {
+      final long indexTimestamp; // which index to store this span into
+      if (span.timestampAsLong() != 0L) {
+        indexTimestamp = span.timestampAsLong() / 1000L;
+      } else if (!span.annotations().isEmpty()) {
         // guessTimestamp is made for determining the span's authoritative timestamp. When choosing
         // the index bucket, any annotation is better than using current time.
-        if (!span.annotations().isEmpty()) {
-          indexTimestamp = span.annotations().get(0).timestamp() / 1000;
-        }
-        if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
+        indexTimestamp = span.annotations().get(0).timestamp() / 1000L;
+      } else {
+        indexTimestamp = System.currentTimeMillis();
       }
-      indexer.add(indexTimestamp, span, spanTimestamp);
+      indexer.add(indexTimestamp, span);
       if (searchEnabled && !span.tags().isEmpty()) {
         indexer.addAutocompleteValues(indexTimestamp, span);
       }
@@ -104,18 +89,17 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
     final HttpBulkIndexer indexer;
     final ElasticsearchSpanConsumer consumer;
     final List<AutocompleteContext> pendingAutocompleteContexts = new ArrayList<>();
+    final BulkIndexSupport<Span> spanIndexSupport;
 
     BulkSpanIndexer(ElasticsearchSpanConsumer consumer) {
       this.indexer = new HttpBulkIndexer("index-span", consumer.es);
       this.consumer = consumer;
+      this.spanIndexSupport = consumer.searchEnabled ? BulkIndexSupport.SPAN : BulkIndexSupport.SPAN_SEARCH_DISABLED;
     }
 
-    void add(long indexTimestamp, Span span, long timestampMillis) {
+    void add(long indexTimestamp, Span span) {
       String index = consumer.formatTypeAndTimestampForInsert(SPAN, indexTimestamp);
-      byte[] document = consumer.searchEnabled
-        ? prefixWithTimestampMillisAndQuery(span, timestampMillis)
-        : SpanBytesEncoder.JSON_V2.encode(span);
-      indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
+      indexer.add(index, SPAN, span, spanIndexSupport);
     }
 
     void addAutocompleteValues(long indexTimestamp, Span span) {
@@ -127,24 +111,12 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
         // If the autocomplete whitelist doesn't contain the key, skip storing its value
         if (!consumer.autocompleteKeys.contains(tag.getKey())) continue;
 
-        // Id is used to dedupe server side as necessary. Arbitrarily same format as _q value.
-        String id = tag.getKey() + "=" + tag.getValue();
-        AutocompleteContext context = new AutocompleteContext(indexTimestamp, id);
+        AutocompleteContext context =
+          new AutocompleteContext(indexTimestamp, tag.getKey(), tag.getValue());
         if (!consumer.delayLimiter.shouldInvoke(context)) continue;
         pendingAutocompleteContexts.add(context);
 
-        // encode using zipkin's internal buffer so we don't have to catch exceptions etc
-        int sizeInBytes = 27; // {"tagKey":"","tagValue":""}
-        sizeInBytes += jsonEscapedSizeInBytes(tag.getKey());
-        sizeInBytes += jsonEscapedSizeInBytes(tag.getValue());
-        zipkin2.internal.Buffer b = zipkin2.internal.Buffer.allocate(sizeInBytes);
-        b.writeAscii("{\"tagKey\":\"");
-        b.writeUtf8(jsonEscape(tag.getKey()));
-        b.writeAscii("\",\"tagValue\":\"");
-        b.writeUtf8(jsonEscape(tag.getValue()));
-        b.writeAscii("\"}");
-        byte[] document = b.toByteArray();
-        indexer.add(idx, AUTOCOMPLETE, document, id);
+        indexer.add(idx, AUTOCOMPLETE, tag, BulkIndexSupport.AUTOCOMPLETE);
       }
     }
 
@@ -160,89 +132,31 @@ class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testi
     }
   }
 
-  /**
-   * In order to allow systems like Kibana to search by timestamp, we add a field "timestamp_millis"
-   * when storing. The cheapest way to do this without changing the codec is prefixing it to the
-   * json. For example. {"traceId":"... becomes {"timestamp_millis":12345,"traceId":"...
-   *
-   * <p>Tags are stored as a dictionary. Since some tag names will include inconsistent number of
-   * dots (ex "error" and perhaps "error.message"), we cannot index them naturally with
-   * elasticsearch. Instead, we add an index-only (non-source) field of {@code _q} which includes
-   * valid search queries. For example, the tag {@code error -> 500} results in {@code
-   * "_q":["error", "error=500"]}. This matches the input query syntax, and can be checked manually
-   * with curl.
-   *
-   * <p>Ex {@code curl -s localhost:9200/zipkin:span-2017-08-11/_search?q=_q:error=500}
-   */
-  static byte[] prefixWithTimestampMillisAndQuery(Span span, long timestampMillis) {
-    Buffer prefix = new Buffer();
-    JsonWriter writer = JsonWriter.of(prefix);
-    try {
-      writer.beginObject();
-
-      if (timestampMillis != 0L) writer.name("timestamp_millis").value(timestampMillis);
-      if (!span.tags().isEmpty() || !span.annotations().isEmpty()) {
-        writer.name("_q");
-        writer.beginArray();
-        for (Annotation a : span.annotations()) {
-          if (a.value().length() > INDEX_CHARS_LIMIT) continue;
-          writer.value(a.value());
-        }
-        for (Map.Entry<String, String> tag : span.tags().entrySet()) {
-          int length = tag.getKey().length() + tag.getValue().length() + 1;
-          if (length > INDEX_CHARS_LIMIT) continue;
-          writer.value(tag.getKey()); // search is possible by key alone
-          writer.value(tag.getKey() + "=" + tag.getValue());
-        }
-        writer.endArray();
-      }
-      writer.endObject();
-    } catch (IOException e) {
-      // very unexpected to have an IOE for an in-memory write
-      assert false : "Error indexing query for span: " + span;
-      if (LOG.isLoggable(Level.FINE)) {
-        LOG.log(Level.FINE, "Error indexing query for span: " + span, e);
-      }
-      return SpanBytesEncoder.JSON_V2.encode(span);
-    }
-    byte[] document = SpanBytesEncoder.JSON_V2.encode(span);
-    if (prefix.rangeEquals(0L, EMPTY_JSON)) return document;
-    return mergeJson(prefix.readByteArray(), document);
-  }
-
-  static byte[] mergeJson(byte[] prefix, byte[] suffix) {
-    byte[] newSpanBytes = new byte[prefix.length + suffix.length - 1];
-    int pos = 0;
-    System.arraycopy(prefix, 0, newSpanBytes, pos, prefix.length);
-    pos += prefix.length;
-    newSpanBytes[pos - 1] = ',';
-    // starting at position 1 discards the old head of '{'
-    System.arraycopy(suffix, 1, newSpanBytes, pos, suffix.length - 1);
-    return newSpanBytes;
-  }
-
   static final class AutocompleteContext {
-    final long indexTimestamp;
-    final String autocompleteId;
+    final long timestamp;
+    final String key, value;
 
-    AutocompleteContext(long indexTimestamp, String autocompleteId) {
-      this.indexTimestamp = indexTimestamp;
-      this.autocompleteId = autocompleteId;
+    AutocompleteContext(long timestamp, String key, String value) {
+      this.timestamp = timestamp;
+      this.key = key;
+      this.value = value;
     }
 
     @Override public boolean equals(Object o) {
       if (o == this) return true;
       if (!(o instanceof AutocompleteContext)) return false;
       AutocompleteContext that = (AutocompleteContext) o;
-      return indexTimestamp == that.indexTimestamp && autocompleteId.equals(that.autocompleteId);
+      return timestamp == that.timestamp && key.equals(that.key) && value.equals(that.value);
     }
 
     @Override public int hashCode() {
       int h$ = 1;
       h$ *= 1000003;
-      h$ ^= (int) (h$ ^ ((indexTimestamp >>> 32) ^ indexTimestamp));
+      h$ ^= (int) (h$ ^ ((timestamp >>> 32) ^ timestamp));
+      h$ *= 1000003;
+      h$ ^= key.hashCode();
       h$ *= 1000003;
-      h$ ^= autocompleteId.hashCode();
+      h$ ^= value.hashCode();
       return h$;
     }
   }
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexSupport.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexSupport.java
new file mode 100644
index 0000000..8a9eb99
--- /dev/null
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexSupport.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.elasticsearch.internal;
+
+import com.squareup.moshi.JsonWriter;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import okio.Buffer;
+import zipkin2.Annotation;
+import zipkin2.Endpoint;
+import zipkin2.Span;
+
+import static zipkin2.elasticsearch.internal.HttpBulkIndexer.INDEX_CHARS_LIMIT;
+
+public abstract class BulkIndexSupport<T> {
+
+  /** Writes {@code input} as one complete JSON document, according to the index strategy. */
+  public abstract void writeDocument(T input, JsonWriter writer);
+
+  /** Calls {@code writer.name("_id").value(id)}, or nothing to let Elasticsearch pick one. */
+  public abstract void writeIdField(T input, JsonWriter writer);
+
+  public static final BulkIndexSupport<Span> SPAN = new BulkIndexSupport<Span>() {
+    @Override public void writeDocument(Span input, JsonWriter writer) {
+      write(input, true, writer);
+    }
+
+    @Override public void writeIdField(Span input, JsonWriter writer) {
+      // Allow ES to choose an ID
+    }
+  };
+  public static final BulkIndexSupport<Span> SPAN_SEARCH_DISABLED = new BulkIndexSupport<Span>() {
+    @Override public void writeDocument(Span input, JsonWriter writer) {
+      write(input, false, writer);
+    }
+
+    @Override public void writeIdField(Span input, JsonWriter writer) {
+      // Allow ES to choose an ID
+    }
+  };
+
+  public static final BulkIndexSupport<Map.Entry<String, String>> AUTOCOMPLETE =
+    new BulkIndexSupport<Map.Entry<String, String>>() {
+      @Override public void writeDocument(Map.Entry<String, String> input, JsonWriter writer) {
+        writeAutocompleteEntry(input.getKey(), input.getValue(), writer);
+      }
+
+      @Override public void writeIdField(Map.Entry<String, String> input, JsonWriter writer) {
+        Buffer id = new Buffer();
+        // Id is used to dedupe server side as necessary. Arbitrarily same format as _q value.
+        id.writeUtf8(input.getKey()).writeByte('=').writeUtf8(input.getValue());
+        try {
+          // The id holds user-supplied text that may need JSON escaping. It is read back as a
+          // String so that Moshi's value(String) performs that escaping.
+          writer.name("_id").value(id.readUtf8());
+        } catch (IOException e) {
+          throw new AssertionError(e); // No I/O writing to a Buffer.
+        }
+      }
+    };
+
+  static final Endpoint EMPTY_ENDPOINT = Endpoint.newBuilder().build();
+
+  /**
+   * In order to allow systems like Kibana to search by timestamp, we add a field "timestamp_millis"
+   * when storing. Since this method writes the document field by field instead of re-using the
+   * span codec, search fields are simply emitted first. Ex. {"timestamp_millis":12345,"traceId":"...
+   *
+   * <p>Tags are stored as a dictionary. Since some tag names will include inconsistent number of
+   * dots (ex "error" and perhaps "error.message"), we cannot index them naturally with
+   * elasticsearch. Instead, we add an index-only (non-source) field of {@code _q} which includes
+   * valid search queries. For example, the tag {@code error -> 500} results in {@code
+   * "_q":["error", "error=500"]}. This matches the input query syntax, and can be checked manually
+   * with curl.
+   *
+   * <p>Ex {@code curl -s localhost:9200/zipkin:span-2017-08-11/_search?q=_q:error=500}
+   *
+   * @param searchEnabled when true, first writes timestamp_millis and _q (each only if non-empty)
+   */
+  static void write(Span span, boolean searchEnabled, JsonWriter writer) {
+    try {
+      writer.beginObject();
+      if (searchEnabled) addSearchFields(span, writer);
+      writer.name("traceId").value(span.traceId());
+      if (span.parentId() != null) writer.name("parentId").value(span.parentId());
+      writer.name("id").value(span.id());
+      if (span.kind() != null) writer.name("kind").value(span.kind().toString());
+      if (span.name() != null) writer.name("name").value(span.name());
+      if (span.timestampAsLong() != 0L) writer.name("timestamp").value(span.timestampAsLong());
+      if (span.durationAsLong() != 0L) writer.name("duration").value(span.durationAsLong());
+      if (span.localEndpoint() != null && !EMPTY_ENDPOINT.equals(span.localEndpoint())) {
+        writer.name("localEndpoint");
+        write(span.localEndpoint(), writer);
+      }
+      if (span.remoteEndpoint() != null && !EMPTY_ENDPOINT.equals(span.remoteEndpoint())) {
+        writer.name("remoteEndpoint");
+        write(span.remoteEndpoint(), writer);
+      }
+      if (!span.annotations().isEmpty()) {
+        writer.name("annotations");
+        writer.beginArray();
+        for (int i = 0, length = span.annotations().size(); i < length; ) {
+          write(span.annotations().get(i++), writer);
+        }
+        writer.endArray();
+      }
+      if (!span.tags().isEmpty()) {
+        writer.name("tags");
+        writer.beginObject();
+        Iterator<Map.Entry<String, String>> tags = span.tags().entrySet().iterator();
+        while (tags.hasNext()) write(tags.next(), writer);
+        writer.endObject();
+      }
+      if (Boolean.TRUE.equals(span.debug())) writer.name("debug").value(true);
+      if (Boolean.TRUE.equals(span.shared())) writer.name("shared").value(true);
+      writer.endObject();
+    } catch (IOException e) {
+      throw new AssertionError(e); // No I/O writing to a Buffer.
+    }
+  }
+
+  static void writeAutocompleteEntry(String key, String value, JsonWriter writer) {
+    try {
+      writer.beginObject();
+      writer.name("tagKey").value(key);
+      writer.name("tagValue").value(value);
+      writer.endObject();
+    } catch (IOException e) {
+      throw new AssertionError(e); // No I/O writing to a Buffer.
+    }
+  }
+
+  static void write(Map.Entry<String, String> tag, JsonWriter writer) throws IOException {
+    writer.name(tag.getKey()).value(tag.getValue());
+  }
+
+  static void write(Annotation annotation, JsonWriter writer) throws IOException {
+    writer.beginObject();
+    writer.name("timestamp").value(annotation.timestamp());
+    writer.name("value").value(annotation.value());
+    writer.endObject();
+  }
+
+  static void write(Endpoint endpoint, JsonWriter writer) throws IOException {
+    writer.beginObject();
+    if (endpoint.serviceName() != null) writer.name("serviceName").value(endpoint.serviceName());
+    if (endpoint.ipv4() != null) writer.name("ipv4").value(endpoint.ipv4());
+    if (endpoint.ipv6() != null) writer.name("ipv6").value(endpoint.ipv6());
+    if (endpoint.portAsInt() != 0) writer.name("port").value(endpoint.portAsInt());
+    writer.endObject();
+  }
+
+  static void addSearchFields(Span span, JsonWriter writer) throws IOException {
+    long timestampMillis = span.timestampAsLong() / 1000L;
+    if (timestampMillis != 0L) writer.name("timestamp_millis").value(timestampMillis);
+    if (!span.tags().isEmpty() || !span.annotations().isEmpty()) {
+      writer.name("_q");
+      writer.beginArray();
+      for (Annotation a : span.annotations()) {
+        if (a.value().length() > INDEX_CHARS_LIMIT) continue;
+        writer.value(a.value());
+      }
+      for (Map.Entry<String, String> tag : span.tags().entrySet()) {
+        int length = tag.getKey().length() + tag.getValue().length() + 1;
+        if (length > INDEX_CHARS_LIMIT) continue;
+        writer.value(tag.getKey()); // search is possible by key alone
+        writer.value(tag.getKey() + "=" + tag.getValue());
+      }
+      writer.endArray();
+    }
+  }
+}
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java
index 2eeffea..fd21b6f 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java
@@ -16,6 +16,7 @@
  */
 package zipkin2.elasticsearch.internal;
 
+import com.squareup.moshi.JsonWriter;
 import java.io.IOException;
 import java.util.concurrent.RejectedExecutionException;
 import okhttp3.HttpUrl;
@@ -26,13 +27,11 @@ import okio.Buffer;
 import okio.BufferedSource;
 import zipkin2.elasticsearch.ElasticsearchStorage;
 import zipkin2.elasticsearch.internal.client.HttpCall;
-import zipkin2.internal.Nullable;
-
-import static zipkin2.internal.JsonEscaper.jsonEscape;
 
 // See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 // exposed to re-use for testing writes of dependency links
 public final class HttpBulkIndexer {
+  public static final int INDEX_CHARS_LIMIT = 256;
   static final MediaType APPLICATION_JSON = MediaType.parse("application/json");
 
   final String tag;
@@ -69,24 +68,29 @@ public final class HttpBulkIndexer {
     }
   }
 
-  public void add(String index, String typeName, byte[] document, @Nullable String id) {
-    writeIndexMetadata(index, typeName, id);
-    writeDocument(document);
+  public <T> void add(String index, String typeName, T input, BulkIndexSupport<T> indexSupport) {
+    writeIndexMetadata(index, typeName, input, indexSupport);
+    body.writeByte('\n');
+    indexSupport.writeDocument(input, JsonWriter.of(body));
+    body.writeByte('\n');
   }
 
-  void writeIndexMetadata(String index, String typeName, @Nullable String id) {
-    body.writeUtf8("{\"index\":{\"_index\":\"").writeUtf8(index).writeByte('"');
-    // the _type parameter is needed for Elasticsearch < 6.x
-    if (shouldAddType) body.writeUtf8(",\"_type\":\"").writeUtf8(typeName).writeByte('"');
-    if (id != null) {
-      body.writeUtf8(",\"_id\":\"").writeUtf8(jsonEscape(id).toString()).writeByte('"');
+  <T> void writeIndexMetadata(String index, String typeName, T input,
+    BulkIndexSupport<T> indexSupport) {
+    JsonWriter jsonWriter = JsonWriter.of(body);
+    try {
+      jsonWriter.beginObject();
+      jsonWriter.name("index");
+      jsonWriter.beginObject();
+      jsonWriter.name("_index").value(index);
+      // the _type parameter is needed for Elasticsearch < 6.x
+      if (shouldAddType) jsonWriter.name("_type").value(typeName);
+      indexSupport.writeIdField(input, jsonWriter);
+      jsonWriter.endObject();
+      jsonWriter.endObject();
+    } catch (IOException e) {
+      throw new AssertionError(e); // No I/O writing to a Buffer.
     }
-    body.writeUtf8("}}\n");
-  }
-
-  void writeDocument(byte[] document) {
-    body.write(document);
-    body.writeByte('\n');
   }
 
   /** Creates a bulk request when there is more than one object to store */
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java
index f74fae8..eaff7e3 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java
@@ -31,7 +31,6 @@ import zipkin2.Endpoint;
 import zipkin2.Span;
 import zipkin2.Span.Kind;
 import zipkin2.TestObjects;
-import zipkin2.codec.SpanBytesDecoder;
 import zipkin2.codec.SpanBytesEncoder;
 import zipkin2.internal.Nullable;
 import zipkin2.storage.SpanConsumer;
@@ -40,7 +39,6 @@ import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static zipkin2.TestObjects.TODAY;
-import static zipkin2.elasticsearch.ElasticsearchSpanConsumer.prefixWithTimestampMillisAndQuery;
 
 public class ElasticsearchSpanConsumerTest {
   static final Endpoint WEB_ENDPOINT = Endpoint.newBuilder().serviceName("web").build();
@@ -87,100 +85,7 @@ public class ElasticsearchSpanConsumerTest {
     accept(span);
 
     assertThat(es.takeRequest().getBody().readUtf8())
-      .contains("\n{\"timestamp_millis\":" + Long.toString(TODAY) + ",\"traceId\":");
-  }
-
-  @Test
-  public void prefixWithTimestampMillisAndQuery_skipsWhenNoData() throws Exception {
-    Span span =
-      Span.newBuilder()
-        .traceId("20")
-        .id("22")
-        .name("")
-        .parentId("21")
-        .timestamp(0L)
-        .localEndpoint(WEB_ENDPOINT)
-        .kind(Kind.CLIENT)
-        .build();
-
-    byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
-    assertThat(new String(result, "UTF-8")).startsWith("{\"traceId\":\"");
-  }
-
-  @Test
-  public void prefixWithTimestampMillisAndQuery_addsTimestampMillis() throws Exception {
-    Span span =
-      Span.newBuilder()
-        .traceId("20")
-        .id("22")
-        .name("")
-        .parentId("21")
-        .timestamp(1L)
-        .localEndpoint(WEB_ENDPOINT)
-        .kind(Kind.CLIENT)
-        .build();
-
-    byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
-    assertThat(new String(result, "UTF-8")).startsWith("{\"timestamp_millis\":1,\"traceId\":");
-  }
-
-  @Test
-  public void prefixWithTimestampMillisAndQuery_addsAnnotationQuery() throws Exception {
-    Span span =
-      Span.newBuilder()
-        .traceId("20")
-        .id("22")
-        .name("")
-        .parentId("21")
-        .localEndpoint(WEB_ENDPOINT)
-        .addAnnotation(1L, "\"foo")
-        .build();
-
-    byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
-    assertThat(new String(result, "UTF-8")).startsWith("{\"_q\":[\"\\\"foo\"],\"traceId");
-  }
-
-  @Test
-  public void prefixWithTimestampMillisAndQuery_addsAnnotationQueryTags() throws Exception {
-    Span span =
-      Span.newBuilder()
-        .traceId("20")
-        .id("22")
-        .name("")
-        .parentId("21")
-        .localEndpoint(WEB_ENDPOINT)
-        .putTag("\"foo", "\"bar")
-        .build();
-
-    byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
-    assertThat(new String(result, "UTF-8"))
-      .startsWith("{\"_q\":[\"\\\"foo\",\"\\\"foo=\\\"bar\"],\"traceId");
-  }
-
-  @Test
-  public void prefixWithTimestampMillisAndQuery_readable() {
-    Span span =
-      Span.newBuilder().traceId("20").id("20").name("get").timestamp(TODAY * 1000).build();
-
-    assertThat(
-      SpanBytesDecoder.JSON_V2.decodeOne(
-        prefixWithTimestampMillisAndQuery(span, span.timestamp())))
-      .isEqualTo(span); // ignores timestamp_millis field
-  }
-
-  @Test
-  public void doesntWriteDocumentId() throws Exception {
-    es.enqueue(new MockResponse());
-
-    accept(Span.newBuilder().traceId("1").id("1").name("foo").build());
-
-    RecordedRequest request = es.takeRequest();
-    assertThat(request.getBody().readByteString().utf8())
-      .doesNotContain("\"_type\":\"span\",\"_id\"");
+      .contains("\n{\"timestamp_millis\":" + TODAY + ",\"traceId\":");
   }
 
   @Test
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
index c38b8eb..d9e1fc3 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
@@ -16,11 +16,12 @@
  */
 package zipkin2.elasticsearch;
 
+import com.squareup.moshi.JsonWriter;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
 import zipkin2.DependencyLink;
-import zipkin2.codec.DependencyLinkBytesEncoder;
+import zipkin2.elasticsearch.internal.BulkIndexSupport;
 import zipkin2.elasticsearch.internal.HttpBulkIndexer;
 
 /** Package accessor for integration tests */
@@ -31,9 +32,7 @@ public class InternalForTests {
       .formatTypeAndTimestampForInsert("dependency", midnightUTC);
     HttpBulkIndexer indexer = new HttpBulkIndexer("indexlinks", es);
     for (DependencyLink link : links) {
-      byte[] document = DependencyLinkBytesEncoder.JSON_V1.encode(link);
-      indexer.add(index, "dependency", document,
-        link.parent() + "|" + link.child()); // Unique constraint
+      indexer.add(index, "dependency", link, DEPENDENCY_LINK_BULK_INDEX_SUPPORT);
     }
     try {
       indexer.newCall().execute();
@@ -41,4 +40,28 @@ public class InternalForTests {
       throw new UncheckedIOException(e);
     }
   }
+
+  static final BulkIndexSupport<DependencyLink> DEPENDENCY_LINK_BULK_INDEX_SUPPORT =
+    new BulkIndexSupport<DependencyLink>() { // link documents with _id "parent|child"
+      @Override public void writeDocument(DependencyLink link, JsonWriter writer) {
+        try {
+          writer.beginObject();
+          writer.name("parent").value(link.parent());
+          writer.name("child").value(link.child());
+          writer.name("callCount").value(link.callCount());
+          if (link.errorCount() > 0) writer.name("errorCount").value(link.errorCount());
+          writer.endObject();
+        } catch (IOException e) {
+          throw new AssertionError(e); // No I/O writing to a Buffer.
+        }
+      }
+
+      @Override public void writeIdField(DependencyLink link, JsonWriter writer) {
+        try {
+          writer.name("_id").value(link.parent() + "|" + link.child()); // Unique constraint
+        } catch (IOException e) {
+          throw new AssertionError(e); // No I/O writing to a Buffer.
+        }
+      }
+    };
 }
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkIndexSupportTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkIndexSupportTest.java
new file mode 100644
index 0000000..eeead05
--- /dev/null
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkIndexSupportTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.elasticsearch.internal;
+
+import com.squareup.moshi.JsonWriter;
+import okio.Buffer;
+import org.junit.Test;
+import zipkin2.Span;
+import zipkin2.Span.Kind;
+import zipkin2.codec.SpanBytesDecoder;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static zipkin2.TestObjects.CLIENT_SPAN;
+import static zipkin2.TestObjects.FRONTEND;
+import static zipkin2.TestObjects.TODAY;
+
+public class BulkIndexSupportTest {
+  Buffer buffer = new Buffer();
+
+  @Test public void span_doesntAddDocumentId() {
+    BulkIndexSupport.SPAN.writeDocument(CLIENT_SPAN, JsonWriter.of(buffer));
+    buffer.writeByte('\n');
+    BulkIndexSupport.SPAN_SEARCH_DISABLED.writeDocument(CLIENT_SPAN, JsonWriter.of(buffer));
+    buffer.writeByte('\n');
+
+    assertThat(buffer.readUtf8()).doesNotContain("\"_id\"");
+  }
+
+  @Test public void spanSearchFields_skipsWhenNoData() {
+    Span span = Span.newBuilder()
+      .traceId("20")
+      .id("22")
+      .parentId("21")
+      .timestamp(0L)
+      .localEndpoint(FRONTEND)
+      .kind(Kind.CLIENT)
+      .build();
+
+    BulkIndexSupport.SPAN.writeDocument(span, JsonWriter.of(buffer));
+
+    assertThat(buffer.readUtf8()).startsWith("{\"traceId\":\"");
+  }
+
+  @Test public void spanSearchFields_addsTimestampFieldWhenNoTags() {
+    Span span =
+      Span.newBuilder()
+        .traceId("20")
+        .id("22")
+        .name("")
+        .parentId("21")
+        .timestamp(1000L)
+        .localEndpoint(FRONTEND)
+        .kind(Kind.CLIENT)
+        .build();
+
+    BulkIndexSupport.SPAN.writeDocument(span, JsonWriter.of(buffer));
+
+    assertThat(buffer.readUtf8()).startsWith("{\"timestamp_millis\":1,\"traceId\":");
+  }
+
+  @Test public void spanSearchFields_addsQueryFieldForAnnotations() {
+    Span span = Span.newBuilder()
+      .traceId("20")
+      .id("22")
+      .name("")
+      .parentId("21")
+      .localEndpoint(FRONTEND)
+      .addAnnotation(1L, "\"foo")
+      .build();
+
+    BulkIndexSupport.SPAN.writeDocument(span, JsonWriter.of(buffer));
+
+    assertThat(buffer.readUtf8()).startsWith("{\"_q\":[\"\\\"foo\"],\"traceId");
+  }
+
+  @Test public void spanSearchFields_addsQueryFieldForTags() {
+    Span span = Span.newBuilder()
+      .traceId("20")
+      .id("22")
+      .parentId("21")
+      .localEndpoint(FRONTEND)
+      .putTag("\"foo", "\"bar")
+      .build();
+
+    BulkIndexSupport.SPAN.writeDocument(span, JsonWriter.of(buffer));
+
+    assertThat(buffer.readUtf8()).startsWith("{\"_q\":[\"\\\"foo\",\"\\\"foo=\\\"bar\"],\"traceId");
+  }
+
+  @Test public void spanSearchFields_readableByNormalJsonCodec() {
+    Span span =
+      Span.newBuilder().traceId("20").id("20").name("get").timestamp(TODAY * 1000).build();
+
+    BulkIndexSupport.SPAN.writeDocument(span, JsonWriter.of(buffer));
+
+    assertThat(SpanBytesDecoder.JSON_V2.decodeOne(buffer.readByteArray()))
+      .isEqualTo(span); // ignores timestamp_millis field
+  }
+
+  @Test public void spanSearchDisabled_doesntAddQueryFields() {
+    BulkIndexSupport.SPAN_SEARCH_DISABLED.writeDocument(CLIENT_SPAN, JsonWriter.of(buffer));
+
+    assertThat(buffer.readUtf8()).startsWith("{\"traceId\":\"");
+  }
+}