You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:37 UTC
[38/50] [abbrv] git commit: updated log messages
updated log messages
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/9370c5cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/9370c5cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/9370c5cc
Branch: refs/heads/master
Commit: 9370c5ccaab437f2aa1c0f5fad55802aaa3b2b96
Parents: 98cfe93
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sat Apr 5 13:44:35 2014 +0100
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sat Apr 5 13:54:39 2014 +0100
----------------------------------------------------------------------
src/jvm/storm/kafka/KafkaUtils.java | 6 +++++-
src/jvm/storm/kafka/PartitionManager.java | 21 +++++++++------------
src/jvm/storm/kafka/ZkCoordinator.java | 14 ++++++--------
3 files changed, 20 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9370c5cc/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
index 4e8b3a3..0e7f601 100644
--- a/src/jvm/storm/kafka/KafkaUtils.java
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -204,11 +204,15 @@ public class KafkaUtils {
}
private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
- String taskPrefix = "[" + taskIndex + "/" + totalTasks + "] --> ";
+ String taskPrefix = taskId(taskIndex, totalTasks);
if (taskPartitions.isEmpty()) {
LOG.warn(taskPrefix + "no partitions assigned");
} else {
LOG.info(taskPrefix + "assigned " + taskPartitions);
}
}
+
+ public static String taskId(int taskIndex, int totalTasks) {
+ return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9370c5cc/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index fc9d817..915f0f9 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -6,7 +6,6 @@ import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
import com.google.common.collect.ImmutableMap;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
@@ -64,7 +63,7 @@ public class PartitionManager {
String path = committedPath();
try {
Map<Object, Object> json = _state.readJSON(path);
- LOG.info("Read partition information from: " + path + " --> " + json );
+ LOG.info("Read partition information from: " + path + " --> " + json );
if (json != null) {
jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
jsonOffset = (Long) json.get("offset");
@@ -84,7 +83,7 @@ public class PartitionManager {
LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
}
- LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
+ LOG.info("Starting " + _partition + " from offset " + _committedTo);
_emittedToOffset = _committedTo;
_fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
@@ -141,7 +140,7 @@ public class PartitionManager {
_fetchAPIMessageCount.incrBy(numMessages);
if (numMessages > 0) {
- LOG.info("Fetched " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition);
+ LOG.info("Fetched " + numMessages + " messages from: " + _partition);
}
for (MessageAndOffset msg : msgs) {
_pending.add(_emittedToOffset);
@@ -149,7 +148,7 @@ public class PartitionManager {
_emittedToOffset = msg.nextOffset();
}
if (numMessages > 0) {
- LOG.info("Added " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition + " to internal buffers");
+ LOG.info("Added " + numMessages + " messages from: " + _partition + " to internal buffers");
}
}
@@ -175,7 +174,6 @@ public class PartitionManager {
}
public void commit() {
- LOG.info("Committing offset for " + _partition);
long committedTo;
if (_pending.isEmpty()) {
committedTo = _emittedToOffset;
@@ -183,9 +181,8 @@ public class PartitionManager {
committedTo = _pending.first();
}
if (committedTo != _committedTo) {
- LOG.info("Writing committed offset to ZK: " + committedTo);
-
- Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
+ LOG.info("Writing committed offset (" + committedTo + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+ Map<Object, Object> data = ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
"name", _stormConf.get(Config.TOPOLOGY_NAME)))
.put("offset", committedTo)
@@ -194,11 +191,11 @@ public class PartitionManager {
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
_state.writeJSON(committedPath(), data);
-
- LOG.info("Wrote committed offset to ZK: " + committedTo);
_committedTo = committedTo;
+ LOG.info("Wrote committed offset (" + committedTo + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+ } else {
+ LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
}
- LOG.info("Committed offset " + committedTo + " for " + _partition + " for topology: " + _topologyInstanceId);
}
private String committedPath() {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/9370c5cc/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/ZkCoordinator.java b/src/jvm/storm/kafka/ZkCoordinator.java
index 35d2c57..ec35aed 100644
--- a/src/jvm/storm/kafka/ZkCoordinator.java
+++ b/src/jvm/storm/kafka/ZkCoordinator.java
@@ -6,6 +6,8 @@ import storm.kafka.trident.GlobalPartitionInformation;
import java.util.*;
+import static storm.kafka.KafkaUtils.taskId;
+
public class ZkCoordinator implements PartitionCoordinator {
public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
@@ -55,7 +57,7 @@ public class ZkCoordinator implements PartitionCoordinator {
void refresh() {
try {
- LOG.info(taskIdentifier() + "Refreshing partition manager connections");
+ LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
@@ -66,13 +68,13 @@ public class ZkCoordinator implements PartitionCoordinator {
Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
deletedPartitions.removeAll(mine);
- LOG.info(taskIdentifier() + "Deleted partition managers: " + deletedPartitions.toString());
+ LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
for (Partition id : deletedPartitions) {
PartitionManager man = _managers.remove(id);
man.close();
}
- LOG.info(taskIdentifier() + "New partition managers: " + newPartitions.toString());
+ LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
for (Partition id : newPartitions) {
PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
@@ -83,11 +85,7 @@ public class ZkCoordinator implements PartitionCoordinator {
throw new RuntimeException(e);
}
_cachedList = new ArrayList<PartitionManager>(_managers.values());
- LOG.info(taskIdentifier() + "Finished refreshing");
- }
-
- private String taskIdentifier() {
- return "[" + _taskIndex + "/" + _totalTasks + "] - ";
+ LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
}
@Override