Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/08 00:42:13 UTC
[incubator-zipkin] branch master updated: Removes Kafka 0.8 support (KAFKA_ZOOKEEPER) (#2564)
This is an automated email from the ASF dual-hosted git repository.
adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git
The following commit(s) were added to refs/heads/master by this push:
new 8b182fb Removes Kafka 0.8 support (KAFKA_ZOOKEEPER) (#2564)
8b182fb is described below
commit 8b182fb62396334bf0323d5cdbc65b6bf3660e45
Author: Adrian Cole <ad...@users.noreply.github.com>
AuthorDate: Wed May 8 08:42:08 2019 +0800
Removes Kafka 0.8 support (KAFKA_ZOOKEEPER) (#2564)
This removes support for Kafka 0.8 (last updated almost 4 years ago).
Notably, this means those using `KAFKA_ZOOKEEPER` to configure their
broker need to switch to `KAFKA_BOOTSTRAP_SERVERS` instead.
Note: Kafka 0.8 was not packaged into zipkin-server; it was an optional
add-on. However, our docker image was built in a way that made it seem
available by default.
See https://lists.apache.org/thread.html/432df5a806ee27dd959ded5ebf5e7cc6bd4370f6b1b1daf7bf594e80@%3Cdev.zipkin.apache.org%3E
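For those migrating, the switch from `KAFKA_ZOOKEEPER` to `KAFKA_BOOTSTRAP_SERVERS` can be guarded in a launch script. The following is a minimal sketch and not part of this commit: the function name `check_kafka_env` is hypothetical, and `127.0.0.1:9092` assumes a broker listening on Kafka's conventional default port.

```bash
# Hypothetical pre-flight check for a launch script (not part of Zipkin).
# Fails when only the removed KAFKA_ZOOKEEPER setting is present.
check_kafka_env() {
  if [ -n "${KAFKA_ZOOKEEPER:-}" ] && [ -z "${KAFKA_BOOTSTRAP_SERVERS:-}" ]; then
    echo "KAFKA_ZOOKEEPER is no longer supported; set KAFKA_BOOTSTRAP_SERVERS instead" >&2
    return 1
  fi
  return 0
}

# Example use (assumed broker address and jar name):
#   export KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092
#   check_kafka_env && java -jar zipkin.jar
```

The check passes when neither variable is set or when `KAFKA_BOOTSTRAP_SERVERS` is set, so it only flags configurations that still rely solely on the removed ZooKeeper setting.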
---
.travis.yml | 6 -
zipkin-autoconfigure/collector-kafka08/README.md | 135 ----------
zipkin-autoconfigure/collector-kafka08/pom.xml | 104 --------
.../kafka08/KafkaZooKeeperSetCondition.java | 45 ----
.../ZipkinKafka08CollectorAutoConfiguration.java | 63 -----
.../kafka08/ZipkinKafkaCollectorProperties.java | 90 -------
.../src/main/resources/META-INF/spring.factories | 2 -
.../src/main/resources/zipkin-server-kafka08.yml | 7 -
.../autoconfigure/collector/kafka08/Access.java | 42 ----
...ipkinKafka08CollectorAutoConfigurationTest.java | 111 ---------
.../kafka/v1/NestedPropertyOverrideTest.java | 46 ----
zipkin-autoconfigure/pom.xml | 1 -
zipkin-collector/kafka/README.md | 2 +-
zipkin-collector/kafka08/README.md | 41 ---
zipkin-collector/kafka08/pom.xml | 59 -----
.../zipkin2/collector/kafka08/KafkaCollector.java | 277 ---------------------
.../collector/kafka08/KafkaStreamProcessor.java | 84 -------
.../collector/kafka08/ITKafkaCollector.java | 250 -------------------
.../zipkin2/collector/kafka08/KafkaTestGraph.java | 51 ----
.../kafka08/src/test/resources/log4j.properties | 7 -
.../kafka08/src/test/resources/log4j2.properties | 11 -
zipkin-collector/pom.xml | 1 -
zipkin-server/README.md | 8 -
23 files changed, 1 insertion(+), 1442 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 9747dd9..74e4a50f 100755
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,7 +11,6 @@ cache:
# zipkin-ui gets dependencies via NPM
- $HOME/.npm
- $HOME/.m2
- - $HOME/kafka_2.11-0.8.2.2
language: java
@@ -28,11 +27,6 @@ before_install:
# Required for Elasticsearch 5 (See https://github.com/docker-library/docs/tree/master/elasticsearch#host-setup)
- sudo sysctl -w vm.max_map_count=262144
- # Manually install and run zk+kafka as it isn't an available service
- - test -d $HOME/kafka_2.11-0.8.2.2/bin || curl -sSL https://archive.apache.org/dist/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz | bash -c '(cd $HOME; tar -xzf -)'
- - nohup bash -c "cd $HOME/kafka_2.11-0.8.2.2 && bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &"
- - nohup bash -c "cd $HOME/kafka_2.11-0.8.2.2 && bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &"
-
# Quiet Maven invoker logs (Downloading... when running zipkin-server/src/it)
- echo "MAVEN_OPTS='-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn'" > ~/.mavenrc
diff --git a/zipkin-autoconfigure/collector-kafka08/README.md b/zipkin-autoconfigure/collector-kafka08/README.md
deleted file mode 100644
index 558b74d..0000000
--- a/zipkin-autoconfigure/collector-kafka08/README.md
+++ /dev/null
@@ -1,135 +0,0 @@
-# Kafka 0.8 Collector Auto-configure Module
-
-This module provides support for running the kafa 0.8 collector as a
-component of Zipkin server. To activate this collector, reference the
-module jar when running the Zipkin server and configure the ZooKeeper
-connection string via the `KAFKA_ZOOKEEPER` environment
-variable or `zipkin.collector.kafka.zookeeper` property.
-
-## Quick start
-
-JRE 8 is required to run Zipkin server.
-
-Fetch the latest released
-[executable jar for Zipkin server](https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec)
-and
-[autoconfigure module jar for the kafka collector](https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-autoconfigure-collector-kafka08&v=LATEST&c=module).
-Run Zipkin server with the Kafka 0.10+ collector enabled.
-
-For example:
-
-```bash
-$ curl -sSL https://zipkin.io/quickstart.sh | bash -s
-$ curl -sSL https://zipkin.io/quickstart.sh | bash -s io.zipkin.java:zipkin-autoconfigure-collector-kafka08:LATEST:module kafka08.jar
-$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \
- java \
- -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
- -Dspring.profiles.active=kafka08 \
- -cp zipkin.jar \
- org.springframework.boot.loader.PropertiesLauncher
-```
-
-After executing these steps, the Zipkin UI will be available
-[http://localhost:9411](http://localhost:9411) or port 9411 of the remote host the Zipkin server
-was started on.
-
-The Zipkin server can be further configured as described in the
-[Zipkin server documentation](../../zipkin-server/README.md).
-
-## How this works
-
-The Zipkin server executable jar and the autoconfigure module jar for
-the kafka collector are required. The module jar contains the code for
-loading and configuring the kafka collector, and any dependencies that
-are not already packaged in the Zipkin server jar
-(e.g. zipkin-collector-kafka08, kafka-clients).
-
-Using PropertiesLauncher as the main class runs the Zipkin server
-executable jar the same as it would be if executed using
-`java -jar zipkin.jar`, except it provides the option to load resources
-from outside the executable jar into the classpath. Those external
-resources are specified using the `loader.path` system property. In this
-case, it is configured to load the kafka collector module jar
-(`zipkin-autoconfigure-collector-kafka08-module.jar`) and the jar files
-contained in the `lib/` directory within that module jar
-(`zipkin-autoconfigure-collector-kafka08-module.jar!/lib`).
-
-The `spring.profiles=kafka08` system property causes configuration from
-[zipkin-server-kafka08.yml](src/main/resources/zipkin-server-kafka08.yml)
-to be loaded.
-
-For more information on how this works, see [Spring Boot's documentation
-on the executable jar format](https://docs.spring.io/spring-boot/docs/current/reference/html/executable-jar.html). The
-[section on PropertiesLauncher](https://docs.spring.io/spring-boot/docs/current/reference/html/executable-jar.html#executable-jar-property-launcher-features)
-has more detail on how the external module jar and the libraries it
-contains are loaded.
-
-## Configuration
-
-The following configuration points apply apply when `KAFKA_ZOOKEEPER` or
-`zipkin.collector.kafka.zookeeper` is set. They can be configured by
-setting an environment variable or by setting a java system property
-using the `-Dproperty.name=value` command line argument. Some settings
-correspond to "Consumer Configs" in [Kafka 0.8 documentation](https://kafka.apache.org/082/documentation.html#consumerconfigs).
-
-Environment Variable | Property | Consumer Config | Description
---- | --- | --- | ---
-`KAFKA_ZOOKEEPER` | `zipkin.collector.kafka.zookeeper` | zookeeper.connect | Comma-separated list of zookeeper host/ports, ex. 127.0.0.1:2181. No default
-`KAFKA_GROUP_ID` | `zipkin.collector.kafka.group-id` | group.id | The consumer group this process is consuming on behalf of. Defaults to `zipkin`
-`KAFKA_TOPIC` | `zipkin.collector.kafka.topic` | N/A | The topic that zipkin spans will be consumed from. Defaults to `zipkin`
-`KAFKA_STREAMS` | `zipkin.collector.kafka.streams` | N/A | Count of threads consuming the topic. Defaults to `1`
-
-### Other Kafka consumer properties
-You may need to set other [Kafka consumer properties](https://kafka.apache.org/082/documentation.html#consumerconfigs), in
-addition to the ones with explicit properties defined by the collector.
-In this case, you need to prefix that property name with
-`zipkin.collector.kafka.overrides` and pass it as a system property argument.
-
-For example, to override `auto.offset.reset`, you can set a system property named
-`zipkin.collector.kafka.overrides.auto.offset.reset`:
-
-```bash
-$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \
- java \
- -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
- -Dspring.profiles.active=kafka08 \
- -Dzipkin.collector.kafka.overrides.auto.offset.reset=latest \
- -cp zipkin.jar \
- org.springframework.boot.loader.PropertiesLauncher
-```
-
-### Examples
-
-Multiple ZooKeeper servers:
-
-```bash
-$ KAFKA_ZOOKEEPER=zk1:2181,zk2:2181 \
- java \
- -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
- -Dspring.profiles.active=kafka08 \
- -cp zipkin.jar \
- org.springframework.boot.loader.PropertiesLauncher
-```
-
-Alternate topic name(s):
-
-```bash
-$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \
- java \
- -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
- -Dspring.profiles.active=kafka08 \
- -Dzipkin.collector.kafka.topic=zapkin,zipken \
- -cp zipkin.jar \
- org.springframework.boot.loader.PropertiesLauncher
-```
-
-Specifying ZooKeeper as a system property, instead of an environment variable:
-
-```bash
-$ java \
- -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
- -Dspring.profiles.active=kafka08 \
- -Dzipkin.collector.kafka.zookeeper=127.0.0.1:2181 \
- -cp zipkin.jar \
- org.springframework.boot.loader.PropertiesLauncher
-```
diff --git a/zipkin-autoconfigure/collector-kafka08/pom.xml b/zipkin-autoconfigure/collector-kafka08/pom.xml
deleted file mode 100644
index 7d384ab..0000000
--- a/zipkin-autoconfigure/collector-kafka08/pom.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.zipkin</groupId>
- <artifactId>zipkin-autoconfigure-parent</artifactId>
- <version>2.13.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>zipkin-autoconfigure-collector-kafka08</artifactId>
- <name>Auto Configuration: Kafka Collector</name>
-
- <properties>
- <main.basedir>${project.basedir}/../..</main.basedir>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}.zipkin2</groupId>
- <artifactId>zipkin-collector-kafka08</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- com.101tec:zkclient has a log4j dep, re-route it with the bridge
- https://logging.apache.org/log4j/2.x/manual/migration.html -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-1.2-api</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>${spring-boot.version}</version>
- <configuration>
- <layoutFactory implementation="zipkin.layout.ZipkinLayoutFactory">
- <name>zipkin</name>
- </layoutFactory>
- <classifier>module</classifier>
- <!-- exclude dependencies already packaged in zipkin-server -->
- <!-- https://github.com/spring-projects/spring-boot/issues/3426 transitive exclude doesn't work -->
- <excludeGroupIds>
- org.springframework.boot,org.springframework,org.slf4j,commons-logging,com.google.code.gson
- </excludeGroupIds>
- <excludes>
- <!-- excludes direct dependency instead of the group id, as otherwise we'd exclude ourselves -->
- <exclude>
- <groupId>${project.groupId}.zipkin2</groupId>
- <artifactId>zipkin</artifactId>
- </exclude>
- <exclude>
- <groupId>${project.groupId}.zipkin2</groupId>
- <artifactId>zipkin-collector</artifactId>
- </exclude>
- <!-- excludes already packaged logging libraries in the server. Can't use group ID or we
- would miss the api bridge -->
- <exclude>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- </exclude>
- <exclude>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- </exclude>
- </excludes>
- </configuration>
- <dependencies>
- <dependency>
- <groupId>org.apache.zipkin.layout</groupId>
- <artifactId>zipkin-layout-factory</artifactId>
- <version>${zipkin-layout-factory.version}</version>
- </dependency>
- </dependencies>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java
deleted file mode 100644
index cc86eea..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
-import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
-import org.springframework.context.annotation.ConditionContext;
-import org.springframework.core.type.AnnotatedTypeMetadata;
-
-/**
- * This condition passes when {@link ZipkinKafkaCollectorProperties#getZookeeper()} is set to
- * non-empty.
- *
- * <p>This is here because the yaml defaults this property to empty like this, and spring-boot
- * doesn't have an option to treat empty properties as unset.
- *
- * <pre>{@code
- * zookeeper: ${KAFKA_ZOOKEEPER:}
- * }</pre>
- */
-final class KafkaZooKeeperSetCondition extends SpringBootCondition {
- static final String PROPERTY_NAME = "zipkin.collector.kafka.zookeeper";
-
- @Override
- public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
- String kafkaZookeeper = context.getEnvironment().getProperty(PROPERTY_NAME);
- return kafkaZookeeper == null || kafkaZookeeper.isEmpty()
- ? ConditionOutcome.noMatch(PROPERTY_NAME + " isn't set")
- : ConditionOutcome.match();
- }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java
deleted file mode 100644
index 54b8f8a..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Conditional;
-import org.springframework.context.annotation.Configuration;
-import zipkin2.collector.CollectorMetrics;
-import zipkin2.collector.CollectorSampler;
-import zipkin2.collector.kafka08.KafkaCollector;
-import zipkin2.storage.StorageComponent;
-
-/**
- * This collector consumes a topic, decodes spans from thrift messages and stores them subject to
- * sampling policy.
- */
-@Configuration
-@EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class)
-@Conditional(KafkaZooKeeperSetCondition.class)
-class ZipkinKafka08CollectorAutoConfiguration {
-
- /**
- * This launches a thread to run start. This prevents a several second hang, or worse crash if
- * zookeeper isn't running, yet.
- */
- @Bean
- KafkaCollector kafka(
- ZipkinKafkaCollectorProperties kafka,
- CollectorSampler sampler,
- CollectorMetrics metrics,
- StorageComponent storage) {
- final KafkaCollector result =
- kafka.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build();
-
- // don't use @Bean(initMethod = "start") as it can crash the process if zookeeper is down
- Thread start =
- new Thread("start " + result.getClass().getSimpleName()) {
- @Override
- public void run() {
- result.start();
- }
- };
- start.setDaemon(true);
- start.start();
-
- return result;
- }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java
deleted file mode 100644
index 28e9663..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import zipkin2.collector.kafka08.KafkaCollector;
-
-@ConfigurationProperties("zipkin.collector.kafka")
-class ZipkinKafkaCollectorProperties {
- private String topic = "zipkin";
- private String zookeeper;
- private String groupId = "zipkin";
- private int streams = 1;
- private int maxMessageSize = 1024 * 1024;
- private Map<String, String> overrides = new LinkedHashMap<>();
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public String getZookeeper() {
- return zookeeper;
- }
-
- public void setZookeeper(String zookeeper) {
- this.zookeeper = "".equals(zookeeper) ? null : zookeeper;
- }
-
- public String getGroupId() {
- return groupId;
- }
-
- public void setGroupId(String groupId) {
- this.groupId = groupId;
- }
-
- public int getStreams() {
- return streams;
- }
-
- public void setStreams(int streams) {
- this.streams = streams;
- }
-
- public int getMaxMessageSize() {
- return maxMessageSize;
- }
-
- public void setMaxMessageSize(int maxMessageSize) {
- this.maxMessageSize = maxMessageSize;
- }
-
- public Map<String, String> getOverrides() {
- return overrides;
- }
-
- public void setOverrides(Map<String, String> overrides) {
- this.overrides = overrides;
- }
-
- public KafkaCollector.Builder toBuilder() {
- return KafkaCollector.builder()
- .topic(topic)
- .zookeeper(zookeeper)
- .groupId(groupId)
- .streams(streams)
- .maxMessageSize(maxMessageSize)
- .overrides(overrides);
- }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories b/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories
deleted file mode 100644
index 9daca53..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories
+++ /dev/null
@@ -1,2 +0,0 @@
-org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
-zipkin2.autoconfigure.collector.kafka08.ZipkinKafka08CollectorAutoConfiguration
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml b/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml
deleted file mode 100644
index 04ccf25..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml
+++ /dev/null
@@ -1,7 +0,0 @@
-zipkin:
- collector:
- kafka:
- # ZooKeeper host string, comma-separated host:port value.
- zookeeper: ${KAFKA_ZOOKEEPER:}
- # Maximum size of a message containing spans in bytes
- max-message-size: ${KAFKA_MAX_MESSAGE_SIZE:1048576}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java
deleted file mode 100644
index 1ceeb0d..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-import org.springframework.context.annotation.Configuration;
-import zipkin2.collector.kafka08.KafkaCollector;
-
-/** opens package access for testing */
-public final class Access {
-
- /** Just registering properties to avoid automatically connecting to a Kafka server */
- public static void registerKafkaProperties(AnnotationConfigApplicationContext context) {
- context.register(
- PropertyPlaceholderAutoConfiguration.class, EnableKafkaCollectorProperties.class);
- }
-
- @Configuration
- @EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class)
- static class EnableKafkaCollectorProperties {}
-
- public static KafkaCollector.Builder collectorBuilder(
- AnnotationConfigApplicationContext context) {
- return context.getBean(ZipkinKafkaCollectorProperties.class).toBuilder();
- }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java
deleted file mode 100644
index b8c2b81..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
-import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
-import org.springframework.boot.test.util.TestPropertyValues;
-import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import zipkin2.collector.Collector;
-import zipkin2.collector.CollectorMetrics;
-import zipkin2.collector.CollectorSampler;
-import zipkin2.collector.kafka08.KafkaCollector;
-import zipkin2.storage.InMemoryStorage;
-import zipkin2.storage.StorageComponent;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class ZipkinKafka08CollectorAutoConfigurationTest {
-
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- AnnotationConfigApplicationContext context;
-
- @After
- public void close() {
- if (context != null) {
- context.close();
- }
- }
-
- @Test
- public void doesntProvidesCollectorComponent_whenKafkaZooKeeperUnset() {
- context = new AnnotationConfigApplicationContext();
- context.register(
- PropertyPlaceholderAutoConfiguration.class,
- ZipkinKafka08CollectorAutoConfiguration.class,
- InMemoryConfiguration.class);
- context.refresh();
-
- thrown.expect(NoSuchBeanDefinitionException.class);
- context.getBean(Collector.class);
- }
-
- @Test
- public void providesCollectorComponent_whenZooKeeperSet() {
- context = new AnnotationConfigApplicationContext();
- TestPropertyValues.of("zipkin.collector.kafka.zookeeper:localhost").applyTo(context);
- context.register(
- PropertyPlaceholderAutoConfiguration.class,
- ZipkinKafka08CollectorAutoConfiguration.class,
- InMemoryConfiguration.class);
- context.refresh();
-
- assertThat(context.getBean(KafkaCollector.class)).isNotNull();
- }
-
- @Test
- public void canOverrideProperty_topic() {
- context = new AnnotationConfigApplicationContext();
- TestPropertyValues.of(
- "zipkin.collector.kafka.zookeeper:localhost",
- "zipkin.collector.kafka.topic:zapkin")
- .applyTo(context);
- context.register(
- PropertyPlaceholderAutoConfiguration.class,
- ZipkinKafka08CollectorAutoConfiguration.class,
- InMemoryConfiguration.class);
- context.refresh();
-
- assertThat(context.getBean(ZipkinKafkaCollectorProperties.class).getTopic())
- .isEqualTo("zapkin");
- }
-
- @Configuration
- static class InMemoryConfiguration {
- @Bean
- CollectorSampler sampler() {
- return CollectorSampler.ALWAYS_SAMPLE;
- }
-
- @Bean
- CollectorMetrics metrics() {
- return CollectorMetrics.NOOP_METRICS;
- }
-
- @Bean
- StorageComponent storage() {
- return InMemoryStorage.newBuilder().build();
- }
- }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java
deleted file mode 100644
index 88a490c..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import org.junit.Test;
-import org.springframework.boot.test.util.TestPropertyValues;
-import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-import zipkin2.autoconfigure.collector.kafka08.Access;
-import zipkin2.storage.InMemoryStorage;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class NestedPropertyOverrideTest {
- @Test
- public void overrideWithNestedProperties() {
- AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
- TestPropertyValues.of(
- "zipkin.collector.kafka.zookeeper:localhost",
- "zipkin.collector.kafka.overrides.auto.offset.reset:largest").applyTo(context);
- Access.registerKafkaProperties(context);
- context.refresh();
-
- assertThat(
- Access.collectorBuilder(context)
- .storage(InMemoryStorage.newBuilder().build())
- .build()
- .connector
- .config
- .autoOffsetReset())
- .isEqualTo("largest");
- }
-}
diff --git a/zipkin-autoconfigure/pom.xml b/zipkin-autoconfigure/pom.xml
index cc1c81e..5cc3810 100644
--- a/zipkin-autoconfigure/pom.xml
+++ b/zipkin-autoconfigure/pom.xml
@@ -37,7 +37,6 @@
</properties>
<modules>
- <module>collector-kafka08</module>
<module>collector-scribe</module>
</modules>
diff --git a/zipkin-collector/kafka/README.md b/zipkin-collector/kafka/README.md
index 9b459a0..49eed4d 100644
--- a/zipkin-collector/kafka/README.md
+++ b/zipkin-collector/kafka/README.md
@@ -1,4 +1,4 @@
-# collector-kafka10
+# collector-kafka
## KafkaCollector
This collector is implemented as a Kafka consumer supporting Kafka brokers running
diff --git a/zipkin-collector/kafka08/README.md b/zipkin-collector/kafka08/README.md
deleted file mode 100644
index 142f53f..0000000
--- a/zipkin-collector/kafka08/README.md
+++ /dev/null
@@ -1,41 +0,0 @@
-# collector-kafka
-
-## KafkaCollector
-This collector polls a Kafka 8.2.2+ topic for messages that contain
-a list of spans in json or TBinaryProtocol big-endian encoding. These
-spans are pushed to a span consumer.
-
-`zipkin2.collector.kafka08.KafkaCollector.Builder` includes defaults that will
-operate against a Kafka topic advertised in Zookeeper.
-
-## Encoding spans into Kafka messages
-The message's binary data includes a list of spans. Supported encodings
-are the same as the http [POST /spans](http://zipkin.io/zipkin-api/#/paths/%252Fspans) body.
-
-### Json
-The message's binary data is a list of spans in json. The first character must be '[' (decimal 91).
-
-`Codec.JSON.writeSpans(spans)` performs the correct json encoding.
-
-Here's an example, sending a list of a single span to the zipkin topic:
-
-```bash
-$ kafka-console-producer.sh --broker-list $ADVERTISED_HOST:9092 --topic zipkin
-[{"traceId":"1","name":"bang","id":"2","timestamp":1234,"binaryAnnotations":[{"key":"lc","value":"bamm-bamm","endpoint":{"serviceName":"flintstones","ipv4":"127.0.0.1"}}]}]
-```
-
-### Thrift
-The message's binary data includes a list header followed by N spans serialized in TBinaryProtocol
-
-`Codec.THRIFT.writeSpans(spans)` encodes spans in the following fashion:
-```
-write_byte(12) // type of the list elements: 12 == struct
-write_i32(count) // count of spans that will follow
-for (int i = 0; i < count; i++) {
- writeTBinaryProtocol(spans(i))
-}
-```
-
-### Legacy encoding
-Older versions of zipkin accepted a single span per message, as opposed
-to a list per message. This practice is deprecated, but still supported.
diff --git a/zipkin-collector/kafka08/pom.xml b/zipkin-collector/kafka08/pom.xml
deleted file mode 100644
index 10d80a0..0000000
--- a/zipkin-collector/kafka08/pom.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.zipkin.zipkin2</groupId>
- <artifactId>zipkin-collector-parent</artifactId>
- <version>2.13.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>zipkin-collector-kafka08</artifactId>
- <name>Collector: Kafka (Legacy)</name>
-
- <properties>
- <main.basedir>${project.basedir}/../..</main.basedir>
- <!-- This is pinned to Kafka 0.8.x client as 0.9.x brokers work with them, but not visa-versa
- http://docs.confluent.io/2.0.0/upgrade.html -->
- <kafka.version>0.8.2.2</kafka.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>zipkin-collector</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>${kafka.version}</version>
- <exclusions>
- <!-- don't eagerly bind slf4j -->
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-</project>
diff --git a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java
deleted file mode 100644
index b68cc03..0000000
--- a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ZookeeperConsumerConnector;
-import zipkin2.CheckResult;
-import zipkin2.collector.Collector;
-import zipkin2.collector.CollectorComponent;
-import zipkin2.collector.CollectorMetrics;
-import zipkin2.collector.CollectorSampler;
-import zipkin2.storage.SpanConsumer;
-import zipkin2.storage.StorageComponent;
-
-import static kafka.consumer.Consumer.createJavaConsumerConnector;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
-
-/**
- * This collector polls a Kafka topic for messages that contain TBinaryProtocol big-endian encoded
- * lists of spans. These spans are pushed to a {@link SpanConsumer#accept span consumer}.
- *
- * <p>This collector remains a Kafka 0.8.x consumer, while Zipkin systems update to 0.9+.
- */
-public final class KafkaCollector extends CollectorComponent {
-
- public static Builder builder() {
- return new Builder();
- }
-
- /** Configuration including defaults needed to consume spans from a Kafka topic. */
- public static final class Builder extends CollectorComponent.Builder {
- final Properties properties = new Properties();
- Collector.Builder delegate = Collector.newBuilder(KafkaCollector.class);
- CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;
- String topic = "zipkin";
- int streams = 1;
-
- @Override
- public Builder storage(StorageComponent storage) {
- delegate.storage(storage);
- return this;
- }
-
- @Override
- public Builder sampler(CollectorSampler sampler) {
- delegate.sampler(sampler);
- return this;
- }
-
- @Override
- public Builder metrics(CollectorMetrics metrics) {
- if (metrics == null) throw new NullPointerException("metrics == null");
- this.metrics = metrics.forTransport("kafka");
- delegate.metrics(this.metrics);
- return this;
- }
-
- /** Topic zipkin spans will be consumed from. Defaults to "zipkin" */
- public Builder topic(String topic) {
- if (topic == null) throw new NullPointerException("topic == null");
- this.topic = topic;
- return this;
- }
-
- /** The zookeeper connect string, ex. 127.0.0.1:2181. No default */
- public Builder zookeeper(String zookeeper) {
- if (zookeeper == null) throw new NullPointerException("zookeeper == null");
- properties.put("zookeeper.connect", zookeeper);
- return this;
- }
-
- /** The consumer group this process is consuming on behalf of. Defaults to "zipkin" */
- public Builder groupId(String groupId) {
- if (groupId == null) throw new NullPointerException("groupId == null");
- properties.put(GROUP_ID_CONFIG, groupId);
- return this;
- }
-
- /** Count of threads/streams consuming the topic. Defaults to 1 */
- public Builder streams(int streams) {
- this.streams = streams;
- return this;
- }
-
- /** Maximum size of a message containing spans in bytes. Defaults to 1 MiB */
- public Builder maxMessageSize(int bytes) {
- properties.put("fetch.message.max.bytes", String.valueOf(bytes));
- return this;
- }
-
- /**
- * By default, a consumer will be built from properties derived from builder defaults, as well
- * "auto.offset.reset" -> "smallest". Any properties set here will override the consumer config.
- *
- * <p>For example: Only consume spans since you connected by setting the below.
- *
- * <pre>{@code
- * Map<String, String> overrides = new LinkedHashMap<>();
- * overrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
- * builder.overrides(overrides);
- * }</pre>
- *
- * @see org.apache.kafka.clients.consumer.ConsumerConfig
- */
- public final Builder overrides(Map<String, ?> overrides) {
- if (overrides == null) throw new NullPointerException("overrides == null");
- properties.putAll(overrides);
- return this;
- }
-
- @Override
- public KafkaCollector build() {
- return new KafkaCollector(this);
- }
-
- Builder() {
- // Settings below correspond to "Old Consumer Configs"
- // http://kafka.apache.org/documentation.html
- properties.put(GROUP_ID_CONFIG, "zipkin");
- properties.put("fetch.message.max.bytes", String.valueOf(1024 * 1024));
- // Same default as zipkin-scala, and keeps tests from hanging
- properties.put(AUTO_OFFSET_RESET_CONFIG, "smallest");
- }
- }
-
- final LazyConnector connector;
- final LazyStreams streams;
-
- KafkaCollector(Builder builder) {
- connector = new LazyConnector(builder);
- streams = new LazyStreams(builder, connector);
- }
-
- @Override
- public KafkaCollector start() {
- connector.get();
- streams.get();
- return this;
- }
-
- @Override
- public CheckResult check() {
- try {
- connector.get(); // make sure the connector didn't throw
- CheckResult failure = streams.failure.get(); // check the streams didn't quit
- if (failure != null) return failure;
- return CheckResult.OK;
- } catch (RuntimeException e) {
- return CheckResult.failed(e);
- }
- }
-
- static final class LazyConnector {
-
- final ConsumerConfig config;
- volatile ZookeeperConsumerConnector connector;
-
- LazyConnector(Builder builder) {
- this.config = new ConsumerConfig(builder.properties);
- }
-
- ZookeeperConsumerConnector get() {
- if (connector == null) {
- synchronized (this) {
- if (connector == null) {
- connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(config);
- }
- }
- }
- return connector;
- }
-
- void close() {
- ZookeeperConsumerConnector maybeConnector = connector;
- if (maybeConnector == null) return;
- maybeConnector.shutdown();
- }
- }
-
- @Override
- public void close() {
- streams.close();
- connector.close();
- }
-
- static final class LazyStreams {
- final int streams;
- final String topic;
- final Collector collector;
- final CollectorMetrics metrics;
- final LazyConnector connector;
- final AtomicReference<CheckResult> failure = new AtomicReference<>();
- volatile ExecutorService pool;
-
- LazyStreams(Builder builder, LazyConnector connector) {
- this.streams = builder.streams;
- this.topic = builder.topic;
- this.collector = builder.delegate.build();
- this.metrics = builder.metrics;
- this.connector = connector;
- }
-
- ExecutorService get() {
- if (pool == null) {
- synchronized (this) {
- if (pool == null) {
- pool = compute();
- }
- }
- }
- return pool;
- }
-
- void close() {
- ExecutorService maybePool = pool;
- if (maybePool == null) return;
- maybePool.shutdownNow();
- try {
- maybePool.awaitTermination(1, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- // at least we tried
- }
- }
-
- ExecutorService compute() {
- ExecutorService pool =
- streams == 1
- ? Executors.newSingleThreadExecutor()
- : Executors.newFixedThreadPool(streams);
-
- Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
- topicCountMap.put(topic, streams);
-
- for (KafkaStream<byte[], byte[]> stream :
- connector.get().createMessageStreams(topicCountMap).get(topic)) {
- pool.execute(guardFailures(new KafkaStreamProcessor(stream, collector, metrics)));
- }
- return pool;
- }
-
- Runnable guardFailures(final Runnable delegate) {
- return new Runnable() {
- @Override
- public void run() {
- try {
- delegate.run();
- } catch (RuntimeException e) {
- failure.set(CheckResult.failed(e));
- }
- }
- };
- }
- }
-}
diff --git a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
deleted file mode 100644
index 4232c2e..0000000
--- a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.Collections;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import zipkin2.Callback;
-import zipkin2.Span;
-import zipkin2.codec.SpanBytesDecoder;
-import zipkin2.collector.Collector;
-import zipkin2.collector.CollectorMetrics;
-
-/** Consumes spans from Kafka messages, ignoring malformed input */
-final class KafkaStreamProcessor implements Runnable {
- static final Callback<Void> NOOP =
- new Callback<Void>() {
- @Override
- public void onSuccess(Void value) {}
-
- @Override
- public void onError(Throwable t) {}
- };
-
- final KafkaStream<byte[], byte[]> stream;
- final Collector collector;
- final CollectorMetrics metrics;
-
- KafkaStreamProcessor(
- KafkaStream<byte[], byte[]> stream, Collector collector, CollectorMetrics metrics) {
- this.stream = stream;
- this.collector = collector;
- this.metrics = metrics;
- }
-
- @Override
- public void run() {
- ConsumerIterator<byte[], byte[]> messages = stream.iterator();
- while (messages.hasNext()) {
- byte[] bytes = messages.next().message();
- metrics.incrementMessages();
- metrics.incrementBytes(bytes.length);
- if (bytes.length == 0) continue; // lenient on empty messages
-
- if (bytes.length < 2) { // need two bytes to check if protobuf
- metrics.incrementMessagesDropped();
- continue;
- }
-
- // If we received legacy single-span encoding, decode it into a singleton list
- if (!protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) {
- Span span;
- try {
- span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
- } catch (RuntimeException e) {
- metrics.incrementMessagesDropped();
- continue;
- }
- collector.accept(Collections.singletonList(span), NOOP);
- } else {
- collector.acceptSpans(bytes, NOOP);
- }
- }
- }
-
- /* span key or trace ID key */
- static boolean protobuf3(byte[] bytes) {
- return bytes[0] == 10 && bytes[1] != 0; // varint follows and won't be zero
- }
-}
diff --git a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
deleted file mode 100644
index 290a71e..0000000
--- a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import org.I0Itec.zkclient.exception.ZkTimeoutException;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.Timeout;
-import zipkin2.Call;
-import zipkin2.Callback;
-import zipkin2.CheckResult;
-import zipkin2.Span;
-import zipkin2.TestObjects;
-import zipkin2.codec.SpanBytesEncoder;
-import zipkin2.collector.InMemoryCollectorMetrics;
-import zipkin2.collector.kafka08.KafkaCollector.Builder;
-import zipkin2.storage.SpanConsumer;
-import zipkin2.storage.SpanStore;
-import zipkin2.storage.StorageComponent;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static zipkin2.TestObjects.CLIENT_SPAN;
-import static zipkin2.TestObjects.UTF_8;
-import static zipkin2.codec.SpanBytesEncoder.THRIFT;
-
-public class ITKafkaCollector {
- @Rule public ExpectedException thrown = ExpectedException.none();
- @ClassRule public static Timeout globalTimeout = Timeout.seconds(20);
-
- List<Span> spans = Arrays.asList(TestObjects.LOTS_OF_SPANS[0], TestObjects.LOTS_OF_SPANS[1]);
-
- Producer<String, byte[]> producer = KafkaTestGraph.INSTANCE.producer();
- InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics();
- InMemoryCollectorMetrics kafkaMetrics = metrics.forTransport("kafka");
-
- LinkedBlockingQueue<List<Span>> recvdSpans = new LinkedBlockingQueue<>();
- SpanConsumer consumer = (spans) -> {
- recvdSpans.add(spans);
- return Call.create(null);
- };
-
- @Test
- public void checkPasses() {
- try (KafkaCollector collector = newKafkaTransport(builder("check_passes"), consumer)) {
- assertThat(collector.check().ok()).isTrue();
- }
- }
-
- @Test
- public void start_failsOnInvalidZooKeeper() {
- thrown.expect(ZkTimeoutException.class);
- thrown.expectMessage("Unable to connect to zookeeper server within timeout: 6000");
-
- Builder builder = builder("fail_invalid_zk").zookeeper("1.1.1.1");
-
- try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {}
- }
-
- @Test
- public void canSetMaxMessageSize() {
- Builder builder = builder("max_message").maxMessageSize(1);
-
- try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
- assertThat(collector.connector.get().config().fetchMessageMaxBytes()).isEqualTo(1);
- }
- }
-
- /** Ensures legacy encoding works: a single TBinaryProtocol encoded span */
- @Test
- public void messageWithSingleThriftSpan() throws Exception {
- Builder builder = builder("single_span");
-
- byte[] bytes = THRIFT.encode(CLIENT_SPAN);
- producer.send(new KeyedMessage<>(builder.topic, bytes));
-
- try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
- assertThat(recvdSpans.take()).containsExactly(CLIENT_SPAN);
- }
-
- assertThat(kafkaMetrics.messages()).isEqualTo(1);
- assertThat(kafkaMetrics.messagesDropped()).isZero();
- assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
- assertThat(kafkaMetrics.spans()).isEqualTo(1);
- assertThat(kafkaMetrics.spansDropped()).isZero();
- }
-
- /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */
- @Test
- public void messageWithMultipleSpans_thrift() throws Exception {
- messageWithMultipleSpans(builder("multiple_spans_thrift"), THRIFT);
- }
-
- /** Ensures list encoding works: a json encoded list of spans */
- @Test
- public void messageWithMultipleSpans_json() throws Exception {
- messageWithMultipleSpans(builder("multiple_spans_json"), SpanBytesEncoder.JSON_V1);
- }
-
- /** Ensures list encoding works: a version 2 json list of spans */
- @Test
- public void messageWithMultipleSpans_json2() throws Exception {
- messageWithMultipleSpans(builder("multiple_spans_json2"), SpanBytesEncoder.JSON_V2);
- }
-
- /** Ensures list encoding works: proto3 ListOfSpans */
- @Test
- public void messageWithMultipleSpans_proto3() throws Exception {
- messageWithMultipleSpans(builder("multiple_spans_proto3"), SpanBytesEncoder.PROTO3);
- }
-
- void messageWithMultipleSpans(Builder builder, SpanBytesEncoder encoder) throws Exception {
- byte[] message = encoder.encodeList(spans);
-
- producer.send(new KeyedMessage<>(builder.topic, message));
-
- try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
- assertThat(recvdSpans.take()).containsAll(spans);
- }
-
- assertThat(kafkaMetrics.messages()).isEqualTo(1);
- assertThat(kafkaMetrics.messagesDropped()).isZero();
- assertThat(kafkaMetrics.bytes()).isEqualTo(message.length);
- assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
- assertThat(kafkaMetrics.spansDropped()).isZero();
- }
-
- /** Ensures malformed spans don't hang the collector */
- @Test
- public void skipsMalformedData() throws Exception {
- Builder builder = builder("decoder_exception");
-
- byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
- byte[] malformed2 = "malformed".getBytes(UTF_8);
- producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
- producer.send(new KeyedMessage<>(builder.topic, new byte[0]));
- producer.send(new KeyedMessage<>(builder.topic, malformed1));
- producer.send(new KeyedMessage<>(builder.topic, malformed2));
- producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
-
- try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
- assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
- // the only way we could read this, is if the malformed spans were skipped.
- assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
- }
-
- assertThat(kafkaMetrics.messages()).isEqualTo(5);
- assertThat(kafkaMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
- assertThat(kafkaMetrics.bytes())
- .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
- assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 2);
- assertThat(kafkaMetrics.spansDropped()).isZero();
- }
-
- /** Guards against errors that leak from storage, such as InvalidQueryException */
- @Test
- public void skipsOnStorageException() throws Exception {
- Builder builder = builder("storage_exception");
-
- AtomicInteger counter = new AtomicInteger();
- consumer = (input) -> new Call.Base<Void>() {
-
- @Override protected Void doExecute() {
- throw new AssertionError();
- }
-
- @Override protected void doEnqueue(Callback<Void> callback) {
- if (counter.getAndIncrement() == 1) {
- callback.onError(new RuntimeException("storage fell over"));
- } else {
- recvdSpans.add(spans);
- callback.onSuccess(null);
- }
- }
-
- @Override public Call<Void> clone() {
- throw new AssertionError();
- }
- };
-
- producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
- producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans))); // tossed on error
- producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
-
- try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
- assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
- // the only way we could read this, is if the malformed span was skipped.
- assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
- }
-
- assertThat(kafkaMetrics.messages()).isEqualTo(3);
- assertThat(kafkaMetrics.messagesDropped()).isZero(); // storage failure isn't a message failure
- assertThat(kafkaMetrics.bytes()).isEqualTo(THRIFT.encodeList(spans).length * 3);
- assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 3);
- assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size()); // only one dropped
- }
-
- Builder builder(String topic) {
- return new Builder().metrics(metrics).zookeeper("127.0.0.1:2181").topic(topic);
- }
-
- KafkaCollector newKafkaTransport(Builder builder, SpanConsumer consumer) {
- return new KafkaCollector(builder.storage(buildStorage(consumer))).start();
- }
-
- StorageComponent buildStorage(final SpanConsumer spanConsumer) {
- return new StorageComponent() {
- @Override
- public SpanStore spanStore() {
- throw new AssertionError();
- }
-
- @Override
- public SpanConsumer spanConsumer() {
- return spanConsumer;
- }
-
- @Override
- public CheckResult check() {
- return CheckResult.OK;
- }
-
- @Override
- public void close() {
- throw new AssertionError();
- }
- };
- }
-}
diff --git a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java
deleted file mode 100644
index 3ff9beb..0000000
--- a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.Properties;
-import kafka.common.FailedToSendMessageException;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkTimeoutException;
-import org.junit.AssumptionViolatedException;
-
-/** Tests only execute when ZK and Kafka are listening on 127.0.0.1 on default ports. */
-enum KafkaTestGraph {
- INSTANCE;
-
- private AssumptionViolatedException ex;
- private Producer<String, byte[]> producer;
-
- synchronized Producer<String, byte[]> producer() {
- if (ex != null) throw ex;
- if (this.producer == null) {
- Properties producerProps = new Properties();
- producerProps.put("metadata.broker.list", "127.0.0.1:9092");
- producerProps.put("producer.type", "sync");
- producer = new Producer<>(new ProducerConfig(producerProps));
- try {
- new ZkClient("127.0.0.1:2181", 1000);
- producer.send(new KeyedMessage<>("test", new byte[0]));
- } catch (FailedToSendMessageException | ZkTimeoutException e) {
- throw ex = new AssumptionViolatedException(e.getMessage(), e);
- }
- }
- return producer;
- }
-}
diff --git a/zipkin-collector/kafka08/src/test/resources/log4j.properties b/zipkin-collector/kafka08/src/test/resources/log4j.properties
deleted file mode 100644
index 15345f3..0000000
--- a/zipkin-collector/kafka08/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,7 +0,0 @@
-# By default, everything goes to console and file
-log4j.rootLogger=WARN, A1
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
-log4j.appender.A1.ImmediateFlush=true
diff --git a/zipkin-collector/kafka08/src/test/resources/log4j2.properties b/zipkin-collector/kafka08/src/test/resources/log4j2.properties
deleted file mode 100755
index c437666..0000000
--- a/zipkin-collector/kafka08/src/test/resources/log4j2.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-appenders=console
-appender.console.type=Console
-appender.console.name=STDOUT
-appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n
-rootLogger.level=warn
-rootLogger.appenderRefs=stdout
-rootLogger.appenderRef.stdout.ref=STDOUT
-# don't waste logs when ZK check fails
-logger.zk.name=org.apache.zookeeper.ClientCnxn
-logger.zk.level=off
diff --git a/zipkin-collector/pom.xml b/zipkin-collector/pom.xml
index 153398d..246f176 100644
--- a/zipkin-collector/pom.xml
+++ b/zipkin-collector/pom.xml
@@ -42,7 +42,6 @@
<module>kafka</module>
<module>rabbitmq</module>
<module>scribe</module>
- <module>kafka08</module>
</modules>
<dependencies>
diff --git a/zipkin-server/README.md b/zipkin-server/README.md
index 7832c84..7e64055 100644
--- a/zipkin-server/README.md
+++ b/zipkin-server/README.md
@@ -352,14 +352,6 @@ Specifying bootstrap servers as a system property, instead of an environment var
$ java -Dzipkin.collector.kafka.bootstrap-servers=127.0.0.1:9092 -jar zipkin.jar
```
-#### Migration from Kafka < 0.8.1
-
-As explained [on kafka wiki](https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka), offsets were stored in ZooKeeper. This has changed and offsets are now stored directly in Kafka. You need to update offsets in Kafka 0.10 by following the instructions.
-
-#### Kafka (Legacy) Collector
-The default collector is for Kafka 0.10.x+ brokers. You can use Kafka
-0.8 brokers via an external module. See [zipkin-autoconfigure/collector-kafka08](../zipkin-autoconfigure/collector-kafka08/).
-
### RabbitMQ collector
The [RabbitMQ collector](../zipkin-collector/rabbitmq) will be enabled when the `addresses` or `uri` for the RabbitMQ server(s) is set.