You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:37 UTC

[38/50] [abbrv] git commit: updated log messages

updated log messages


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/9370c5cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/9370c5cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/9370c5cc

Branch: refs/heads/master
Commit: 9370c5ccaab437f2aa1c0f5fad55802aaa3b2b96
Parents: 98cfe93
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sat Apr 5 13:44:35 2014 +0100
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sat Apr 5 13:54:39 2014 +0100

----------------------------------------------------------------------
 src/jvm/storm/kafka/KafkaUtils.java       |  6 +++++-
 src/jvm/storm/kafka/PartitionManager.java | 21 +++++++++------------
 src/jvm/storm/kafka/ZkCoordinator.java    | 14 ++++++--------
 3 files changed, 20 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9370c5cc/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
index 4e8b3a3..0e7f601 100644
--- a/src/jvm/storm/kafka/KafkaUtils.java
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -204,11 +204,15 @@ public class KafkaUtils {
     }
 
     private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
-        String taskPrefix = "[" + taskIndex + "/" + totalTasks + "] --> ";
+        String taskPrefix = taskId(taskIndex, totalTasks);
         if (taskPartitions.isEmpty()) {
             LOG.warn(taskPrefix + "no partitions assigned");
         } else {
             LOG.info(taskPrefix + "assigned " + taskPartitions);
         }
     }
+
+    public static String taskId(int taskIndex, int totalTasks) {
+        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9370c5cc/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index fc9d817..915f0f9 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -6,7 +6,6 @@ import backtype.storm.metric.api.CountMetric;
 import backtype.storm.metric.api.MeanReducer;
 import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
 import com.google.common.collect.ImmutableMap;
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
@@ -64,7 +63,7 @@ public class PartitionManager {
         String path = committedPath();
         try {
             Map<Object, Object> json = _state.readJSON(path);
-            LOG.info("Read partition information from: " + path +  "  --> " + json );
+            LOG.info("Read partition information from: " + path +  " --> " + json );
             if (json != null) {
                 jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                 jsonOffset = (Long) json.get("offset");
@@ -84,7 +83,7 @@ public class PartitionManager {
             LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
         }
 
-        LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
+        LOG.info("Starting " + _partition + " from offset " + _committedTo);
         _emittedToOffset = _committedTo;
 
         _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
@@ -141,7 +140,7 @@ public class PartitionManager {
         _fetchAPIMessageCount.incrBy(numMessages);
 
         if (numMessages > 0) {
-            LOG.info("Fetched " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition);
+            LOG.info("Fetched " + numMessages + " messages from: " + _partition);
         }
         for (MessageAndOffset msg : msgs) {
             _pending.add(_emittedToOffset);
@@ -149,7 +148,7 @@ public class PartitionManager {
             _emittedToOffset = msg.nextOffset();
         }
         if (numMessages > 0) {
-            LOG.info("Added " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition + " to internal buffers");
+            LOG.info("Added " + numMessages + " messages from: " + _partition + " to internal buffers");
         }
     }
 
@@ -175,7 +174,6 @@ public class PartitionManager {
     }
 
     public void commit() {
-        LOG.info("Committing offset for " + _partition);
         long committedTo;
         if (_pending.isEmpty()) {
             committedTo = _emittedToOffset;
@@ -183,9 +181,8 @@ public class PartitionManager {
             committedTo = _pending.first();
         }
         if (committedTo != _committedTo) {
-            LOG.info("Writing committed offset to ZK: " + committedTo);
-
-            Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
+            LOG.info("Writing committed offset (" + committedTo + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+            Map<Object, Object> data = ImmutableMap.builder()
                     .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                             "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                     .put("offset", committedTo)
@@ -194,11 +191,11 @@ public class PartitionManager {
                             "port", _partition.host.port))
                     .put("topic", _spoutConfig.topic).build();
             _state.writeJSON(committedPath(), data);
-
-            LOG.info("Wrote committed offset to ZK: " + committedTo);
             _committedTo = committedTo;
+            LOG.info("Wrote committed offset (" + committedTo + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+        } else {
+            LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
         }
-        LOG.info("Committed offset " + committedTo + " for " + _partition + " for topology: " + _topologyInstanceId);
     }
 
     private String committedPath() {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9370c5cc/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/ZkCoordinator.java b/src/jvm/storm/kafka/ZkCoordinator.java
index 35d2c57..ec35aed 100644
--- a/src/jvm/storm/kafka/ZkCoordinator.java
+++ b/src/jvm/storm/kafka/ZkCoordinator.java
@@ -6,6 +6,8 @@ import storm.kafka.trident.GlobalPartitionInformation;
 
 import java.util.*;
 
+import static storm.kafka.KafkaUtils.taskId;
+
 public class ZkCoordinator implements PartitionCoordinator {
     public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
 
@@ -55,7 +57,7 @@ public class ZkCoordinator implements PartitionCoordinator {
 
     void refresh() {
         try {
-            LOG.info(taskIdentifier() + "Refreshing partition manager connections");
+            LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
             GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
             List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
 
@@ -66,13 +68,13 @@ public class ZkCoordinator implements PartitionCoordinator {
             Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
             deletedPartitions.removeAll(mine);
 
-            LOG.info(taskIdentifier() + "Deleted partition managers: " + deletedPartitions.toString());
+            LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
 
             for (Partition id : deletedPartitions) {
                 PartitionManager man = _managers.remove(id);
                 man.close();
             }
-            LOG.info(taskIdentifier() + "New partition managers: " + newPartitions.toString());
+            LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
 
             for (Partition id : newPartitions) {
                 PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
@@ -83,11 +85,7 @@ public class ZkCoordinator implements PartitionCoordinator {
             throw new RuntimeException(e);
         }
         _cachedList = new ArrayList<PartitionManager>(_managers.values());
-        LOG.info(taskIdentifier() + "Finished refreshing");
-    }
-
-    private String taskIdentifier() {
-        return "[" + _taskIndex + "/" + _totalTasks + "] - ";
+        LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
     }
 
     @Override