You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/18 23:01:43 UTC
[46/47] samza git commit: rename of a var
Rename of a var: SSPsToFetch -> sspsToFetch in KafkaConsumerProxy.fetchMessages()
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74b6cfab
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74b6cfab
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74b6cfab
Branch: refs/heads/NewKafkaSystemConsumer
Commit: 74b6cfabdbb5112488965c2fc3629156e0ff8c4c
Parents: ed0648d
Author: Boris S <bo...@apache.org>
Authored: Tue Sep 18 14:17:58 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Tue Sep 18 14:17:58 2018 -0700
----------------------------------------------------------------------
.../apache/samza/system/kafka/KafkaConsumerProxy.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/74b6cfab/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 6fc6491..b67df0a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -367,20 +367,20 @@ import org.slf4j.LoggerFactory;
Using the consumer to poll the messages from the stream.
*/
private void fetchMessages() {
- Set<SystemStreamPartition> SSPsToFetch = new HashSet<>();
+ Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
for (SystemStreamPartition ssp : nextOffsets.keySet()) {
if (sink.needsMoreMessages(ssp)) {
- SSPsToFetch.add(ssp);
+ sspsToFetch.add(ssp);
}
}
- LOG.debug("pollConsumer {}", SSPsToFetch.size());
- if (!SSPsToFetch.isEmpty()) {
+ LOG.debug("pollConsumer {}", sspsToFetch.size());
+ if (!sspsToFetch.isEmpty()) {
kafkaConsumerMetrics.incClientReads(metricName);
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
- LOG.debug("pollConsumer from following SSPs: {}; total#={}", SSPsToFetch, SSPsToFetch.size());
+ LOG.debug("pollConsumer from following SSPs: {}; total#={}", sspsToFetch, sspsToFetch.size());
- response = pollConsumer(SSPsToFetch, 500); // TODO should be default value from ConsumerConfig
+ response = pollConsumer(sspsToFetch, 500); // TODO should be default value from ConsumerConfig
// move the responses into the queue
for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) {
@@ -390,7 +390,7 @@ import org.slf4j.LoggerFactory;
}
}
- populateCurrentLags(SSPsToFetch); // find current lags for for each SSP
+ populateCurrentLags(sspsToFetch); // find current lags for for each SSP
} else { // nothing to read
LOG.debug("No topic/partitions need to be fetched for consumer {} right now. Sleeping {}ms.", kafkaConsumer,