Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/30 00:21:48 UTC
[gobblin] branch master updated: [GOBBLIN-1483] Handle null valued ConsumerRecords in Kafka Streaming Extractor[]
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 15c88f3 [GOBBLIN-1483] Handle null valued ConsumerRecords in Kafka Streaming Extractor[]
15c88f3 is described below
commit 15c88f33330f60f62b18c367d02220839d7ab1a6
Author: suvasude <su...@linkedin.biz>
AuthorDate: Tue Jun 29 17:21:41 2021 -0700
[GOBBLIN-1483] Handle null valued ConsumerRecords in Kafka Streaming Extractor[]
Closes #3323 from sv2000/npeKafkaNullRecord
---
.../extract/kafka/KafkaExtractorStatsTracker.java | 31 +++++++++++
.../extract/kafka/KafkaStreamingExtractor.java | 40 +++++++++-----
.../kafka/KafkaExtractorStatsTrackerTest.java | 17 ++++++
.../extract/kafka/KafkaStreamTestUtils.java | 62 ++++++++++------------
.../extract/kafka/KafkaStreamingExtractorTest.java | 36 +++++++++----
5 files changed, 131 insertions(+), 55 deletions(-)
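In short: readStreamEntityImpl() previously assumed every ConsumerRecord carried a non-null value, so a null value (for example, a Kafka tombstone produced by log compaction, or a payload the decoder could not materialize) could flow downstream and trigger the NullPointerException this patch fixes. The change filters such records inside the read loop and counts them per partition. A minimal sketch of the pattern, assuming hypothetical KafkaRecord, ConsumerClient, and StatsTracker types standing in for Gobblin's DecodeableKafkaRecord, GobblinKafkaConsumerClient, and KafkaExtractorStatsTracker:

    // Hypothetical stand-ins; not Gobblin's actual interfaces.
    interface KafkaRecord { Object getValue(); int getPartition(); }
    interface ConsumerClient { KafkaRecord next(); }
    interface StatsTracker { void onNullRecord(int partitionIdx); }

    class NullFilteringReader {
      static KafkaRecord readNextNonNull(ConsumerClient client, StatsTracker tracker) {
        while (true) {
          KafkaRecord record = client.next();
          if (record.getValue() != null) {
            return record;  // non-null value: safe to emit downstream
          }
          // Null-valued record: count it and keep polling rather than emitting it.
          tracker.onNullRecord(record.getPartition());
        }
      }
    }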
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 25c94d6..b1bf197 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -66,6 +66,7 @@ public class KafkaExtractorStatsTracker {
private static final String MIN_LOG_APPEND_TIMESTAMP = "minLogAppendTimestamp";
private static final String MAX_LOG_APPEND_TIMESTAMP = "maxLogAppendTimestamp";
private static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
+ private static final String NULL_RECORD_COUNT = "nullRecordCount";
private static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
private static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
private static final String AVG_RECORD_SIZE = "avgRecordSize";
@@ -92,6 +93,9 @@ public class KafkaExtractorStatsTracker {
//TopicPartitions.
@Getter
private int undecodableMessageCount = 0;
+ @Getter
+ private int nullRecordCount = 0;
+
private List<KafkaPartition> partitions;
private long maxPossibleLatency;
@@ -157,6 +161,7 @@ public class KafkaExtractorStatsTracker {
@Data
public static class ExtractorStats {
private long decodingErrorCount = -1L;
+ private long nullRecordCount = -1L;
private double avgMillisPerRecord = -1;
private long avgRecordSize;
private long elapsedTime;
@@ -198,6 +203,15 @@ public class KafkaExtractorStatsTracker {
}
/**
+ * Get the number of null-valued records observed for a given partition.
+ * @param partitionIdx index of Kafka topic partition.
+ * @return the number of null valued records for a given partition id.
+ */
+ public Long getNullRecordCount(int partitionIdx) {
+ return this.statsMap.get(this.partitions.get(partitionIdx)).getNullRecordCount();
+ }
+
+ /**
* Called when the KafkaExtractor encounters an undecodeable record.
*/
public void onUndecodeableRecord(int partitionIdx) {
@@ -206,6 +220,22 @@ public class KafkaExtractorStatsTracker {
incrementErrorCount(partitionIdx);
}
+ public void onNullRecord(int partitionIdx) {
+ this.nullRecordCount++;
+ incrementNullRecordCount(partitionIdx);
+ }
+
+ private void incrementNullRecordCount(int partitionIdx) {
+ this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+ if (v.nullRecordCount < 0) {
+ v.nullRecordCount = 1;
+ } else {
+ v.nullRecordCount++;
+ }
+ return v;
+ });
+ }
+
private void incrementErrorCount(int partitionIdx) {
this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
if (v.decodingErrorCount < 0) {
@@ -412,6 +442,7 @@ public class KafkaExtractorStatsTracker {
Long.toString(TimeUnit.NANOSECONDS.toMillis(stats.getFetchMessageBufferTime())));
tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(stats.getReadRecordTime())));
tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT, Long.toString(stats.getDecodingErrorCount()));
+ tagsForPartition.put(NULL_RECORD_COUNT, Long.toString(stats.getNullRecordCount()));
tagsForPartition.put(LAST_RECORD_HEADER_TIMESTAMP, Long.toString(stats.getLastSuccessfulRecordHeaderTimestamp()));
// Commit avg time to pull a record for each partition
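A note on the counter semantics above: like the pre-existing decodingErrorCount, the per-partition nullRecordCount starts at -1 as a sentinel meaning "never incremented", and the first increment jumps straight to 1, which is why the new test below expects -1 before any null record and 1 after the first. A self-contained sketch of that pattern (the HashMap stands in for the tracker's statsMap, with the key type simplified to String):

    import java.util.HashMap;
    import java.util.Map;

    public class SentinelCounter {
      private final Map<String, Long> counts = new HashMap<>();

      public SentinelCounter(String key) {
        counts.put(key, -1L);  // -1 means "no events yet", not zero
      }

      public void increment(String key) {
        // computeIfPresent leaves unknown keys untouched, like the tracker's statsMap.
        counts.computeIfPresent(key, (k, v) -> v < 0 ? 1L : v + 1L);
      }

      public static void main(String[] args) {
        SentinelCounter c = new SentinelCounter("partition-0");
        c.increment("partition-0");
        c.increment("partition-0");
        System.out.println(c.counts.get("partition-0"));  // prints 2
      }
    }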
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 9f2ac76..bc2904b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -87,7 +87,8 @@ import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableKafkaRecord> {
public static final String DATASET_KEY = "dataset";
public static final String DATASET_PARTITION_KEY = "datasetPartition";
- private static final Long MAX_LOG_DECODING_ERRORS = 5L;
+ private static final Long MAX_LOG_ERRORS = 100L;
+
private static final String KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY =
"gobblin.kafka.extractor.statsReportingIntervalMinutes";
private static final Long DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES = 1L;
@@ -350,20 +351,33 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
this.readStartTime = System.nanoTime();
long fetchStartTime = System.nanoTime();
try {
- while (this.messageIterator == null || !this.messageIterator.hasNext()) {
- Long currentTime = System.currentTimeMillis();
- //it's time to flush, so break the while loop and directly return null
- if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
- return new FlushRecordEnvelope();
+ DecodeableKafkaRecord kafkaConsumerRecord;
+ while (true) {
+ while (this.messageIterator == null || !this.messageIterator.hasNext()) {
+ Long currentTime = System.currentTimeMillis();
+ //it's time to flush, so short-circuit the loop and return a FlushRecordEnvelope
+ if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
+ return new FlushRecordEnvelope();
+ }
+ try {
+ fetchStartTime = System.nanoTime();
+ this.messageIterator = this.kafkaConsumerClient.consume();
+ } catch (Exception e) {
+ log.error("Failed to consume from Kafka", e);
+ }
}
- try {
- fetchStartTime = System.nanoTime();
- this.messageIterator = this.kafkaConsumerClient.consume();
- } catch (Exception e) {
- log.error("Failed to consume from Kafka", e);
+ kafkaConsumerRecord = (DecodeableKafkaRecord) this.messageIterator.next();
+ if (kafkaConsumerRecord.getValue() != null) {
+ break;
+ } else {
+ //Filter the null-valued records early, so that they do not break the pipeline downstream.
+ if (shouldLogError()) {
+ log.error("Encountered a null-valued record at offset: {}, partition: {}", kafkaConsumerRecord.getOffset(),
+ kafkaConsumerRecord.getPartition());
+ }
+ this.statsTracker.onNullRecord(this.partitionIdToIndexMap.get(kafkaConsumerRecord.getPartition()));
}
}
- DecodeableKafkaRecord kafkaConsumerRecord = (DecodeableKafkaRecord) this.messageIterator.next();
int partitionIndex = this.partitionIdToIndexMap.get(kafkaConsumerRecord.getPartition());
this.statsTracker.onFetchNextMessageBuffer(partitionIndex, fetchStartTime);
@@ -395,7 +409,7 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
}
private boolean shouldLogError() {
- return this.statsTracker.getUndecodableMessageCount() <= MAX_LOG_DECODING_ERRORS;
+ return (this.statsTracker.getUndecodableMessageCount() + this.statsTracker.getNullRecordCount()) <= MAX_LOG_ERRORS;
}
@Override
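Also worth noting in this file: the logging guard was widened from decoding errors alone (capped at 5) to the combined count of undecodable and null-valued records (capped at 100), so a long run of bad records logs at most MAX_LOG_ERRORS lines instead of one per record. A sketch of the guard, with hypothetical counter fields standing in for the stats tracker's getters:

    class ErrorLogGate {
      private static final long MAX_LOG_ERRORS = 100L;
      private long undecodableCount;  // stand-in for statsTracker.getUndecodableMessageCount()
      private long nullRecordCount;   // stand-in for statsTracker.getNullRecordCount()

      boolean shouldLogError() {
        // Log the first MAX_LOG_ERRORS problem records in total, then go quiet.
        return (undecodableCount + nullRecordCount) <= MAX_LOG_ERRORS;
      }
    }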
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 9cecc59..bef77d8 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -74,6 +74,23 @@ public class KafkaExtractorStatsTrackerTest {
}
@Test
+ public void testOnNullRecord() {
+ //Ensure that counters are initialized correctly
+ Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), -1);
+ Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(1).longValue(), -1);
+
+ //Ensure that counters are updated correctly after 1st call to KafkaExtractorStatsTracker#onNullRecord()
+ this.extractorStatsTracker.onNullRecord(0);
+ Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), 1);
+ Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(1).longValue(), -1);
+
+ //Ensure that counters are updated correctly after 2nd call to KafkaExtractorStatsTracker#onNullRecord()
+ this.extractorStatsTracker.onNullRecord(0);
+ Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), 2);
+ Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(1).longValue(), -1);
+ }
+
+ @Test
public void testResetStartFetchEpochTime() {
long currentTime = System.currentTimeMillis();
this.extractorStatsTracker.resetStartFetchEpochTime(1);
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java
index 731c0b1..25647a0 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java
@@ -43,7 +43,6 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
-import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.util.ConfigUtils;
@@ -78,23 +77,21 @@ public class KafkaStreamTestUtils {
*/
public static class MockKafkaConsumerClient implements GobblinKafkaConsumerClient {
public static final String NUM_PARTITIONS_ASSIGNED = "gobblin.kafka.streaming.numPartitions";
+ public static final String CAN_RETURN_NULL_VALUED_RECORDS = "gobblin.kafka.streaming.canReturnNulls";
private final Map<KafkaPartition, Long> latestOffsets = Maps.newHashMap();
private final Random random = new Random();
private final String topicName;
+ private final boolean canReturnNullValuedRecords;
private final List<Integer> partitionIds;
protected MockKafkaConsumerClient(Config baseConfig) {
this.topicName = baseConfig.getString(KafkaSource.TOPIC_NAME);
int numPartitionsAssigned = ConfigUtils.getInt(baseConfig, NUM_PARTITIONS_ASSIGNED, 0);
+ this.canReturnNullValuedRecords = ConfigUtils.getBoolean(baseConfig, CAN_RETURN_NULL_VALUED_RECORDS, false);
this.partitionIds = getPartitionIds(baseConfig, numPartitionsAssigned);
}
- public MockKafkaConsumerClient() {
- topicName = "";
- partitionIds = Lists.newArrayList();
- }
-
private List<Integer> getPartitionIds(Config baseConfig, int numPartitionsAssigned) {
List<Integer> partitionIds = Lists.newArrayList();
for (int i = 0; i < numPartitionsAssigned; i++) {
@@ -110,7 +107,7 @@ public class KafkaStreamTestUtils {
*/
@Override
public Iterator<KafkaConsumerRecord> consume() {
- return new MockIterator(this.topicName, this.partitionIds);
+ return new MockIterator(this.topicName, this.partitionIds, this.canReturnNullValuedRecords);
}
@Override
@@ -125,14 +122,12 @@ public class KafkaStreamTestUtils {
}
@Override
- public long getEarliestOffset(KafkaPartition partition)
- throws KafkaOffsetRetrievalFailureException {
+ public long getEarliestOffset(KafkaPartition partition) {
return 0;
}
@Override
- public long getLatestOffset(KafkaPartition partition)
- throws KafkaOffsetRetrievalFailureException {
+ public long getLatestOffset(KafkaPartition partition) {
return 0;
}
@@ -180,23 +175,23 @@ public class KafkaStreamTestUtils {
}
@Override
- protected Schema fetchSchemaByKey(String key) throws SchemaRegistryException {
+ protected Schema fetchSchemaByKey(String key) {
return null;
}
@Override
- public Schema getLatestSchemaByTopic(String topic) throws SchemaRegistryException {
+ public Schema getLatestSchemaByTopic(String topic) {
return latestSchema;
}
@Override
- public String register(Schema schema) throws SchemaRegistryException {
+ public String register(Schema schema) {
return null;
}
@Override
- public String register(Schema schema, String name) throws SchemaRegistryException {
- this.latestSchema = schema;
+ public String register(Schema schema, String name) {
+ latestSchema = schema;
return schema.toString();
}
}
@@ -209,20 +204,18 @@ public class KafkaStreamTestUtils {
}
@Override
- public String register(String name, Schema schema)
- throws org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+ public String register(String name, Schema schema) {
this.latestSchema = schema;
return schema.toString();
}
@Override
- public Schema getById(String id) throws IOException, org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+ public Schema getById(String id) {
return null;
}
@Override
- public Schema getLatestSchema(String name)
- throws IOException, org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+ public Schema getLatestSchema(String name) {
return this.latestSchema;
}
@@ -239,7 +232,7 @@ public class KafkaStreamTestUtils {
* partition ids.
*/
public static class MockIterator implements Iterator<KafkaConsumerRecord> {
- //Schema for LiKafka10ConsumerRecords. TODO: Enhance the iterator to return random records
+ // Schema for LiKafka10ConsumerRecords. TODO: Enhance the iterator to return random records
// according to a given schema.
private static final String SCHEMA =
"{\"namespace\": \"example.avro\",\n" + " \"type\": \"record\",\n" + " \"name\": \"user\",\n"
@@ -247,22 +240,23 @@ public class KafkaStreamTestUtils {
+ " {\"name\": \"DUMMY\", \"type\": [\"null\",\"string\"]}\n" + " ]\n" + "}";
private final Schema schema = new Schema.Parser().parse(SCHEMA);
- private final Random random = new Random();
private final String topicName;
private final long maxNumRecords;
private final List<Integer> partitionIds;
private final long[] nextOffsets;
+ private final boolean canReturnNullRecords;
private long numRecordsReturnedSoFar;
private int partitionIdx = 0;
- public MockIterator(String topicName, List<Integer> partitionIds) {
- this(topicName, partitionIds, Long.MAX_VALUE);
+ public MockIterator(String topicName, List<Integer> partitionIds, boolean canReturnNullRecords) {
+ this(topicName, partitionIds, canReturnNullRecords, Long.MAX_VALUE);
}
- public MockIterator(String topicName, List<Integer> partitionIds, long numRecords) {
+ public MockIterator(String topicName, List<Integer> partitionIds, boolean canReturnNullRecords, long numRecords) {
this.topicName = topicName;
this.maxNumRecords = numRecords;
this.partitionIds = partitionIds;
+ this.canReturnNullRecords = canReturnNullRecords;
this.nextOffsets = new long[partitionIds.size()];
}
@@ -287,10 +281,10 @@ public class KafkaStreamTestUtils {
@Override
public KafkaConsumerRecord next() {
this.numRecordsReturnedSoFar++;
- return getMockConsumerRecord(this.numRecordsReturnedSoFar);
+ return getMockConsumerRecord();
}
- private KafkaConsumerRecord getMockConsumerRecord(long numRecordsReturnedSoFar) {
+ private KafkaConsumerRecord getMockConsumerRecord() {
DecodeableKafkaRecord mockRecord = Mockito.mock(DecodeableKafkaRecord.class);
Mockito.when(mockRecord.getValue()).thenReturn(getRecord());
Mockito.when(mockRecord.getTopic()).thenReturn(topicName);
@@ -303,12 +297,14 @@ public class KafkaStreamTestUtils {
}
private GenericRecord getRecord() {
- GenericRecord record = new GenericData.Record(schema);
- record.put("name", UUID.randomUUID());
- return record;
+ if ((!this.canReturnNullRecords) || (this.numRecordsReturnedSoFar % 2 == 0)) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("name", UUID.randomUUID());
+ return record;
+ } else {
+ return null;
+ }
}
}
-
-
}
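The mock's null interleaving is deterministic: numRecordsReturnedSoFar is incremented before getRecord() runs, so with canReturnNullRecords enabled every odd call returns a null value and every even call returns a real record. A compact sketch of the same alternation without Mockito:

    import java.util.Iterator;

    class AlternatingValueIterator implements Iterator<String> {
      private final boolean canReturnNulls;
      private long returnedSoFar = 0;

      AlternatingValueIterator(boolean canReturnNulls) {
        this.canReturnNulls = canReturnNulls;
      }

      @Override public boolean hasNext() { return true; }

      @Override public String next() {
        returnedSoFar++;
        // Mirrors MockIterator#getRecord(): null on odd calls when nulls are enabled.
        return (!canReturnNulls || returnedSoFar % 2 == 0) ? "value-" + returnedSoFar : null;
      }
    }

With this alternation, the new extractor test below can read four record envelopes and expect every returned value to be non-null, since the extractor skips the interleaved nulls.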
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
index 927fe74..922d130 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
@@ -26,9 +26,12 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.stream.StreamEntity;
public class KafkaStreamingExtractorTest {
@@ -37,9 +40,9 @@ public class KafkaStreamingExtractorTest {
@BeforeClass
public void setUp() {
- WorkUnitState state = KafkaExtractorUtils.getWorkUnitState("testTopic", numPartitions);
- state.setProp(FlushingExtractor.FLUSH_DATA_PUBLISHER_CLASS, TestDataPublisher.class.getName());
- this.streamingExtractor = new KafkaStreamingExtractor(state);
+ WorkUnitState state1 = KafkaExtractorUtils.getWorkUnitState("testTopic", numPartitions);
+ state1.setProp(FlushingExtractor.FLUSH_DATA_PUBLISHER_CLASS, TestDataPublisher.class.getName());
+ this.streamingExtractor = new KafkaStreamingExtractor(state1);
}
@Test
@@ -48,17 +51,17 @@ public class KafkaStreamingExtractorTest {
MultiLongWatermark highWatermark1 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
//Read 3 records
- this.streamingExtractor.readStreamEntityImpl();
+ StreamEntity<DecodeableKafkaRecord> streamEntity = this.streamingExtractor.readStreamEntityImpl();
Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 0L);
Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
- streamingExtractor.readStreamEntityImpl();
+ this.streamingExtractor.readStreamEntityImpl();
Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
- streamingExtractor.readStreamEntityImpl();
+ this.streamingExtractor.readStreamEntityImpl();
Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 1L);
@@ -103,7 +106,7 @@ public class KafkaStreamingExtractorTest {
}
@Test
- public void testGenerateAdditionalTagHelper() throws Exception {
+ public void testGenerateAdditionalTagHelper() {
// Verifying that produce rate has been added.
Map<KafkaPartition, Map<String, String>> result = this.streamingExtractor.getAdditionalTagsHelper();
for (Map<String, String> entry: result.values()) {
@@ -111,6 +114,22 @@ public class KafkaStreamingExtractorTest {
}
}
+ @Test
+ public void testReadRecordEnvelopeImpl()
+ throws IOException {
+ WorkUnitState state = KafkaExtractorUtils.getWorkUnitState("testTopic", numPartitions);
+ state.setProp(FlushingExtractor.FLUSH_DATA_PUBLISHER_CLASS, TestDataPublisher.class.getName());
+ //Enable config that allows underlying KafkaConsumerClient to return null-valued Kafka records.
+ state.setProp(KafkaStreamTestUtils.MockKafkaConsumerClient.CAN_RETURN_NULL_VALUED_RECORDS, "true");
+ KafkaStreamingExtractor streamingExtractorWithNulls = new KafkaStreamingExtractor(state);
+
+ //Extract 4 records. Ensure each record returned by readRecordEnvelopeImpl() is a non-null valued record.
+ for (int i = 0; i < 4; i++) {
+ RecordEnvelope<DecodeableKafkaRecord> recordEnvelope = streamingExtractorWithNulls.readRecordEnvelopeImpl();
+ Assert.assertNotNull(recordEnvelope.getRecord().getValue());
+ }
+ }
+
static class TestDataPublisher extends DataPublisher {
public TestDataPublisher(WorkUnitState state) {
super(state);
@@ -131,8 +150,7 @@ public class KafkaStreamingExtractorTest {
}
@Override
- public void close()
- throws IOException {
+ public void close() {
}
}