You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/01 21:41:47 UTC
[28/44] metron git commit: METRON-891 KafkaConsumer should not be
shared among threads (jjmeyer via merrimanr) closes apache/metron#567
METRON-891 KafkaConsumer should not be shared among threads (jjmeyer via merrimanr) closes apache/metron#567
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/47e2b735
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/47e2b735
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/47e2b735
Branch: refs/heads/Metron_0.4.0
Commit: 47e2b735ea682e7d647e3eaafac89a192a783e86
Parents: c0b0825
Author: jjmeyer <jj...@gmail.com>
Authored: Fri May 19 10:10:25 2017 -0500
Committer: merrimanr <me...@apache.org>
Committed: Fri May 19 10:10:25 2017 -0500
----------------------------------------------------------------------
dependencies_with_url.csv | 1 +
metron-interface/metron-rest/pom.xml | 34 +++-
.../apache/metron/rest/config/KafkaConfig.java | 52 ++++++-
.../metron/rest/controller/KafkaController.java | 104 +++++++------
.../metron/rest/service/KafkaService.java | 45 +++++-
.../rest/service/impl/KafkaServiceImpl.java | 155 +++++++++++--------
.../apache/metron/rest/config/TestConfig.java | 32 ++--
.../rest/service/impl/KafkaServiceImplTest.java | 15 +-
8 files changed, 297 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 93a19b7..3752366 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -296,3 +296,4 @@ org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.or
net.byteseek:byteseek:jar:2.0.3:compile,BSD,https://github.com/nishihatapalmer/byteseek
org.springframework.security.kerberos:spring-security-kerberos-client:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
org.springframework.security.kerberos:spring-security-kerberos-core:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
+org.springframework.kafka:spring-kafka:jar:1.1.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-kafka
http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/pom.xml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml
index b11e999..1c3ff92 100644
--- a/metron-interface/metron-rest/pom.xml
+++ b/metron-interface/metron-rest/pom.xml
@@ -33,9 +33,37 @@
<spring.kerberos.version>1.0.1.RELEASE</spring.kerberos.version>
<swagger.version>2.5.0</swagger.version>
<mysql.client.version>5.1.40</mysql.client.version>
+ <spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
</properties>
<dependencies>
<dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ <version>${spring-kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.springframework.retry</groupId>
+ <artifactId>spring-retry</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-messaging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
@@ -75,7 +103,7 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
- </exclusions>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
@@ -112,8 +140,8 @@
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
- <groupId>org.reflections</groupId>
- <artifactId>reflections</artifactId>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
</exclusion>
</exclusions>
</dependency>
http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
index 309a549..247264b 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
@@ -27,33 +27,56 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import java.util.Properties;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+/**
+ * Configuration used for connecting to Kafka.
+ */
@Configuration
@Profile("!" + TEST_PROFILE)
public class KafkaConfig {
-
+ /**
+ * The Spring environment.
+ */
private Environment environment;
+ /**
+ * Constructor used to inject {@link Environment}.
+ * @param environment Spring environment to inject.
+ */
@Autowired
- public KafkaConfig(Environment environment) {
+ public KafkaConfig(final Environment environment) {
this.environment = environment;
}
+ /**
+ * The client used for ZooKeeper.
+ */
@Autowired
private ZkClient zkClient;
+ /**
+ * Bean for ZooKeeper
+ */
@Bean
public ZkUtils zkUtils() {
return ZkUtils.apply(zkClient, false);
}
- @Bean(destroyMethod = "close")
- public KafkaConsumer<String, String> kafkaConsumer() {
- Properties props = new Properties();
+ /**
+ * Create properties that will be used by {@link #createConsumerFactory()}.
+ *
+ * @return Configurations used by {@link #createConsumerFactory()}.
+ */
+ @Bean
+ public Map<String, Object> consumerProperties() {
+ final Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
props.put("group.id", "metron-rest");
props.put("enable.auto.commit", "false");
@@ -64,9 +87,24 @@ public class KafkaConfig {
if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) {
props.put("security.protocol", "SASL_PLAINTEXT");
}
- return new KafkaConsumer<>(props);
+ return props;
+ }
+
+ /**
+ * Create a {@link ConsumerFactory} which will be used for certain Kafka interactions within config API.
+ *
+ * @return a {@link ConsumerFactory} used to create {@link KafkaConsumer} for interactions with Kafka.
+ */
+ @Bean
+ public ConsumerFactory<String, String> createConsumerFactory() {
+ return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
+ /**
+ * Create a bean for {@link AdminUtils$}. This is primarily done to make testing a bit easier.
+ *
+ * @return {@link AdminUtils$} is written in scala. We return a reference to this class.
+ */
@Bean
public AdminUtils$ adminUtils() {
return AdminUtils$.MODULE$;
http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
index 0cd4d54..2787504 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java
@@ -35,62 +35,78 @@ import org.springframework.web.bind.annotation.RestController;
import java.util.Set;
+/**
+ * The API resource that is used to interact with Kafka.
+ */
@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaController {
- @Autowired
- private KafkaService kafkaService;
+ /**
+ * Service used to interact with Kafka.
+ */
+ @Autowired
+ private KafkaService kafkaService;
- @ApiOperation(value = "Creates a new Kafka topic")
+ @ApiOperation(value = "Creates a new Kafka topic")
+ @ApiResponses({
@ApiResponse(message = "Returns saved Kafka topic", code = 200)
- @RequestMapping(value = "/topic", method = RequestMethod.POST)
- ResponseEntity<KafkaTopic> save(@ApiParam(name="topic", value="Kafka topic", required=true)@RequestBody KafkaTopic topic) throws RestException {
- return new ResponseEntity<>(kafkaService.createTopic(topic), HttpStatus.CREATED);
- }
+ })
+ @RequestMapping(value = "/topic", method = RequestMethod.POST)
+ ResponseEntity<KafkaTopic> save(final @ApiParam(name = "topic", value = "Kafka topic", required = true) @RequestBody KafkaTopic topic) throws RestException {
+ return new ResponseEntity<>(kafkaService.createTopic(topic), HttpStatus.CREATED);
+ }
- @ApiOperation(value = "Retrieves a Kafka topic")
- @ApiResponses(value = { @ApiResponse(message = "Returns Kafka topic", code = 200),
- @ApiResponse(message = "Kafka topic is missing", code = 404) })
- @RequestMapping(value = "/topic/{name}", method = RequestMethod.GET)
- ResponseEntity<KafkaTopic> get(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException {
- KafkaTopic kafkaTopic = kafkaService.getTopic(name);
- if (kafkaTopic != null) {
- return new ResponseEntity<>(kafkaTopic, HttpStatus.OK);
- } else {
- return new ResponseEntity<>(HttpStatus.NOT_FOUND);
- }
+ @ApiOperation(value = "Retrieves a Kafka topic")
+ @ApiResponses(value = {
+ @ApiResponse(message = "Returns Kafka topic", code = 200),
+ @ApiResponse(message = "Kafka topic is missing", code = 404)
+ })
+ @RequestMapping(value = "/topic/{name}", method = RequestMethod.GET)
+ ResponseEntity<KafkaTopic> get(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException {
+ KafkaTopic kafkaTopic = kafkaService.getTopic(name);
+ if (kafkaTopic != null) {
+ return new ResponseEntity<>(kafkaTopic, HttpStatus.OK);
+ } else {
+ return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
+ }
- @ApiOperation(value = "Retrieves all Kafka topics")
+ @ApiOperation(value = "Retrieves all Kafka topics")
+ @ApiResponses({
@ApiResponse(message = "Returns a list of all Kafka topics", code = 200)
- @RequestMapping(value = "/topic", method = RequestMethod.GET)
- ResponseEntity<Set<String>> list() throws Exception {
- return new ResponseEntity<>(kafkaService.listTopics(), HttpStatus.OK);
- }
+ })
+ @RequestMapping(value = "/topic", method = RequestMethod.GET)
+ ResponseEntity<Set<String>> list() throws Exception {
+ return new ResponseEntity<>(kafkaService.listTopics(), HttpStatus.OK);
+ }
- @ApiOperation(value = "Deletes a Kafka topic")
- @ApiResponses(value = { @ApiResponse(message = "Kafka topic was deleted", code = 200),
- @ApiResponse(message = "Kafka topic is missing", code = 404) })
- @RequestMapping(value = "/topic/{name}", method = RequestMethod.DELETE)
- ResponseEntity<Void> delete(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException {
- if (kafkaService.deleteTopic(name)) {
- return new ResponseEntity<>(HttpStatus.OK);
- } else {
- return new ResponseEntity<>(HttpStatus.NOT_FOUND);
- }
+ @ApiOperation(value = "Deletes a Kafka topic")
+ @ApiResponses(value = {
+ @ApiResponse(message = "Kafka topic was deleted", code = 200),
+ @ApiResponse(message = "Kafka topic is missing", code = 404)
+ })
+ @RequestMapping(value = "/topic/{name}", method = RequestMethod.DELETE)
+ ResponseEntity<Void> delete(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException {
+ if (kafkaService.deleteTopic(name)) {
+ return new ResponseEntity<>(HttpStatus.OK);
+ } else {
+ return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
+ }
- @ApiOperation(value = "Retrieves a sample message from a Kafka topic using the most recent offset")
- @ApiResponses(value = { @ApiResponse(message = "Returns sample message", code = 200),
- @ApiResponse(message = "Either Kafka topic is missing or contains no messages", code = 404) })
- @RequestMapping(value = "/topic/{name}/sample", method = RequestMethod.GET)
- ResponseEntity<String> getSample(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException {
- String sampleMessage = kafkaService.getSampleMessage(name);
- if (sampleMessage != null) {
- return new ResponseEntity<>(sampleMessage, HttpStatus.OK);
- } else {
- return new ResponseEntity<>(HttpStatus.NOT_FOUND);
- }
+ @ApiOperation(value = "Retrieves a sample message from a Kafka topic using the most recent offset")
+ @ApiResponses(value = {
+ @ApiResponse(message = "Returns sample message", code = 200),
+ @ApiResponse(message = "Either Kafka topic is missing or contains no messages", code = 404)
+ })
+ @RequestMapping(value = "/topic/{name}/sample", method = RequestMethod.GET)
+ ResponseEntity<String> getSample(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException {
+ String sampleMessage = kafkaService.getSampleMessage(name);
+ if (sampleMessage != null) {
+ return new ResponseEntity<>(sampleMessage, HttpStatus.OK);
+ } else {
+ return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
index f3cd901..bee00f2 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
@@ -22,16 +22,49 @@ import org.apache.metron.rest.model.KafkaTopic;
import java.util.Set;
+/**
+ * This is a set of operations created to interact with Kafka.
+ */
public interface KafkaService {
- String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
+ /**
+ * Please see the following for documentation.
+ *
+ * @see <a href="https://kafka.apache.org/documentation/#impl_offsettracking">Kafka offset tracking documentation</a>.
+ */
+ String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
- KafkaTopic createTopic(KafkaTopic topic) throws RestException;
+ /**
+ * Create a topic in Kafka for given information.
+ * @param topic The information used to create a Kafka topic.
+ * @return The Kafka topic created.
+ * @throws RestException If exceptions occur when creating a topic they should be wrapped in a {@link RestException}.
+ */
+ KafkaTopic createTopic(KafkaTopic topic) throws RestException;
- boolean deleteTopic(String name);
+ /**
+ * Delete a topic for a given name.
+ * @param name The name of the topic to delete.
+ * @return If topic was deleted true; otherwise false.
+ */
+ boolean deleteTopic(String name);
- KafkaTopic getTopic(String name);
+ /**
+ * Retrieves the Kafka topic for a given name.
+ * @param name The name of the Kafka topic to retrieve.
+ * @return A {@link KafkaTopic} with the name of {@code name}. Null if topic with name, {@code name}, doesn't exist.
+ */
+ KafkaTopic getTopic(String name);
- Set<String> listTopics();
+ /**
+ * Returns a set of all topics.
+ * @return A set of all topics in Kafka.
+ */
+ Set<String> listTopics();
- String getSampleMessage(String topic);
+ /**
+ * Return a single sample message from a given topic.
+ * @param topic The name of the topic to retrieve a sample message from.
+ * @return A string representation of the sample message retrieved. If topic doesn't exist null will be returned.
+ */
+ String getSampleMessage(String topic);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
index 33cb2e3..61e2618 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
@@ -20,8 +20,8 @@ package org.apache.metron.rest.service.impl;
import kafka.admin.AdminOperationException;
import kafka.admin.AdminUtils$;
import kafka.admin.RackAwareMode;
-import kafka.admin.RackAwareMode$;
import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
@@ -30,6 +30,7 @@ import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.KafkaTopic;
import org.apache.metron.rest.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;
import java.util.HashSet;
@@ -38,88 +39,106 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+/**
+ * The default service layer implementation of {@link KafkaService}.
+ *
+ * @see KafkaService
+ */
@Service
public class KafkaServiceImpl implements KafkaService {
- private ZkUtils zkUtils;
- private KafkaConsumer<String, String> kafkaConsumer;
- private AdminUtils$ adminUtils;
+ /**
+ * The timeout used when polling Kafka.
+ */
+ private static final int KAFKA_CONSUMER_TIMEOUT = 100;
- @Autowired
- public KafkaServiceImpl(ZkUtils zkUtils, KafkaConsumer<String, String> kafkaConsumer, AdminUtils$ adminUtils) {
- this.zkUtils = zkUtils;
- this.kafkaConsumer = kafkaConsumer;
- this.adminUtils = adminUtils;
- }
+ private final ZkUtils zkUtils;
+ private final ConsumerFactory<String, String> kafkaConsumerFactory;
+ private final AdminUtils$ adminUtils;
- @Override
- public KafkaTopic createTopic(KafkaTopic topic) throws RestException {
- if (!listTopics().contains(topic.getName())) {
- try {
- adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(),RackAwareMode.Disabled$.MODULE$ );
- } catch (AdminOperationException e) {
- throw new RestException(e);
- }
- }
- return topic;
+ /**
+ * @param zkUtils A utility class used to interact with ZooKeeper.
+ * @param kafkaConsumerFactory A class used to create {@link KafkaConsumer} in order to interact with Kafka.
+ * @param adminUtils A utility class used to do administration operations on Kafka.
+ */
+ @Autowired
+ public KafkaServiceImpl(final ZkUtils zkUtils,
+ final ConsumerFactory<String, String> kafkaConsumerFactory,
+ final AdminUtils$ adminUtils) {
+ this.zkUtils = zkUtils;
+ this.kafkaConsumerFactory = kafkaConsumerFactory;
+ this.adminUtils = adminUtils;
+ }
+
+ @Override
+ public KafkaTopic createTopic(final KafkaTopic topic) throws RestException {
+ if (!listTopics().contains(topic.getName())) {
+ try {
+ adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$);
+ } catch (AdminOperationException e) {
+ throw new RestException(e);
+ }
}
+ return topic;
+ }
- @Override
- public boolean deleteTopic(String name) {
- Set<String> topics = listTopics();
- if (topics != null && topics.contains(name)) {
- adminUtils.deleteTopic(zkUtils, name);
- return true;
- } else {
- return false;
- }
+ @Override
+ public boolean deleteTopic(final String name) {
+ final Set<String> topics = listTopics();
+ if (topics != null && topics.contains(name)) {
+ adminUtils.deleteTopic(zkUtils, name);
+ return true;
+ } else {
+ return false;
}
+ }
- @Override
- public KafkaTopic getTopic(String name) {
- KafkaTopic kafkaTopic = null;
- if (listTopics().contains(name)) {
- List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(name);
- if (partitionInfos.size() > 0) {
- PartitionInfo partitionInfo = partitionInfos.get(0);
- kafkaTopic = new KafkaTopic();
- kafkaTopic.setName(name);
- kafkaTopic.setNumPartitions(partitionInfos.size());
- kafkaTopic.setReplicationFactor(partitionInfo.replicas().length);
- }
+ @Override
+ public KafkaTopic getTopic(final String name) {
+ KafkaTopic kafkaTopic = null;
+ if (listTopics().contains(name)) {
+ try (Consumer<String, String> consumer = kafkaConsumerFactory.createConsumer()) {
+ final List<PartitionInfo> partitionInfos = consumer.partitionsFor(name);
+ if (partitionInfos.size() > 0) {
+ final PartitionInfo partitionInfo = partitionInfos.get(0);
+ kafkaTopic = new KafkaTopic();
+ kafkaTopic.setName(name);
+ kafkaTopic.setNumPartitions(partitionInfos.size());
+ kafkaTopic.setReplicationFactor(partitionInfo.replicas().length);
}
- return kafkaTopic;
+ }
}
+ return kafkaTopic;
+ }
- @Override
- public Set<String> listTopics() {
- Set<String> topics;
- synchronized (this) {
- Map<String, List<PartitionInfo>> topicsInfo = kafkaConsumer.listTopics();
- topics = topicsInfo == null ? new HashSet<>() : topicsInfo.keySet();
- topics.remove(CONSUMER_OFFSETS_TOPIC);
- }
- return topics;
+ @Override
+ public Set<String> listTopics() {
+ try (Consumer<String, String> consumer = kafkaConsumerFactory.createConsumer()) {
+ final Map<String, List<PartitionInfo>> topicsInfo = consumer.listTopics();
+ final Set<String> topics = topicsInfo == null ? new HashSet<>() : topicsInfo.keySet();
+ topics.remove(CONSUMER_OFFSETS_TOPIC);
+ return topics;
}
+ }
- @Override
- public String getSampleMessage(String topic) {
- String message = null;
- if (listTopics().contains(topic)) {
- synchronized (this) {
- kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
- .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
- .collect(Collectors.toList()));
+ @Override
+ public String getSampleMessage(final String topic) {
+ String message = null;
+ if (listTopics().contains(topic)) {
+ try (Consumer<String, String> kafkaConsumer = kafkaConsumerFactory.createConsumer()) {
+ kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
+ .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
+ .collect(Collectors.toList()));
- kafkaConsumer.assignment().stream()
- .filter(p -> (kafkaConsumer.position(p) -1) >= 0)
- .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
+ kafkaConsumer.assignment().stream()
+ .filter(p -> (kafkaConsumer.position(p) - 1) >= 0)
+ .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
- ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
- message = records.isEmpty() ? null : records.iterator().next().value();
- kafkaConsumer.unsubscribe();
- }
- }
- return message;
+ final ConsumerRecords<String, String> records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT);
+ message = records.isEmpty() ? null : records.iterator().next().value();
+ kafkaConsumer.unsubscribe();
+ }
}
+ return message;
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index edfd542..adfe056 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -25,7 +25,6 @@ import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.KafkaComponent;
@@ -36,8 +35,12 @@ import org.apache.metron.rest.service.impl.StormCLIWrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.web.client.RestTemplate;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
@@ -54,7 +57,7 @@ public class TestConfig {
@Bean
public ZKServerComponent zkServerComponent(Properties zkProperties) {
return new ZKServerComponent()
- .withPostStartCallback((zkComponent) -> zkProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString()));
+ .withPostStartCallback((zkComponent) -> zkProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString()));
}
@Bean
@@ -66,10 +69,10 @@ public class TestConfig {
@Bean
public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, KafkaComponent kafkaWithZKComponent) {
ComponentRunner runner = new ComponentRunner.Builder()
- .withComponent("zk", zkServerComponent)
- .withComponent("kafka", kafkaWithZKComponent)
- .withCustomShutdownOrder(new String[] {"kafka","zk"})
- .build();
+ .withComponent("zk", zkServerComponent)
+ .withComponent("kafka", kafkaWithZKComponent)
+ .withCustomShutdownOrder(new String[]{"kafka", "zk"})
+ .build();
try {
runner.start();
} catch (UnableToStartException e) {
@@ -78,14 +81,14 @@ public class TestConfig {
return runner;
}
- @Bean(initMethod = "start", destroyMethod="close")
+ @Bean(initMethod = "start", destroyMethod = "close")
public CuratorFramework client(ComponentRunner componentRunner) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
ZKServerComponent zkServerComponent = componentRunner.getComponent("zk", ZKServerComponent.class);
return CuratorFrameworkFactory.newClient(zkServerComponent.getConnectionString(), retryPolicy);
}
- @Bean(destroyMethod="close")
+ @Bean(destroyMethod = "close")
public ZkClient zkClient(ComponentRunner componentRunner) {
ZKServerComponent zkServerComponent = componentRunner.getComponent("zk", ZKServerComponent.class);
return new ZkClient(zkServerComponent.getConnectionString(), 10000, 10000, ZKStringSerializer$.MODULE$);
@@ -96,9 +99,9 @@ public class TestConfig {
return ZkUtils.apply(zkClient, false);
}
- @Bean(destroyMethod="close")
- public KafkaConsumer<String, String> kafkaConsumer(KafkaComponent kafkaWithZKComponent) {
- Properties props = new Properties();
+ @Bean
+ public Map<String, Object> kafkaConsumer(KafkaComponent kafkaWithZKComponent) {
+ Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", kafkaWithZKComponent.getBrokerList());
props.put("group.id", "metron-config");
props.put("enable.auto.commit", "false");
@@ -106,7 +109,12 @@ public class TestConfig {
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- return new KafkaConsumer<>(props);
+ return props;
+ }
+
+ @Bean
+ public ConsumerFactory<String, String> createConsumerFactory() {
+ return new DefaultKafkaConsumerFactory<>(kafkaConsumer(kafkaWithZKComponent(zkProperties())));
}
@Bean
http://git-wip-us.apache.org/repos/asf/metron/blob/47e2b735/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
index c7d42b3..c92feab 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
@@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.kafka.core.ConsumerFactory;
import java.util.ArrayList;
import java.util.HashMap;
@@ -71,6 +72,7 @@ public class KafkaServiceImplTest {
private ZkUtils zkUtils;
private KafkaConsumer<String, String> kafkaConsumer;
+ private ConsumerFactory<String, String> kafkaConsumerFactory;
private AdminUtils$ adminUtils;
private KafkaService kafkaService;
@@ -86,10 +88,13 @@ public class KafkaServiceImplTest {
@Before
public void setUp() throws Exception {
zkUtils = mock(ZkUtils.class);
+ kafkaConsumerFactory = mock(ConsumerFactory.class);
kafkaConsumer = mock(KafkaConsumer.class);
adminUtils = mock(AdminUtils$.class);
- kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumer, adminUtils);
+ when(kafkaConsumerFactory.createConsumer()).thenReturn(kafkaConsumer);
+
+ kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, adminUtils);
}
@Test
@@ -104,6 +109,7 @@ public class KafkaServiceImplTest {
verifyZeroInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
+ verify(kafkaConsumer).close();
verifyNoMoreInteractions(kafkaConsumer, zkUtils, adminUtils);
}
@@ -119,6 +125,7 @@ public class KafkaServiceImplTest {
verifyZeroInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
+ verify(kafkaConsumer).close();
verifyNoMoreInteractions(kafkaConsumer, zkUtils);
}
@@ -137,6 +144,7 @@ public class KafkaServiceImplTest {
verifyZeroInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
+ verify(kafkaConsumer).close();
verifyNoMoreInteractions(kafkaConsumer, zkUtils);
}
@@ -156,6 +164,7 @@ public class KafkaServiceImplTest {
verifyZeroInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
+ verify(kafkaConsumer).close();
verifyNoMoreInteractions(kafkaConsumer, zkUtils);
}
@@ -167,6 +176,7 @@ public class KafkaServiceImplTest {
verifyZeroInteractions(zkUtils);
verify(kafkaConsumer).listTopics();
+ verify(kafkaConsumer).close();
verifyNoMoreInteractions(kafkaConsumer, zkUtils);
}
@@ -180,6 +190,7 @@ public class KafkaServiceImplTest {
assertTrue(kafkaService.deleteTopic("non_existent_topic"));
verify(kafkaConsumer).listTopics();
+ verify(kafkaConsumer).close();
verify(adminUtils).deleteTopic(zkUtils, "non_existent_topic");
verifyNoMoreInteractions(kafkaConsumer);
}
@@ -193,6 +204,7 @@ public class KafkaServiceImplTest {
assertFalse(kafkaService.deleteTopic("non_existent_topic"));
verify(kafkaConsumer).listTopics();
+ verify(kafkaConsumer).close();
verifyNoMoreInteractions(kafkaConsumer);
}
@@ -230,6 +242,7 @@ public class KafkaServiceImplTest {
verify(kafkaConsumer).listTopics();
verify(kafkaConsumer, times(0)).partitionsFor("t");
+ verify(kafkaConsumer).close();
verifyZeroInteractions(zkUtils);
verifyNoMoreInteractions(kafkaConsumer);
}