You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ps...@apache.org on 2020/12/12 20:51:47 UTC
[hbase-connectors] branch master updated: HBASE-25388 Using an extension of MockProducer on testing side (#76)
This is an automated email from the ASF dual-hosted git repository.
psomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 852be70 HBASE-25388 Using an extension of MockProducer on testing side (#76)
852be70 is described below
commit 852be70fb75f793fb8f7d80b7d368fcd2b907366
Author: Andras Katona <41...@users.noreply.github.com>
AuthorDate: Sat Dec 12 21:51:36 2020 +0100
HBASE-25388 Using an extension of MockProducer on testing side (#76)
Signed-off-by: Peter Somogyi <ps...@apache.org>
---
kafka/hbase-kafka-proxy/pom.xml | 7 ++
.../hadoop/hbase/kafka/ProducerForTesting.java | 93 ++--------------------
2 files changed, 15 insertions(+), 85 deletions(-)
diff --git a/kafka/hbase-kafka-proxy/pom.xml b/kafka/hbase-kafka-proxy/pom.xml
index a4372d4..356838d 100755
--- a/kafka/hbase-kafka-proxy/pom.xml
+++ b/kafka/hbase-kafka-proxy/pom.xml
@@ -161,6 +161,13 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ <version>${kafka-clients.version}</version>
+ </dependency>
<!-- General dependencies -->
<dependency>
<groupId>org.apache.commons</groupId>
diff --git a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
index 7b767ca..e800501 100644
--- a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
+++ b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
@@ -19,35 +19,28 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.test.MockSerializer;
/**
* Mocks Kafka producer for testing
*/
-public class ProducerForTesting implements Producer<byte[], byte[]> {
+public class ProducerForTesting extends MockProducer<byte[], byte[]> {
Map<String, List<HBaseKafkaEvent>> messages = new HashMap<>();
SpecificDatumReader<HBaseKafkaEvent> dreader = new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);
- public Map<String, List<HBaseKafkaEvent>> getMessages() {
- return messages;
+ public ProducerForTesting() {
+ super(true, new MockSerializer(), new MockSerializer());
}
- @Override
- public void abortTransaction() throws ProducerFencedException {
+ public Map<String, List<HBaseKafkaEvent>> getMessages() {
+ return messages;
}
@Override
@@ -59,79 +52,9 @@ public class ProducerForTesting implements Producer<byte[], byte[]> {
messages.put(producerRecord.topic(), new ArrayList<>());
}
messages.get(producerRecord.topic()).add(event);
- return new Future<RecordMetadata>() {
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return false;
- }
-
- @Override
- public RecordMetadata get() {
- return new RecordMetadata(null, 1, 1, 1, (long)1, 1, 1);
- }
-
- @Override
- public RecordMetadata get(long timeout, TimeUnit unit) {
- return null;
- }
- };
+ return super.send(producerRecord);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
-
- @Override
- public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord,
- Callback callback) {
- return null;
- }
-
- @Override
- public void flush() {
- }
-
- @Override
- public List<PartitionInfo> partitionsFor(String s) {
- return null;
- }
-
- @Override
- public Map<MetricName, ? extends Metric> metrics() {
- return null;
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public void close(long l, TimeUnit timeUnit) {
- }
-
- @Override
- public void initTransactions() {
- }
-
- @Override
- public void beginTransaction() throws ProducerFencedException {
- }
-
- @Override
- public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
- String consumerGroupId) throws ProducerFencedException {
- }
-
- @Override
- public void commitTransaction() throws ProducerFencedException {
- }
}