Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/07 08:35:12 UTC

[incubator-zipkin] branch no-kafka08 created (now 0000b1e)

This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a change to branch no-kafka08
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git.


      at 0000b1e  Removes Kafka 0.8 support (KAFKA_ZOOKEEPER)

This branch includes the following new commits:

     new 0000b1e  Removes Kafka 0.8 support (KAFKA_ZOOKEEPER)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-zipkin] 01/01: Removes Kafka 0.8 support (KAFKA_ZOOKEEPER)

Posted by ad...@apache.org.

adriancole pushed a commit to branch no-kafka08
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git

commit 0000b1ee883f73c0b7b49a37887182ef912b4d5e
Author: Adrian Cole <ac...@pivotal.io>
AuthorDate: Tue May 7 16:34:52 2019 +0800

    Removes Kafka 0.8 support (KAFKA_ZOOKEEPER)
    
    This removes support for Kafka 0.8 (last updated almost 4 years ago).
    Notably, this means those using `KAFKA_ZOOKEEPER` to configure their
    broker need to switch to `KAFKA_BOOTSTRAP_SERVERS` instead.
    
    Note: Kafka 0.8 was not packaged into zipkin-server; it was an optional
    add-on. However, our Docker image was built in such a way that it
    appeared to be available by default.
    
    See https://lists.apache.org/thread.html/432df5a806ee27dd959ded5ebf5e7cc6bd4370f6b1b1daf7bf594e80@%3Cdev.zipkin.apache.org%3E
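
For anyone migrating, the switch described in the message above can be sketched as a small preflight check. This is an illustrative sketch and not part of the commit: the function name `check_kafka_env` and the example broker address are hypothetical, and only the two environment variable names come from the commit message.

```bash
#!/usr/bin/env bash
# Hypothetical preflight check (not part of Zipkin): fail fast when only the
# removed KAFKA_ZOOKEEPER setting is present in the environment.
check_kafka_env() {
  if [ -n "${KAFKA_ZOOKEEPER:-}" ] && [ -z "${KAFKA_BOOTSTRAP_SERVERS:-}" ]; then
    # 127.0.0.1:9092 is only an example broker address.
    echo "KAFKA_ZOOKEEPER is no longer supported; set KAFKA_BOOTSTRAP_SERVERS (for example 127.0.0.1:9092)" >&2
    return 1
  fi
  return 0
}
```

A launcher script could call `check_kafka_env || exit 1` before running `java -jar zipkin.jar`, so a stale ZooKeeper-only configuration is reported instead of silently ignored.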
---
 zipkin-autoconfigure/collector-kafka08/README.md   | 135 ----------
 zipkin-autoconfigure/collector-kafka08/pom.xml     | 104 --------
 .../kafka08/KafkaZooKeeperSetCondition.java        |  45 ----
 .../ZipkinKafka08CollectorAutoConfiguration.java   |  63 -----
 .../kafka08/ZipkinKafkaCollectorProperties.java    |  90 -------
 .../src/main/resources/META-INF/spring.factories   |   2 -
 .../src/main/resources/zipkin-server-kafka08.yml   |   7 -
 .../autoconfigure/collector/kafka08/Access.java    |  42 ----
 ...ipkinKafka08CollectorAutoConfigurationTest.java | 111 ---------
 .../kafka/v1/NestedPropertyOverrideTest.java       |  46 ----
 zipkin-autoconfigure/pom.xml                       |   1 -
 zipkin-collector/kafka/README.md                   |   2 +-
 zipkin-collector/kafka08/README.md                 |  41 ---
 zipkin-collector/kafka08/pom.xml                   |  59 -----
 .../zipkin2/collector/kafka08/KafkaCollector.java  | 277 ---------------------
 .../collector/kafka08/KafkaStreamProcessor.java    |  84 -------
 .../collector/kafka08/ITKafkaCollector.java        | 250 -------------------
 .../zipkin2/collector/kafka08/KafkaTestGraph.java  |  51 ----
 .../kafka08/src/test/resources/log4j.properties    |   7 -
 .../kafka08/src/test/resources/log4j2.properties   |  11 -
 zipkin-collector/pom.xml                           |   1 -
 zipkin-server/README.md                            |   8 -
 22 files changed, 1 insertion(+), 1436 deletions(-)

diff --git a/zipkin-autoconfigure/collector-kafka08/README.md b/zipkin-autoconfigure/collector-kafka08/README.md
deleted file mode 100644
index 558b74d..0000000
--- a/zipkin-autoconfigure/collector-kafka08/README.md
+++ /dev/null
@@ -1,135 +0,0 @@
-# Kafka 0.8 Collector Auto-configure Module
-
-This module provides support for running the Kafka 0.8 collector as a
-component of Zipkin server. To activate this collector, reference the
-module jar when running the Zipkin server and configure the ZooKeeper
-connection string via the `KAFKA_ZOOKEEPER` environment
-variable or `zipkin.collector.kafka.zookeeper` property.
-
-## Quick start
-
-JRE 8 is required to run Zipkin server.
-
-Fetch the latest released
-[executable jar for Zipkin server](https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec)
-and
-[autoconfigure module jar for the kafka collector](https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-autoconfigure-collector-kafka08&v=LATEST&c=module).
-Run Zipkin server with the Kafka 0.8 collector enabled.
-
-For example:
-
-```bash
-$ curl -sSL https://zipkin.io/quickstart.sh | bash -s
-$ curl -sSL https://zipkin.io/quickstart.sh | bash -s io.zipkin.java:zipkin-autoconfigure-collector-kafka08:LATEST:module kafka08.jar
-$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \
-    java \
-    -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
-    -Dspring.profiles.active=kafka08 \
-    -cp zipkin.jar \
-    org.springframework.boot.loader.PropertiesLauncher
-```
-
-After executing these steps, the Zipkin UI will be available at
-[http://localhost:9411](http://localhost:9411) or port 9411 of the remote host the Zipkin server
-was started on.
-
-The Zipkin server can be further configured as described in the
-[Zipkin server documentation](../../zipkin-server/README.md).
-
-## How this works
-
-The Zipkin server executable jar and the autoconfigure module jar for
-the kafka collector are required. The module jar contains the code for
-loading and configuring the kafka collector, and any dependencies that
-are not already packaged in the Zipkin server jar
-(e.g. zipkin-collector-kafka08, kafka-clients).
-
-Using PropertiesLauncher as the main class runs the Zipkin server
-executable jar the same as it would be if executed using
-`java -jar zipkin.jar`, except it provides the option to load resources
-from outside the executable jar into the classpath. Those external
-resources are specified using the `loader.path` system property. In this
-case, it is configured to load the kafka collector module jar
-(`zipkin-autoconfigure-collector-kafka08-module.jar`) and the jar files
-contained in the `lib/` directory within that module jar
-(`zipkin-autoconfigure-collector-kafka08-module.jar!/lib`).
-
-The `spring.profiles.active=kafka08` system property causes configuration from
-[zipkin-server-kafka08.yml](src/main/resources/zipkin-server-kafka08.yml)
-to be loaded.
-
-For more information on how this works, see [Spring Boot's documentation
-on the executable jar format](https://docs.spring.io/spring-boot/docs/current/reference/html/executable-jar.html). The
-[section on PropertiesLauncher](https://docs.spring.io/spring-boot/docs/current/reference/html/executable-jar.html#executable-jar-property-launcher-features)
-has more detail on how the external module jar and the libraries it
-contains are loaded.
-
-## Configuration
-
-The following configuration points apply when `KAFKA_ZOOKEEPER` or
-`zipkin.collector.kafka.zookeeper` is set. They can be configured by
-setting an environment variable or by setting a java system property
-using the `-Dproperty.name=value` command line argument. Some settings
-correspond to "Consumer Configs" in [Kafka 0.8 documentation](https://kafka.apache.org/082/documentation.html#consumerconfigs).
-
-Environment Variable | Property | Consumer Config | Description
---- | --- | --- | ---
-`KAFKA_ZOOKEEPER` | `zipkin.collector.kafka.zookeeper` | zookeeper.connect | Comma-separated list of zookeeper host/ports, ex. 127.0.0.1:2181. No default
-`KAFKA_GROUP_ID` | `zipkin.collector.kafka.group-id` | group.id | The consumer group this process is consuming on behalf of. Defaults to `zipkin`
-`KAFKA_TOPIC` | `zipkin.collector.kafka.topic` | N/A | The topic that zipkin spans will be consumed from. Defaults to `zipkin`
-`KAFKA_STREAMS` | `zipkin.collector.kafka.streams` | N/A | Count of threads consuming the topic. Defaults to `1`
-
-### Other Kafka consumer properties
-You may need to set other [Kafka consumer properties](https://kafka.apache.org/082/documentation.html#consumerconfigs), in
-addition to the ones with explicit properties defined by the collector.
-In this case, you need to prefix that property name with
-`zipkin.collector.kafka.overrides` and pass it as a system property argument.
-
-For example, to override `auto.offset.reset`, you can set a system property named
-`zipkin.collector.kafka.overrides.auto.offset.reset`:
-
-```bash
-$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \
-    java \
-    -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
-    -Dspring.profiles.active=kafka08 \
-    -Dzipkin.collector.kafka.overrides.auto.offset.reset=largest \
-    -cp zipkin.jar \
-    org.springframework.boot.loader.PropertiesLauncher
-```
-
-### Examples
-
-Multiple ZooKeeper servers:
-
-```bash
-$ KAFKA_ZOOKEEPER=zk1:2181,zk2:2181 \
-    java \
-    -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
-    -Dspring.profiles.active=kafka08 \
-    -cp zipkin.jar \
-    org.springframework.boot.loader.PropertiesLauncher
-```
-
-Alternate topic name(s):
-
-```bash
-$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \
-    java \
-    -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
-    -Dspring.profiles.active=kafka08 \
-    -Dzipkin.collector.kafka.topic=zapkin,zipken \
-    -cp zipkin.jar \
-    org.springframework.boot.loader.PropertiesLauncher
-```
-
-Specifying ZooKeeper as a system property, instead of an environment variable:
-
-```bash
-$ java \
-    -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
-    -Dspring.profiles.active=kafka08 \
-    -Dzipkin.collector.kafka.zookeeper=127.0.0.1:2181 \
-    -cp zipkin.jar \
-    org.springframework.boot.loader.PropertiesLauncher
-```
diff --git a/zipkin-autoconfigure/collector-kafka08/pom.xml b/zipkin-autoconfigure/collector-kafka08/pom.xml
deleted file mode 100644
index 7d384ab..0000000
--- a/zipkin-autoconfigure/collector-kafka08/pom.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.zipkin</groupId>
-    <artifactId>zipkin-autoconfigure-parent</artifactId>
-    <version>2.13.1-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>zipkin-autoconfigure-collector-kafka08</artifactId>
-  <name>Auto Configuration: Kafka Collector</name>
-
-  <properties>
-    <main.basedir>${project.basedir}/../..</main.basedir>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>${project.groupId}.zipkin2</groupId>
-      <artifactId>zipkin-collector-kafka08</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <!-- com.101tec:zkclient has a log4j dep, re-route it with the bridge
-         https://logging.apache.org/log4j/2.x/manual/migration.html -->
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-1.2-api</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-maven-plugin</artifactId>
-        <version>${spring-boot.version}</version>
-        <configuration>
-          <layoutFactory implementation="zipkin.layout.ZipkinLayoutFactory">
-            <name>zipkin</name>
-          </layoutFactory>
-          <classifier>module</classifier>
-          <!-- exclude dependencies already packaged in zipkin-server -->
-          <!-- https://github.com/spring-projects/spring-boot/issues/3426 transitive exclude doesn't work -->
-          <excludeGroupIds>
-            org.springframework.boot,org.springframework,org.slf4j,commons-logging,com.google.code.gson
-          </excludeGroupIds>
-          <excludes>
-            <!-- excludes direct dependency instead of the group id, as otherwise we'd exclude ourselves -->
-            <exclude>
-              <groupId>${project.groupId}.zipkin2</groupId>
-              <artifactId>zipkin</artifactId>
-            </exclude>
-            <exclude>
-              <groupId>${project.groupId}.zipkin2</groupId>
-              <artifactId>zipkin-collector</artifactId>
-            </exclude>
-            <!-- excludes already packaged logging libraries in the server. Can't use group ID or we
-                 would miss the api bridge -->
-            <exclude>
-              <groupId>org.apache.logging.log4j</groupId>
-              <artifactId>log4j-core</artifactId>
-            </exclude>
-            <exclude>
-              <groupId>org.apache.logging.log4j</groupId>
-              <artifactId>log4j-api</artifactId>
-            </exclude>
-          </excludes>
-        </configuration>
-        <dependencies>
-          <dependency>
-            <groupId>org.apache.zipkin.layout</groupId>
-            <artifactId>zipkin-layout-factory</artifactId>
-            <version>${zipkin-layout-factory.version}</version>
-          </dependency>
-        </dependencies>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java
deleted file mode 100644
index cc86eea..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
-import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
-import org.springframework.context.annotation.ConditionContext;
-import org.springframework.core.type.AnnotatedTypeMetadata;
-
-/**
- * This condition passes when {@link ZipkinKafkaCollectorProperties#getZookeeper()} is set to
- * non-empty.
- *
- * <p>This is here because the yaml defaults this property to empty like this, and spring-boot
- * doesn't have an option to treat empty properties as unset.
- *
- * <pre>{@code
- * zookeeper: ${KAFKA_ZOOKEEPER:}
- * }</pre>
- */
-final class KafkaZooKeeperSetCondition extends SpringBootCondition {
-  static final String PROPERTY_NAME = "zipkin.collector.kafka.zookeeper";
-
-  @Override
-  public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
-    String kafkaZookeeper = context.getEnvironment().getProperty(PROPERTY_NAME);
-    return kafkaZookeeper == null || kafkaZookeeper.isEmpty()
-        ? ConditionOutcome.noMatch(PROPERTY_NAME + " isn't set")
-        : ConditionOutcome.match();
-  }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java
deleted file mode 100644
index 54b8f8a..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Conditional;
-import org.springframework.context.annotation.Configuration;
-import zipkin2.collector.CollectorMetrics;
-import zipkin2.collector.CollectorSampler;
-import zipkin2.collector.kafka08.KafkaCollector;
-import zipkin2.storage.StorageComponent;
-
-/**
- * This collector consumes a topic, decodes spans from thrift messages and stores them subject to
- * sampling policy.
- */
-@Configuration
-@EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class)
-@Conditional(KafkaZooKeeperSetCondition.class)
-class ZipkinKafka08CollectorAutoConfiguration {
-
-  /**
-   * This launches a thread to run start. This prevents a several second hang, or worse crash if
-   * zookeeper isn't running, yet.
-   */
-  @Bean
-  KafkaCollector kafka(
-      ZipkinKafkaCollectorProperties kafka,
-      CollectorSampler sampler,
-      CollectorMetrics metrics,
-      StorageComponent storage) {
-    final KafkaCollector result =
-        kafka.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build();
-
-    // don't use @Bean(initMethod = "start") as it can crash the process if zookeeper is down
-    Thread start =
-        new Thread("start " + result.getClass().getSimpleName()) {
-          @Override
-          public void run() {
-            result.start();
-          }
-        };
-    start.setDaemon(true);
-    start.start();
-
-    return result;
-  }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java
deleted file mode 100644
index 28e9663..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import zipkin2.collector.kafka08.KafkaCollector;
-
-@ConfigurationProperties("zipkin.collector.kafka")
-class ZipkinKafkaCollectorProperties {
-  private String topic = "zipkin";
-  private String zookeeper;
-  private String groupId = "zipkin";
-  private int streams = 1;
-  private int maxMessageSize = 1024 * 1024;
-  private Map<String, String> overrides = new LinkedHashMap<>();
-
-  public String getTopic() {
-    return topic;
-  }
-
-  public void setTopic(String topic) {
-    this.topic = topic;
-  }
-
-  public String getZookeeper() {
-    return zookeeper;
-  }
-
-  public void setZookeeper(String zookeeper) {
-    this.zookeeper = "".equals(zookeeper) ? null : zookeeper;
-  }
-
-  public String getGroupId() {
-    return groupId;
-  }
-
-  public void setGroupId(String groupId) {
-    this.groupId = groupId;
-  }
-
-  public int getStreams() {
-    return streams;
-  }
-
-  public void setStreams(int streams) {
-    this.streams = streams;
-  }
-
-  public int getMaxMessageSize() {
-    return maxMessageSize;
-  }
-
-  public void setMaxMessageSize(int maxMessageSize) {
-    this.maxMessageSize = maxMessageSize;
-  }
-
-  public Map<String, String> getOverrides() {
-    return overrides;
-  }
-
-  public void setOverrides(Map<String, String> overrides) {
-    this.overrides = overrides;
-  }
-
-  public KafkaCollector.Builder toBuilder() {
-    return KafkaCollector.builder()
-        .topic(topic)
-        .zookeeper(zookeeper)
-        .groupId(groupId)
-        .streams(streams)
-        .maxMessageSize(maxMessageSize)
-        .overrides(overrides);
-  }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories b/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories
deleted file mode 100644
index 9daca53..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories
+++ /dev/null
@@ -1,2 +0,0 @@
-org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
-zipkin2.autoconfigure.collector.kafka08.ZipkinKafka08CollectorAutoConfiguration
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml b/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml
deleted file mode 100644
index 04ccf25..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml
+++ /dev/null
@@ -1,7 +0,0 @@
-zipkin:
-  collector:
-    kafka:
-      # ZooKeeper host string, comma-separated host:port value.
-      zookeeper: ${KAFKA_ZOOKEEPER:}
-      # Maximum size of a message containing spans in bytes
-      max-message-size: ${KAFKA_MAX_MESSAGE_SIZE:1048576}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java
deleted file mode 100644
index 1ceeb0d..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-import org.springframework.context.annotation.Configuration;
-import zipkin2.collector.kafka08.KafkaCollector;
-
-/** opens package access for testing */
-public final class Access {
-
-  /** Just registering properties to avoid automatically connecting to a Kafka server */
-  public static void registerKafkaProperties(AnnotationConfigApplicationContext context) {
-    context.register(
-        PropertyPlaceholderAutoConfiguration.class, EnableKafkaCollectorProperties.class);
-  }
-
-  @Configuration
-  @EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class)
-  static class EnableKafkaCollectorProperties {}
-
-  public static KafkaCollector.Builder collectorBuilder(
-      AnnotationConfigApplicationContext context) {
-    return context.getBean(ZipkinKafkaCollectorProperties.class).toBuilder();
-  }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java
deleted file mode 100644
index b8c2b81..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
-import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
-import org.springframework.boot.test.util.TestPropertyValues;
-import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import zipkin2.collector.Collector;
-import zipkin2.collector.CollectorMetrics;
-import zipkin2.collector.CollectorSampler;
-import zipkin2.collector.kafka08.KafkaCollector;
-import zipkin2.storage.InMemoryStorage;
-import zipkin2.storage.StorageComponent;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class ZipkinKafka08CollectorAutoConfigurationTest {
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  AnnotationConfigApplicationContext context;
-
-  @After
-  public void close() {
-    if (context != null) {
-      context.close();
-    }
-  }
-
-  @Test
-  public void doesntProvidesCollectorComponent_whenKafkaZooKeeperUnset() {
-    context = new AnnotationConfigApplicationContext();
-    context.register(
-        PropertyPlaceholderAutoConfiguration.class,
-        ZipkinKafka08CollectorAutoConfiguration.class,
-        InMemoryConfiguration.class);
-    context.refresh();
-
-    thrown.expect(NoSuchBeanDefinitionException.class);
-    context.getBean(Collector.class);
-  }
-
-  @Test
-  public void providesCollectorComponent_whenZooKeeperSet() {
-    context = new AnnotationConfigApplicationContext();
-    TestPropertyValues.of("zipkin.collector.kafka.zookeeper:localhost").applyTo(context);
-    context.register(
-        PropertyPlaceholderAutoConfiguration.class,
-        ZipkinKafka08CollectorAutoConfiguration.class,
-        InMemoryConfiguration.class);
-    context.refresh();
-
-    assertThat(context.getBean(KafkaCollector.class)).isNotNull();
-  }
-
-  @Test
-  public void canOverrideProperty_topic() {
-    context = new AnnotationConfigApplicationContext();
-    TestPropertyValues.of(
-        "zipkin.collector.kafka.zookeeper:localhost",
-        "zipkin.collector.kafka.topic:zapkin")
-    .applyTo(context);
-    context.register(
-        PropertyPlaceholderAutoConfiguration.class,
-        ZipkinKafka08CollectorAutoConfiguration.class,
-        InMemoryConfiguration.class);
-    context.refresh();
-
-    assertThat(context.getBean(ZipkinKafkaCollectorProperties.class).getTopic())
-        .isEqualTo("zapkin");
-  }
-
-  @Configuration
-  static class InMemoryConfiguration {
-    @Bean
-    CollectorSampler sampler() {
-      return CollectorSampler.ALWAYS_SAMPLE;
-    }
-
-    @Bean
-    CollectorMetrics metrics() {
-      return CollectorMetrics.NOOP_METRICS;
-    }
-
-    @Bean
-    StorageComponent storage() {
-      return InMemoryStorage.newBuilder().build();
-    }
-  }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java
deleted file mode 100644
index 88a490c..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import org.junit.Test;
-import org.springframework.boot.test.util.TestPropertyValues;
-import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-import zipkin2.autoconfigure.collector.kafka08.Access;
-import zipkin2.storage.InMemoryStorage;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class NestedPropertyOverrideTest {
-  @Test
-  public void overrideWithNestedProperties() {
-    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
-    TestPropertyValues.of(
-        "zipkin.collector.kafka.zookeeper:localhost",
-        "zipkin.collector.kafka.overrides.auto.offset.reset:largest").applyTo(context);
-    Access.registerKafkaProperties(context);
-    context.refresh();
-
-    assertThat(
-            Access.collectorBuilder(context)
-                .storage(InMemoryStorage.newBuilder().build())
-                .build()
-                .connector
-                .config
-                .autoOffsetReset())
-        .isEqualTo("largest");
-  }
-}
diff --git a/zipkin-autoconfigure/pom.xml b/zipkin-autoconfigure/pom.xml
index cc1c81e..5cc3810 100644
--- a/zipkin-autoconfigure/pom.xml
+++ b/zipkin-autoconfigure/pom.xml
@@ -37,7 +37,6 @@
   </properties>
 
   <modules>
-    <module>collector-kafka08</module>
     <module>collector-scribe</module>
   </modules>
 
diff --git a/zipkin-collector/kafka/README.md b/zipkin-collector/kafka/README.md
index 9b459a0..49eed4d 100644
--- a/zipkin-collector/kafka/README.md
+++ b/zipkin-collector/kafka/README.md
@@ -1,4 +1,4 @@
-# collector-kafka10
+# collector-kafka
 
 ## KafkaCollector
 This collector is implemented as a Kafka consumer supporting Kafka brokers running
diff --git a/zipkin-collector/kafka08/README.md b/zipkin-collector/kafka08/README.md
deleted file mode 100644
index 142f53f..0000000
--- a/zipkin-collector/kafka08/README.md
+++ /dev/null
@@ -1,41 +0,0 @@
-# collector-kafka
-
-## KafkaCollector
-This collector polls a Kafka 0.8.2.2+ topic for messages that contain
-a list of spans in json or TBinaryProtocol big-endian encoding. These
-spans are pushed to a span consumer.
-
-`zipkin2.collector.kafka08.KafkaCollector.Builder` includes defaults that will
-operate against a Kafka topic advertised in Zookeeper.
-
-## Encoding spans into Kafka messages
-The message's binary data includes a list of spans. Supported encodings
-are the same as the http [POST /spans](http://zipkin.io/zipkin-api/#/paths/%252Fspans) body.
-
-### Json
-The message's binary data is a list of spans in json. The first character must be '[' (decimal 91).
-
-`Codec.JSON.writeSpans(spans)` performs the correct json encoding.
-
-Here's an example, sending a list of a single span to the zipkin topic:
-
-```bash
-$ kafka-console-producer.sh --broker-list $ADVERTISED_HOST:9092 --topic zipkin
-[{"traceId":"1","name":"bang","id":"2","timestamp":1234,"binaryAnnotations":[{"key":"lc","value":"bamm-bamm","endpoint":{"serviceName":"flintstones","ipv4":"127.0.0.1"}}]}]
-```
-
-### Thrift
-The message's binary data includes a list header followed by N spans serialized in TBinaryProtocol
-
-`Codec.THRIFT.writeSpans(spans)` encodes spans in the following fashion:
-```
-write_byte(12) // type of the list elements: 12 == struct
-write_i32(count) // count of spans that will follow
-for (int i = 0; i < count; i++) {
-  writeTBinaryProtocol(spans(i))
-}
-```
-
-### Legacy encoding
-Older versions of zipkin accepted a single span per message, as opposed
-to a list per message. This practice is deprecated, but still supported.
diff --git a/zipkin-collector/kafka08/pom.xml b/zipkin-collector/kafka08/pom.xml
deleted file mode 100644
index 10d80a0..0000000
--- a/zipkin-collector/kafka08/pom.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.zipkin.zipkin2</groupId>
-    <artifactId>zipkin-collector-parent</artifactId>
-    <version>2.13.1-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>zipkin-collector-kafka08</artifactId>
-  <name>Collector: Kafka (Legacy)</name>
-
-  <properties>
-    <main.basedir>${project.basedir}/../..</main.basedir>
-    <!-- This is pinned to Kafka 0.8.x client as 0.9.x brokers work with them, but not vice versa
-         http://docs.confluent.io/2.0.0/upgrade.html -->
-    <kafka.version>0.8.2.2</kafka.version>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>zipkin-collector</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.11</artifactId>
-      <version>${kafka.version}</version>
-      <exclusions>
-        <!-- don't eagerly bind slf4j -->
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java
deleted file mode 100644
index b68cc03..0000000
--- a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ZookeeperConsumerConnector;
-import zipkin2.CheckResult;
-import zipkin2.collector.Collector;
-import zipkin2.collector.CollectorComponent;
-import zipkin2.collector.CollectorMetrics;
-import zipkin2.collector.CollectorSampler;
-import zipkin2.storage.SpanConsumer;
-import zipkin2.storage.StorageComponent;
-
-import static kafka.consumer.Consumer.createJavaConsumerConnector;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
-
-/**
- * This collector polls a Kafka topic for messages that contain TBinaryProtocol big-endian encoded
- * lists of spans. These spans are pushed to a {@link SpanConsumer#accept span consumer}.
- *
- * <p>This collector remains a Kafka 0.8.x consumer, while Zipkin systems update to 0.9+.
- */
-public final class KafkaCollector extends CollectorComponent {
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  /** Configuration including defaults needed to consume spans from a Kafka topic. */
-  public static final class Builder extends CollectorComponent.Builder {
-    final Properties properties = new Properties();
-    Collector.Builder delegate = Collector.newBuilder(KafkaCollector.class);
-    CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;
-    String topic = "zipkin";
-    int streams = 1;
-
-    @Override
-    public Builder storage(StorageComponent storage) {
-      delegate.storage(storage);
-      return this;
-    }
-
-    @Override
-    public Builder sampler(CollectorSampler sampler) {
-      delegate.sampler(sampler);
-      return this;
-    }
-
-    @Override
-    public Builder metrics(CollectorMetrics metrics) {
-      if (metrics == null) throw new NullPointerException("metrics == null");
-      this.metrics = metrics.forTransport("kafka");
-      delegate.metrics(this.metrics);
-      return this;
-    }
-
-    /** Topic zipkin spans will be consumed from. Defaults to "zipkin" */
-    public Builder topic(String topic) {
-      if (topic == null) throw new NullPointerException("topic == null");
-      this.topic = topic;
-      return this;
-    }
-
-    /** The zookeeper connect string, ex. 127.0.0.1:2181. No default */
-    public Builder zookeeper(String zookeeper) {
-      if (zookeeper == null) throw new NullPointerException("zookeeper == null");
-      properties.put("zookeeper.connect", zookeeper);
-      return this;
-    }
-
-    /** The consumer group this process is consuming on behalf of. Defaults to "zipkin" */
-    public Builder groupId(String groupId) {
-      if (groupId == null) throw new NullPointerException("groupId == null");
-      properties.put(GROUP_ID_CONFIG, groupId);
-      return this;
-    }
-
-    /** Count of threads/streams consuming the topic. Defaults to 1 */
-    public Builder streams(int streams) {
-      this.streams = streams;
-      return this;
-    }
-
-    /** Maximum size of a message containing spans in bytes. Defaults to 1 MiB */
-    public Builder maxMessageSize(int bytes) {
-      properties.put("fetch.message.max.bytes", String.valueOf(bytes));
-      return this;
-    }
-
-    /**
-     * By default, a consumer will be built from properties derived from builder defaults, as well
-     * as "auto.offset.reset" -> "smallest". Properties set here will override the consumer config.
-     *
-     * <p>For example: Only consume spans since you connected by setting the below.
-     *
-     * <pre>{@code
-     * Map<String, String> overrides = new LinkedHashMap<>();
-     * overrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
-     * builder.overrides(overrides);
-     * }</pre>
-     *
-     * @see org.apache.kafka.clients.consumer.ConsumerConfig
-     */
-    public final Builder overrides(Map<String, ?> overrides) {
-      if (overrides == null) throw new NullPointerException("overrides == null");
-      properties.putAll(overrides);
-      return this;
-    }
-
-    @Override
-    public KafkaCollector build() {
-      return new KafkaCollector(this);
-    }
-
-    Builder() {
-      // Settings below correspond to "Old Consumer Configs"
-      // http://kafka.apache.org/documentation.html
-      properties.put(GROUP_ID_CONFIG, "zipkin");
-      properties.put("fetch.message.max.bytes", String.valueOf(1024 * 1024));
-      // Same default as zipkin-scala, and keeps tests from hanging
-      properties.put(AUTO_OFFSET_RESET_CONFIG, "smallest");
-    }
-  }
-
-  final LazyConnector connector;
-  final LazyStreams streams;
-
-  KafkaCollector(Builder builder) {
-    connector = new LazyConnector(builder);
-    streams = new LazyStreams(builder, connector);
-  }
-
-  @Override
-  public KafkaCollector start() {
-    connector.get();
-    streams.get();
-    return this;
-  }
-
-  @Override
-  public CheckResult check() {
-    try {
-      connector.get(); // make sure the connector didn't throw
-      CheckResult failure = streams.failure.get(); // check the streams didn't quit
-      if (failure != null) return failure;
-      return CheckResult.OK;
-    } catch (RuntimeException e) {
-      return CheckResult.failed(e);
-    }
-  }
-
-  static final class LazyConnector {
-
-    final ConsumerConfig config;
-    volatile ZookeeperConsumerConnector connector;
-
-    LazyConnector(Builder builder) {
-      this.config = new ConsumerConfig(builder.properties);
-    }
-
-    ZookeeperConsumerConnector get() {
-      if (connector == null) {
-        synchronized (this) {
-          if (connector == null) {
-            connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(config);
-          }
-        }
-      }
-      return connector;
-    }
-
-    void close() {
-      ZookeeperConsumerConnector maybeConnector = connector;
-      if (maybeConnector == null) return;
-      maybeConnector.shutdown();
-    }
-  }
-
-  @Override
-  public void close() {
-    streams.close();
-    connector.close();
-  }
-
-  static final class LazyStreams {
-    final int streams;
-    final String topic;
-    final Collector collector;
-    final CollectorMetrics metrics;
-    final LazyConnector connector;
-    final AtomicReference<CheckResult> failure = new AtomicReference<>();
-    volatile ExecutorService pool;
-
-    LazyStreams(Builder builder, LazyConnector connector) {
-      this.streams = builder.streams;
-      this.topic = builder.topic;
-      this.collector = builder.delegate.build();
-      this.metrics = builder.metrics;
-      this.connector = connector;
-    }
-
-    ExecutorService get() {
-      if (pool == null) {
-        synchronized (this) {
-          if (pool == null) {
-            pool = compute();
-          }
-        }
-      }
-      return pool;
-    }
-
-    void close() {
-      ExecutorService maybePool = pool;
-      if (maybePool == null) return;
-      maybePool.shutdownNow();
-      try {
-        maybePool.awaitTermination(1, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        // at least we tried
-      }
-    }
-
-    ExecutorService compute() {
-      ExecutorService pool =
-          streams == 1
-              ? Executors.newSingleThreadExecutor()
-              : Executors.newFixedThreadPool(streams);
-
-      Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
-      topicCountMap.put(topic, streams);
-
-      for (KafkaStream<byte[], byte[]> stream :
-          connector.get().createMessageStreams(topicCountMap).get(topic)) {
-        pool.execute(guardFailures(new KafkaStreamProcessor(stream, collector, metrics)));
-      }
-      return pool;
-    }
-
-    Runnable guardFailures(final Runnable delegate) {
-      return new Runnable() {
-        @Override
-        public void run() {
-          try {
-            delegate.run();
-          } catch (RuntimeException e) {
-            failure.set(CheckResult.failed(e));
-          }
-        }
-      };
-    }
-  }
-}
diff --git a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
deleted file mode 100644
index 4232c2e..0000000
--- a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.Collections;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import zipkin2.Callback;
-import zipkin2.Span;
-import zipkin2.codec.SpanBytesDecoder;
-import zipkin2.collector.Collector;
-import zipkin2.collector.CollectorMetrics;
-
-/** Consumes spans from Kafka messages, ignoring malformed input */
-final class KafkaStreamProcessor implements Runnable {
-  static final Callback<Void> NOOP =
-      new Callback<Void>() {
-        @Override
-        public void onSuccess(Void value) {}
-
-        @Override
-        public void onError(Throwable t) {}
-      };
-
-  final KafkaStream<byte[], byte[]> stream;
-  final Collector collector;
-  final CollectorMetrics metrics;
-
-  KafkaStreamProcessor(
-      KafkaStream<byte[], byte[]> stream, Collector collector, CollectorMetrics metrics) {
-    this.stream = stream;
-    this.collector = collector;
-    this.metrics = metrics;
-  }
-
-  @Override
-  public void run() {
-    ConsumerIterator<byte[], byte[]> messages = stream.iterator();
-    while (messages.hasNext()) {
-      byte[] bytes = messages.next().message();
-      metrics.incrementMessages();
-      metrics.incrementBytes(bytes.length);
-      if (bytes.length == 0) continue; // lenient on empty messages
-
-      if (bytes.length < 2) { // need two bytes to check if protobuf
-        metrics.incrementMessagesDropped();
-        continue;
-      }
-
-      // If we received legacy single-span encoding, decode it into a singleton list
-      if (!protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) {
-        Span span;
-        try {
-          span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
-        } catch (RuntimeException e) {
-          metrics.incrementMessagesDropped();
-          continue;
-        }
-        collector.accept(Collections.singletonList(span), NOOP);
-      } else {
-        collector.acceptSpans(bytes, NOOP);
-      }
-    }
-  }
-
-  /* span key or trace ID key */
-  static boolean protobuf3(byte[] bytes) {
-    return bytes[0] == 10 && bytes[1] != 0; // varint follows and won't be zero
-  }
-}
diff --git a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
deleted file mode 100644
index 290a71e..0000000
--- a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import org.I0Itec.zkclient.exception.ZkTimeoutException;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.Timeout;
-import zipkin2.Call;
-import zipkin2.Callback;
-import zipkin2.CheckResult;
-import zipkin2.Span;
-import zipkin2.TestObjects;
-import zipkin2.codec.SpanBytesEncoder;
-import zipkin2.collector.InMemoryCollectorMetrics;
-import zipkin2.collector.kafka08.KafkaCollector.Builder;
-import zipkin2.storage.SpanConsumer;
-import zipkin2.storage.SpanStore;
-import zipkin2.storage.StorageComponent;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static zipkin2.TestObjects.CLIENT_SPAN;
-import static zipkin2.TestObjects.UTF_8;
-import static zipkin2.codec.SpanBytesEncoder.THRIFT;
-
-public class ITKafkaCollector {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  @ClassRule public static Timeout globalTimeout = Timeout.seconds(20);
-
-  List<Span> spans = Arrays.asList(TestObjects.LOTS_OF_SPANS[0], TestObjects.LOTS_OF_SPANS[1]);
-
-  Producer<String, byte[]> producer = KafkaTestGraph.INSTANCE.producer();
-  InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics();
-  InMemoryCollectorMetrics kafkaMetrics = metrics.forTransport("kafka");
-
-  LinkedBlockingQueue<List<Span>> recvdSpans = new LinkedBlockingQueue<>();
-  SpanConsumer consumer = (spans) -> {
-    recvdSpans.add(spans);
-    return Call.create(null);
-  };
-
-  @Test
-  public void checkPasses() {
-    try (KafkaCollector collector = newKafkaTransport(builder("check_passes"), consumer)) {
-      assertThat(collector.check().ok()).isTrue();
-    }
-  }
-
-  @Test
-  public void start_failsOnInvalidZooKeeper() {
-    thrown.expect(ZkTimeoutException.class);
-    thrown.expectMessage("Unable to connect to zookeeper server within timeout: 6000");
-
-    Builder builder = builder("fail_invalid_zk").zookeeper("1.1.1.1");
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {}
-  }
-
-  @Test
-  public void canSetMaxMessageSize() {
-    Builder builder = builder("max_message").maxMessageSize(1);
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(collector.connector.get().config().fetchMessageMaxBytes()).isEqualTo(1);
-    }
-  }
-
-  /** Ensures legacy encoding works: a single TBinaryProtocol encoded span */
-  @Test
-  public void messageWithSingleThriftSpan() throws Exception {
-    Builder builder = builder("single_span");
-
-    byte[] bytes = THRIFT.encode(CLIENT_SPAN);
-    producer.send(new KeyedMessage<>(builder.topic, bytes));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsExactly(CLIENT_SPAN);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.messagesDropped()).isZero();
-    assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(1);
-    assertThat(kafkaMetrics.spansDropped()).isZero();
-  }
-
-  /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */
-  @Test
-  public void messageWithMultipleSpans_thrift() throws Exception {
-    messageWithMultipleSpans(builder("multiple_spans_thrift"), THRIFT);
-  }
-
-  /** Ensures list encoding works: a json encoded list of spans */
-  @Test
-  public void messageWithMultipleSpans_json() throws Exception {
-    messageWithMultipleSpans(builder("multiple_spans_json"), SpanBytesEncoder.JSON_V1);
-  }
-
-  /** Ensures list encoding works: a version 2 json list of spans */
-  @Test
-  public void messageWithMultipleSpans_json2() throws Exception {
-    messageWithMultipleSpans(builder("multiple_spans_json2"), SpanBytesEncoder.JSON_V2);
-  }
-
-  /** Ensures list encoding works: proto3 ListOfSpans */
-  @Test
-  public void messageWithMultipleSpans_proto3() throws Exception {
-    messageWithMultipleSpans(builder("multiple_spans_proto3"), SpanBytesEncoder.PROTO3);
-  }
-
-  void messageWithMultipleSpans(Builder builder, SpanBytesEncoder encoder) throws Exception {
-    byte[] message = encoder.encodeList(spans);
-
-    producer.send(new KeyedMessage<>(builder.topic, message));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsAll(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.messagesDropped()).isZero();
-    assertThat(kafkaMetrics.bytes()).isEqualTo(message.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
-    assertThat(kafkaMetrics.spansDropped()).isZero();
-  }
-
-  /** Ensures malformed spans don't hang the collector */
-  @Test
-  public void skipsMalformedData() throws Exception {
-    Builder builder = builder("decoder_exception");
-
-    byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
-    byte[] malformed2 = "malformed".getBytes(UTF_8);
-    producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
-    producer.send(new KeyedMessage<>(builder.topic, new byte[0]));
-    producer.send(new KeyedMessage<>(builder.topic, malformed1));
-    producer.send(new KeyedMessage<>(builder.topic, malformed2));
-    producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-      // the only way we could read this, is if the malformed spans were skipped.
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(5);
-    assertThat(kafkaMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
-    assertThat(kafkaMetrics.bytes())
-      .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 2);
-    assertThat(kafkaMetrics.spansDropped()).isZero();
-  }
-
-  /** Guards against errors that leak from storage, such as InvalidQueryException */
-  @Test
-  public void skipsOnStorageException() throws Exception {
-    Builder builder = builder("storage_exception");
-
-    AtomicInteger counter = new AtomicInteger();
-    consumer = (input) -> new Call.Base<Void>() {
-
-      @Override protected Void doExecute() {
-        throw new AssertionError();
-      }
-
-      @Override protected void doEnqueue(Callback<Void> callback) {
-        if (counter.getAndIncrement() == 1) {
-          callback.onError(new RuntimeException("storage fell over"));
-        } else {
-          recvdSpans.add(spans);
-          callback.onSuccess(null);
-        }
-      }
-
-      @Override public Call<Void> clone() {
-        throw new AssertionError();
-      }
-    };
-
-    producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
-    producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans))); // tossed on error
-    producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-      // the only way we could read this is if the message that failed in storage was skipped.
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(3);
-    assertThat(kafkaMetrics.messagesDropped()).isZero(); // storage failure isn't a message failure
-    assertThat(kafkaMetrics.bytes()).isEqualTo(THRIFT.encodeList(spans).length * 3);
-    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 3);
-    assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size()); // only one dropped
-  }
-
-  Builder builder(String topic) {
-    return new Builder().metrics(metrics).zookeeper("127.0.0.1:2181").topic(topic);
-  }
-
-  KafkaCollector newKafkaTransport(Builder builder, SpanConsumer consumer) {
-    return new KafkaCollector(builder.storage(buildStorage(consumer))).start();
-  }
-
-  StorageComponent buildStorage(final SpanConsumer spanConsumer) {
-    return new StorageComponent() {
-      @Override
-      public SpanStore spanStore() {
-        throw new AssertionError();
-      }
-
-      @Override
-      public SpanConsumer spanConsumer() {
-        return spanConsumer;
-      }
-
-      @Override
-      public CheckResult check() {
-        return CheckResult.OK;
-      }
-
-      @Override
-      public void close() {
-        throw new AssertionError();
-      }
-    };
-  }
-}
diff --git a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java
deleted file mode 100644
index 3ff9beb..0000000
--- a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.Properties;
-import kafka.common.FailedToSendMessageException;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkTimeoutException;
-import org.junit.AssumptionViolatedException;
-
-/** Tests only execute when ZK and Kafka are listening on 127.0.0.1 on default ports. */
-enum KafkaTestGraph {
-  INSTANCE;
-
-  private AssumptionViolatedException ex;
-  private Producer<String, byte[]> producer;
-
-  synchronized Producer<String, byte[]> producer() {
-    if (ex != null) throw ex;
-    if (this.producer == null) {
-      Properties producerProps = new Properties();
-      producerProps.put("metadata.broker.list", "127.0.0.1:9092");
-      producerProps.put("producer.type", "sync");
-      producer = new Producer<>(new ProducerConfig(producerProps));
-      try {
-        new ZkClient("127.0.0.1:2181", 1000);
-        producer.send(new KeyedMessage<>("test", new byte[0]));
-      } catch (FailedToSendMessageException | ZkTimeoutException e) {
-        throw ex = new AssumptionViolatedException(e.getMessage(), e);
-      }
-    }
-    return producer;
-  }
-}
diff --git a/zipkin-collector/kafka08/src/test/resources/log4j.properties b/zipkin-collector/kafka08/src/test/resources/log4j.properties
deleted file mode 100644
index 15345f3..0000000
--- a/zipkin-collector/kafka08/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,7 +0,0 @@
-# By default, everything goes to the console
-log4j.rootLogger=WARN, A1
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
-log4j.appender.A1.ImmediateFlush=true
diff --git a/zipkin-collector/kafka08/src/test/resources/log4j2.properties b/zipkin-collector/kafka08/src/test/resources/log4j2.properties
deleted file mode 100755
index c437666..0000000
--- a/zipkin-collector/kafka08/src/test/resources/log4j2.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-appenders=console
-appender.console.type=Console
-appender.console.name=STDOUT
-appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n
-rootLogger.level=warn
-rootLogger.appenderRefs=stdout
-rootLogger.appenderRef.stdout.ref=STDOUT
-# don't waste logs when ZK check fails
-logger.zk.name=org.apache.zookeeper.ClientCnxn
-logger.zk.level=off
diff --git a/zipkin-collector/pom.xml b/zipkin-collector/pom.xml
index 153398d..246f176 100644
--- a/zipkin-collector/pom.xml
+++ b/zipkin-collector/pom.xml
@@ -42,7 +42,6 @@
     <module>kafka</module>
     <module>rabbitmq</module>
     <module>scribe</module>
-    <module>kafka08</module>
   </modules>
 
   <dependencies>
diff --git a/zipkin-server/README.md b/zipkin-server/README.md
index 7832c84..7e64055 100644
--- a/zipkin-server/README.md
+++ b/zipkin-server/README.md
@@ -352,14 +352,6 @@ Specifying bootstrap servers as a system property, instead of an environment var
 $ java -Dzipkin.collector.kafka.bootstrap-servers=127.0.0.1:9092 -jar zipkin.jar
 ```
 
-#### Migration from Kafka < 0.8.1
-
-As explained [on kafka wiki](https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka), offsets were stored in ZooKeeper. This has changed and offsets are now stored directly in Kafka. You need to update offsets in Kafka 0.10 by following the instructions.
-
-#### Kafka (Legacy) Collector
-The default collector is for Kafka 0.10.x+ brokers. You can use Kafka
-0.8 brokers via an external module. See [zipkin-autoconfigure/collector-kafka08](../zipkin-autoconfigure/collector-kafka08/).
-
 ### RabbitMQ collector
 The [RabbitMQ collector](../zipkin-collector/rabbitmq) will be enabled when the `addresses` or `uri` for the RabbitMQ server(s) is set.