Posted to commits@samza.apache.org by "sailingYang (JIRA)" <ji...@apache.org> on 2018/04/15 07:57:00 UTC
[jira] [Created] (SAMZA-1658) samza-kafka module use kafka deprecated method to get topic metadata
sailingYang created SAMZA-1658:
----------------------------------
Summary: samza-kafka module use kafka deprecated method to get topic metadata
Key: SAMZA-1658
URL: https://issues.apache.org/jira/browse/SAMZA-1658
Project: Samza
Issue Type: Improvement
Components: kafka
Affects Versions: 0.13.1, 0.14.0, 0.13.0, 0.12.0
Reporter: sailingYang
Fix For: 0.15.0
In ClientUtilTopicMetadataStore, samza-kafka uses ClientUtils.fetchTopicMetadata to get topic metadata. This method is deprecated in Kafka. It uses SyncProducer internally and may cause problems.
{code:scala}
// Fetches metadata for the given topics through the deprecated ClientUtils helper.
def getTopicInfo(topics: Set[String]) = {
  val currCorrId = corrID.getAndIncrement
  val response: TopicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId)
  // Reject a response that does not answer this request.
  if (response.correlationId != currCorrId) {
    throw new SamzaException("CorrelationID did not match for request on topics %s (sent %d, got %d)" format (topics, currCorrId, response.correlationId))
  }
  // Index the returned metadata by topic name.
  response.topicsMetadata
    .map(metadata => (metadata.topic, metadata))
    .toMap
}
{code}
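One possible direction, shown only as a rough sketch and not as the fix adopted for this issue, is to read partition metadata through the Java client's KafkaConsumer.partitionsFor instead of ClientUtils.fetchTopicMetadata. The class name ConsumerTopicMetadataStore and its constructor parameters are hypothetical. The sketch also assumes that returning PartitionInfo in place of the old TopicMetadata type is acceptable to callers, which would have to be checked against the existing TopicMetadataStore users. It uses scala.collection.JavaConverters, which fits Scala 2.11/2.12 and is deprecated in 2.13.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.PartitionInfo
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Hypothetical replacement sketch; not part of Samza.
class ConsumerTopicMetadataStore(bootstrapServers: String, clientId: String) {
  private val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("client.id", clientId)

  // The consumer is used for metadata lookups only, so no group.id is set
  // and nothing is ever subscribed or polled.
  private val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](
    props, new ByteArrayDeserializer, new ByteArrayDeserializer)

  // KafkaConsumer is not thread-safe, so calls are serialized on this object.
  def getTopicInfo(topics: Set[String]): Map[String, Seq[PartitionInfo]] = synchronized {
    topics.map { topic =>
      // Guard against a null result for a topic the client cannot resolve.
      val partitions = Option(consumer.partitionsFor(topic))
        .map(_.asScala.toList)
        .getOrElse(Nil)
      topic -> partitions
    }.toMap
  }

  def close(): Unit = synchronized { consumer.close() }
}
```

With this approach there is no correlation ID to verify by hand, since the client handles request/response matching internally. Leader and replica details remain available through PartitionInfo (leader, replicas, inSyncReplicas).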
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)