You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/18 23:01:43 UTC

[46/47] samza git commit: rename of a var

rename of a var


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74b6cfab
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74b6cfab
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74b6cfab

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 74b6cfabdbb5112488965c2fc3629156e0ff8c4c
Parents: ed0648d
Author: Boris S <bo...@apache.org>
Authored: Tue Sep 18 14:17:58 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Tue Sep 18 14:17:58 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/system/kafka/KafkaConsumerProxy.java | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/74b6cfab/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 6fc6491..b67df0a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -367,20 +367,20 @@ import org.slf4j.LoggerFactory;
     Using the consumer to poll the messages from the stream.
    */
   private void fetchMessages() {
-    Set<SystemStreamPartition> SSPsToFetch = new HashSet<>();
+    Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
     for (SystemStreamPartition ssp : nextOffsets.keySet()) {
       if (sink.needsMoreMessages(ssp)) {
-        SSPsToFetch.add(ssp);
+        sspsToFetch.add(ssp);
       }
     }
-    LOG.debug("pollConsumer {}", SSPsToFetch.size());
-    if (!SSPsToFetch.isEmpty()) {
+    LOG.debug("pollConsumer {}", sspsToFetch.size());
+    if (!sspsToFetch.isEmpty()) {
       kafkaConsumerMetrics.incClientReads(metricName);
 
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
-      LOG.debug("pollConsumer from following SSPs: {}; total#={}", SSPsToFetch, SSPsToFetch.size());
+      LOG.debug("pollConsumer from following SSPs: {}; total#={}", sspsToFetch, sspsToFetch.size());
 
-      response = pollConsumer(SSPsToFetch, 500); // TODO should be default value from ConsumerConfig
+      response = pollConsumer(sspsToFetch, 500); // TODO should be default value from ConsumerConfig
 
       // move the responses into the queue
       for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) {
@@ -390,7 +390,7 @@ import org.slf4j.LoggerFactory;
         }
       }
 
-      populateCurrentLags(SSPsToFetch); // find current lags for for each SSP
+      populateCurrentLags(sspsToFetch); // find current lags for for each SSP
     } else { // nothing to read
 
       LOG.debug("No topic/partitions need to be fetched for consumer {} right now. Sleeping {}ms.", kafkaConsumer,