Posted to jira@kafka.apache.org by "DHRUV BANSAL (JIRA)" <ji...@apache.org> on 2017/12/09 16:44:00 UTC
[jira] [Created] (KAFKA-6339) Integration test with embedded kafka.
DHRUV BANSAL created KAFKA-6339:
-----------------------------------
Summary: Integration test with embedded kafka.
Key: KAFKA-6339
URL: https://issues.apache.org/jira/browse/KAFKA-6339
Project: Kafka
Issue Type: Bug
Affects Versions: 0.11.0.2
Reporter: DHRUV BANSAL
I am using Kafka version 0.11.0.2.
I am trying to write an integration test for one of the components I am building on top of Kafka.
The following code works fine with Kafka version 0.10.0.0 but does not work with the version mentioned above (0.11.0.2):
{{
// setup Zookeeper
EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
// setup Broker
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
brokerProps.setProperty("offsets.topic.replication.factor", "1");
String kafka_log_path = Files.createTempDirectory("kafka-").toAbsolutePath().toString();
System.out.println("kafka log path " + kafka_log_path);
brokerProps.setProperty("log.dirs", kafka_log_path);
brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
KafkaConfig config = new KafkaConfig(brokerProps);
Time mock = new MockTime();
KafkaServer kafkaServer = TestUtils.createServer(config, mock);
// create topic
AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
// setup producer
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps);
// setup consumer
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
consumerProps.setProperty("group.id", "group0");
consumerProps.setProperty("client.id", "consumer0");
consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// make sure the consumer starts from the beginning of the topic
consumerProps.setProperty("auto.offset.reset", "earliest");
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(TOPIC));
// send message
ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42,
"test-message".getBytes(StandardCharsets.UTF_8));
Future<RecordMetadata> record = producer.send(data);
RecordMetadata metadata = record.get();
// starting consumer
ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
assertEquals(1, records.count());
Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
ConsumerRecord<Integer, byte[]> consumedRecord = recordIterator.next();
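// decode the byte[] payload so the message text is printed rather than the array reference
System.out.printf("offset = %d, key = %s, value = %s%n", consumedRecord.offset(), consumedRecord.key(),
new String(consumedRecord.value(), StandardCharsets.UTF_8));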
assertEquals(42, (int) consumedRecord.key());
assertEquals("test-message", new String(consumedRecord.value(), StandardCharsets.UTF_8));
kafkaServer.shutdown();
zkClient.close();
embeddedZookeeper.shutdown();
}}
Please provide support for this use case. There should also be proper documentation for integration testing with each release.
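One possible factor, which is an assumption and not something this report confirms: a single consumer.poll(3000) call can return zero records if the consumer has not yet finished joining its group and receiving its partition assignment within that window. A minimal sketch of a retry loop with an overall deadline is below. PollUntil and collect are hypothetical names, not part of Kafka, and the fake poller in main stands in for the Kafka consumer so that the block runs without a broker.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper (not a Kafka API): polls repeatedly until enough items arrive.
final class PollUntil {
    private PollUntil() {}

    /**
     * Calls poller with a short per-call timeout and accumulates whatever it returns,
     * stopping once at least `expected` items were collected or the overall deadline passed.
     * The returned list may hold fewer than `expected` items if the deadline was hit.
     */
    static <T> List<T> collect(Function<Duration, ? extends Iterable<T>> poller,
                               int expected, Duration perPoll, Duration overall) {
        List<T> collected = new ArrayList<>();
        long deadline = System.nanoTime() + overall.toNanos();
        while (collected.size() < expected && System.nanoTime() < deadline) {
            for (T item : poller.apply(perPoll)) {
                collected.add(item);
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        // Fake poller: the first two calls return nothing, the third returns one message.
        int[] calls = {0};
        List<String> got = collect(
                timeout -> ++calls[0] < 3
                        ? Collections.<String>emptyList()
                        : Collections.singletonList("test-message"),
                1, Duration.ofMillis(100), Duration.ofSeconds(5));
        System.out.println("received " + got + " after " + calls[0] + " polls");
    }
}
```

With the consumer from the code above, the call could look like PollUntil.collect(d -> consumer.poll(d.toMillis()), 1, Duration.ofMillis(500), Duration.ofSeconds(30)), with the assertions adapted to the returned list of ConsumerRecord objects. The timeout values are illustrative only.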