You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/10/17 23:38:29 UTC
samza git commit: SAMZA-1888: Kafka consumer improvements
Repository: samza
Updated Branches:
refs/heads/master 6d20ee7e4 -> a8a8dc78d
SAMZA-1888: Kafka consumer improvements
Author: Boris S <bs...@linkedin.com>
Author: Boris S <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: bharathkk <co...@gmail.com>
Closes #738 from sborya/KafkaConsumerImprovements
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a8a8dc78
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a8a8dc78
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a8a8dc78
Branch: refs/heads/master
Commit: a8a8dc78d6aa375ab2220f8fcf3998d3f012d27d
Parents: 6d20ee7
Author: Boris S <bs...@linkedin.com>
Authored: Wed Oct 17 16:38:19 2018 -0700
Committer: Boris S <bs...@linkedin.com>
Committed: Wed Oct 17 16:38:19 2018 -0700
----------------------------------------------------------------------
.../samza/system/kafka/KafkaSystemAdmin.java | 90 ++++++++------
.../samza/system/kafka/KafkaSystemConsumer.java | 17 ++-
.../kafka/DefaultFetchSimpleConsumer.scala | 66 -----------
.../apache/samza/system/kafka/GetOffset.scala | 116 -------------------
.../samza/system/kafka/TestGetOffset.scala | 110 ------------------
5 files changed, 60 insertions(+), 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a8a8dc78/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index f761ab3..d2ceafb 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -80,9 +80,9 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
protected final String systemName;
protected final Consumer metadataConsumer;
+ protected final Config config;
- // get ZkUtils object to connect to Kafka's ZK.
- private final Supplier<ZkUtils> getZkConnection;
+ protected AdminClient adminClient = null;
// Custom properties to create a new coordinator stream.
private final Properties coordinatorStreamProperties;
@@ -96,16 +96,14 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
// Kafka properties for intermediate topics creation
private final Map<String, Properties> intermediateStreamProperties;
- // adminClient is required for deleteCommittedMessages operation
- private final AdminClient adminClient;
-
// used for intermediate streams
- private final boolean deleteCommittedMessages;
+ protected final boolean deleteCommittedMessages;
private final AtomicBoolean stopped = new AtomicBoolean(false);
public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsumer) {
this.systemName = systemName;
+ this.config = config;
if (metadataConsumer == null) {
throw new SamzaException(
@@ -113,35 +111,6 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
}
this.metadataConsumer = metadataConsumer;
- // populate brokerList from either consumer or producer configs
- Properties props = new Properties();
- String brokerList = config.get(
- String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
- if (brokerList == null) {
- brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName,
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
- }
- if (brokerList == null) {
- throw new SamzaException(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName);
- }
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
-
- // kafka.admin.AdminUtils requires zkConnect
- // this will change after we move to the new org.apache..AdminClient
- String zkConnect =
- config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT));
- if (StringUtils.isBlank(zkConnect)) {
- throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
- }
- props.put(ZOOKEEPER_CONNECT, zkConnect);
-
- adminClient = AdminClient.create(props);
-
- getZkConnection = () -> {
- return ZkUtils.apply(zkConnect, 6000, 6000, false);
- };
-
KafkaConfig kafkaConfig = new KafkaConfig(config);
coordinatorStreamReplicationFactor = Integer.valueOf(kafkaConfig.getCoordinatorReplicationFactor());
coordinatorStreamProperties = KafkaSystemAdminUtilsScala.getCoordinatorTopicProperties(kafkaConfig);
@@ -197,6 +166,8 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
} catch (Exception e) {
LOG.warn("metadataConsumer.close for system " + systemName + " failed with exception.", e);
}
+ }
+ if (adminClient != null) {
try {
adminClient.close();
} catch (Exception e) {
@@ -546,14 +517,14 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
public boolean createStream(StreamSpec streamSpec) {
LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
- return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection);
+ return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection());
}
@Override
public boolean clearStream(StreamSpec streamSpec) {
LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
- KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection);
+ KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection());
Map<String, List<PartitionInfo>> topicsMetadata = getTopicMetadata(ImmutableSet.of(streamSpec.getPhysicalName()));
return topicsMetadata.get(streamSpec.getPhysicalName()).isEmpty();
@@ -630,11 +601,56 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
@Override
public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
if (deleteCommittedMessages) {
+ if (adminClient == null) {
+ adminClient = AdminClient.create(createAdminClientProperties());
+ }
KafkaSystemAdminUtilsScala.deleteMessages(adminClient, offsets);
deleteMessageCalled = true;
}
}
+ protected Properties createAdminClientProperties() {
+ // populate brokerList from either consumer or producer configs
+ Properties props = new Properties();
+ // include SSL settings if needed
+
+ props.putAll(config.subset(String.format("systems.%s.consumer.", systemName), true));
+
+ //validate brokerList
+ String brokerList = config.get(
+ String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ if (brokerList == null) {
+ brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName,
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ }
+ if (brokerList == null) {
+ throw new SamzaException(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName);
+ }
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+
+
+ // kafka.admin.AdminUtils requires zkConnect
+ // this will change after we move to the new org.apache..AdminClient
+ String zkConnect =
+ config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT));
+ if (StringUtils.isBlank(zkConnect)) {
+ throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
+ }
+ props.put(ZOOKEEPER_CONNECT, zkConnect);
+
+ return props;
+ }
+
+ private Supplier<ZkUtils> getZkConnection() {
+ String zkConnect =
+ config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT));
+ if (StringUtils.isBlank(zkConnect)) {
+ throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
+ }
+ return () -> ZkUtils.apply(zkConnect, 6000, 6000, false);
+ }
+
/**
* Container for metadata about offsets.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/a8a8dc78/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 65d0e42..b5f283a 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -77,8 +77,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
/**
* Create a KafkaSystemConsumer for the provided {@code systemName}
+ * @param kafkaConsumer kafka Consumer object to be used by this system consumer
* @param systemName system name for which we create the consumer
* @param config application config
+ * @param clientId clientId from the kafka consumer to be used in the KafkaConsumerProxy
* @param metrics metrics for this KafkaSystemConsumer
* @param clock system clock
*/
@@ -106,12 +108,13 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
/**
* Create internal kafka consumer object, which will be used in the Proxy.
+ * @param <K> key type for the consumer
+ * @param <V> value type for the consumer
* @param systemName system name for which we create the consumer
* @param kafkaConsumerConfig config object for Kafka's KafkaConsumer
- * @return KafkaConsumer object
+ * @return newly created KafkaConsumer object
*/
- public static <K,V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName,
- HashMap<String, Object> kafkaConsumerConfig) {
+ public static <K, V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName, HashMap<String, Object> kafkaConsumerConfig) {
LOG.info("Instantiating KafkaConsumer for systemName {} with properties {}", systemName, kafkaConsumerConfig);
return new KafkaConsumer<>(kafkaConsumerConfig);
@@ -176,7 +179,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
throw new SamzaException(msg, e);
}
- LOG.info("{}: Changing consumer's starting offset for tp = %s to %s", this, tp, startingOffsetString);
+ LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString);
// add the partition to the proxy
proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
@@ -310,16 +313,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
return super.poll(systemStreamPartitions, timeout);
}
- /**
- * convert from TopicPartition to TopicAndPartition
- */
public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
return new TopicAndPartition(tp.topic(), tp.partition());
}
- /**
- * convert to TopicPartition from SystemStreamPartition
- */
public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a8a8dc78/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
deleted file mode 100644
index 5b4886a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import kafka.consumer.SimpleConsumer
-import kafka.api._
-import kafka.common.TopicAndPartition
-import kafka.consumer.ConsumerConfig
-
-class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int,
- clientId: scala.Predef.String, fetchSize: StreamFetchSizes = new StreamFetchSizes,
- minBytes: Int = ConsumerConfig.MinFetchBytes, maxWait: Int = ConsumerConfig.MaxFetchWaitMs)
- extends SimpleConsumer(host, port, soTimeout, bufferSize, clientId) {
-
- def defaultFetch(fetches: (TopicAndPartition, Long)*) = {
- val fbr = new FetchRequestBuilder().maxWait(maxWait)
- .minBytes(minBytes)
- .clientId(clientId)
-
- fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize.streamValue.getOrElse(f._1.topic, fetchSize.defaultValue)))
-
- this.fetch(fbr.build())
- }
-
- override def close(): Unit = super.close()
-
- override def send(request: TopicMetadataRequest): TopicMetadataResponse = super.send(request)
-
- override def fetch(request: FetchRequest): FetchResponse = super.fetch(request)
-
- override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = super.getOffsetsBefore(request)
-
- override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = super.commitOffsets(request)
-
- override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = super.fetchOffsets(request)
-
- override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = super.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
-}
-
-/**
- * a simple class for holding values for the stream's fetch size (fetch.message.max.bytes).
- * The stream-level fetch size values are put in the streamValue map streamName -> fetchSize.
- * If stream-level fetch size is not defined, use the default value. The default value is the
- * Kafka's default fetch size value or the system-level fetch size value (if defined).
- */
-case class StreamFetchSizes(defaultValue: Int = ConsumerConfig.MaxFetchSize, streamValue: Map[String, Int] = Map[String, Int]())
-
http://git-wip-us.apache.org/repos/asf/samza/blob/a8a8dc78/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
deleted file mode 100644
index 55b4611..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import kafka.api._
-import kafka.common.TopicAndPartition
-import kafka.api.PartitionOffsetRequestInfo
-import org.apache.samza.util.Logging
-import org.apache.samza.util.KafkaUtil
-
-/**
- * GetOffset validates offsets for topic partitions, and manages fetching new
- * offsets for topics using Kafka's auto.offset.reset configuration.
- */
-class GetOffset(
- /**
- * The default auto.offset.reset to use if a topic is not overridden in
- * autoOffsetResetTopics. Any value other than "earliest" or "latest" will
- * result in an exception when getRestOffset is called.
- */
- default: String,
-
- /**
- * Topic-level overrides for auto.offset.reset. Any value other than
- * "earliest" or "latest" will result in an exception when getRestOffset is
- * called.
- */
- autoOffsetResetTopics: Map[String, String] = Map()) extends Logging with Toss {
-
- /**
- * Checks if an offset is valid for a given topic/partition. Validity is
- * defined as an offset that returns a readable non-empty message set with
- * no exceptions.
- */
- def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
- info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
-
- try {
- val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
-
- if (messages.hasError) {
- KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
- }
-
- info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
-
- true
- } catch {
- case e: OffsetOutOfRangeException => false
- }
- }
-
- /**
- * Uses a topic's auto.offset.reset setting (defined via the
- * autoOffsetResetTopics map in the constructor) to fetch either the
- * earliest or latest offset. If neither earliest or latest is defined for
- * the topic in question, the default supplied in the constructor will be
- * used.
- */
- def getResetOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition) = {
- val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new PartitionOffsetRequestInfo(getAutoOffset(topicAndPartition.topic), 1)))
- val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
- val partitionOffsetResponse = offsetResponse
- .partitionErrorAndOffsets
- .get(topicAndPartition)
- .getOrElse(toss("Unable to find offset information for %s" format topicAndPartition))
-
- KafkaUtil.maybeThrowException(partitionOffsetResponse.error.exception())
-
- partitionOffsetResponse
- .offsets
- .headOption
- .getOrElse(toss("Got response, but no offsets defined for %s" format topicAndPartition))
- }
-
- /**
- * Returns either the earliest or latest setting (a Kafka constant) for a
- * given topic using the autoOffsetResetTopics map defined in the
- * constructor. If the topic is not defined in autoOffsetResetTopics, the
- * default value supplied in the constructor will be used. This is used in
- * conjunction with getResetOffset to fetch either the earliest or latest
- * offset for a topic.
- */
- private def getAutoOffset(topic: String): Long = {
- info("Checking if auto.offset.reset is defined for topic %s" format (topic))
- autoOffsetResetTopics.getOrElse(topic, default) match {
- case OffsetRequest.LargestTimeString =>
- info("Got reset of type %s." format OffsetRequest.LargestTimeString)
- OffsetRequest.LatestTime
- case OffsetRequest.SmallestTimeString =>
- info("Got reset of type %s." format OffsetRequest.SmallestTimeString)
- OffsetRequest.EarliestTime
- case other => toss("Can't get offset value for topic %s due to invalid value: %s" format (topic, other))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/a8a8dc78/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
deleted file mode 100644
index ab82609..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka
-
-import java.nio.ByteBuffer
-
-import kafka.api._
-import kafka.common.TopicAndPartition
-import kafka.consumer.SimpleConsumer
-import kafka.message.Message
-import kafka.message.ByteBufferMessageSet
-import org.apache.kafka.common.errors.OffsetOutOfRangeException
-import org.junit._
-import org.junit.Assert._
-import org.mockito.Mockito
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-
-class TestGetOffset {
-
- private val outOfRangeOffset : String = "0"
-
- /**
- * An empty message set is still a valid offset. It just means that the
- * offset was for the upcoming message, which hasn't yet been written. The
- * fetch request times out in such a case, and an empty message set is
- * returned.
- */
- @Test
- def testIsValidOffsetWorksWithEmptyMessageSet {
- val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
- // Should not throw an exception.
- assertTrue(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), "1234"))
- }
-
- /**
- * An empty message set is still a valid offset. It just means that the
- * offset was for the upcoming message, which hasn't yet been written. The
- * fetch request times out in such a case, and an empty message set is
- * returned.
- */
- @Test
- def testIsValidOffsetWorksWithOffsetOutOfRangeException {
- val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
- // Should not throw an exception.
- assertFalse(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), outOfRangeOffset))
- }
-
- /**
- * Create a default fetch simple consumer that returns empty message sets.
- */
- def getMockDefaultFetchSimpleConsumer = {
- new DefaultFetchSimpleConsumer("", 0, 0, 0, "") {
- val sc = Mockito.mock(classOf[SimpleConsumer])
-
- // Build an empty fetch response.
- val fetchResponse = {
- val fetchResponse = Mockito.mock(classOf[FetchResponse])
- val messageSet = {
- val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
- val messages = List()
-
- def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
-
- when(messageSet.sizeInBytes).thenReturn(0)
- when(messageSet.size).thenReturn(0)
- when(messageSet.iterator).thenReturn(messages.iterator)
-
- messageSet
- }
- when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
-
- fetchResponse
- }
-
- doAnswer(new Answer[FetchResponse] {
- override def answer(invocation: InvocationOnMock): FetchResponse = {
- if (invocation.getArgumentAt(0, classOf[FetchRequest]).requestInfo.exists(
- req => req._2.offset.toString.equals(outOfRangeOffset))) {
- throw new OffsetOutOfRangeException("test exception")
- }
- fetchResponse
- }
- }).when(sc).fetch(any(classOf[FetchRequest]))
-
- override def fetch(request: FetchRequest): FetchResponse = {
- sc.fetch(request)
- }
- }
- }
-}