You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text rendering.
Posted to commits@hbase.apache.org by ps...@apache.org on 2020/12/12 20:51:47 UTC

[hbase-connectors] branch master updated: HBASE-25388 Using an extension of MockProducer on testing side (#76)

This is an automated email from the ASF dual-hosted git repository.

psomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 852be70  HBASE-25388 Using an extension of MockProducer on testing side (#76)
852be70 is described below

commit 852be70fb75f793fb8f7d80b7d368fcd2b907366
Author: Andras Katona <41...@users.noreply.github.com>
AuthorDate: Sat Dec 12 21:51:36 2020 +0100

    HBASE-25388 Using an extension of MockProducer on testing side (#76)
    
    Signed-off-by: Peter Somogyi <ps...@apache.org>
---
 kafka/hbase-kafka-proxy/pom.xml                    |  7 ++
 .../hadoop/hbase/kafka/ProducerForTesting.java     | 93 ++--------------------
 2 files changed, 15 insertions(+), 85 deletions(-)

diff --git a/kafka/hbase-kafka-proxy/pom.xml b/kafka/hbase-kafka-proxy/pom.xml
index a4372d4..356838d 100755
--- a/kafka/hbase-kafka-proxy/pom.xml
+++ b/kafka/hbase-kafka-proxy/pom.xml
@@ -161,6 +161,13 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka-clients.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <classifier>test</classifier>
+      <scope>test</scope>
+      <version>${kafka-clients.version}</version>
+    </dependency>
     <!-- General dependencies -->
     <dependency>
       <groupId>org.apache.commons</groupId>
diff --git a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
index 7b767ca..e800501 100644
--- a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
+++ b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
@@ -19,35 +19,28 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.test.MockSerializer;
 
 /**
  * Mocks Kafka producer for testing
  */
-public class ProducerForTesting implements Producer<byte[], byte[]> {
+public class ProducerForTesting extends MockProducer<byte[], byte[]> {
   Map<String, List<HBaseKafkaEvent>> messages = new HashMap<>();
   SpecificDatumReader<HBaseKafkaEvent> dreader = new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);
 
-  public Map<String, List<HBaseKafkaEvent>> getMessages() {
-    return messages;
+  public ProducerForTesting() {
+    super(true, new MockSerializer(), new MockSerializer());
   }
 
-  @Override
-  public void abortTransaction() throws ProducerFencedException {
+  public Map<String, List<HBaseKafkaEvent>> getMessages() {
+    return messages;
   }
 
   @Override
@@ -59,79 +52,9 @@ public class ProducerForTesting implements Producer<byte[], byte[]> {
         messages.put(producerRecord.topic(), new ArrayList<>());
       }
       messages.get(producerRecord.topic()).add(event);
-      return new Future<RecordMetadata>() {
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-          return false;
-        }
-
-        @Override
-        public boolean isCancelled() {
-          return false;
-        }
-
-        @Override
-        public boolean isDone() {
-          return false;
-        }
-
-        @Override
-        public RecordMetadata get() {
-          return new RecordMetadata(null, 1, 1, 1, (long)1, 1, 1);
-        }
-
-        @Override
-        public RecordMetadata get(long timeout, TimeUnit unit) {
-          return null;
-        }
-      };
+      return super.send(producerRecord);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
-
-  @Override
-  public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord,
-      Callback callback) {
-    return null;
-  }
-
-  @Override
-  public void flush() {
-  }
-
-  @Override
-  public List<PartitionInfo> partitionsFor(String s) {
-    return null;
-  }
-
-  @Override
-  public Map<MetricName, ? extends Metric> metrics() {
-    return null;
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public void close(long l, TimeUnit timeUnit) {
-  }
-
-  @Override
-  public void initTransactions() {
-  }
-
-  @Override
-  public void beginTransaction() throws ProducerFencedException {
-  }
-
-  @Override
-  public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
-      String consumerGroupId) throws ProducerFencedException {
-  }
-
-  @Override
-  public void commitTransaction() throws ProducerFencedException {
-  }
 }