You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/28 21:40:24 UTC
[kafka] branch trunk updated: MINOR: Change getMessage to toString (#4790)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 28f1fc2 MINOR: Change getMessage to toString (#4790)
28f1fc2 is described below
commit 28f1fc2f55269f43f0b2bb769b78f80b8cc9cf51
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Wed Mar 28 14:40:21 2018 -0700
MINOR: Change getMessage to toString (#4790)
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../apache/kafka/streams/processor/internals/AssignedTasks.java | 4 ++--
.../kafka/streams/processor/internals/GlobalStateManagerImpl.java | 4 ++--
.../kafka/streams/processor/internals/InternalTopicManager.java | 8 ++++----
.../kafka/streams/processor/internals/RecordCollectorImpl.java | 8 ++++----
.../apache/kafka/streams/processor/internals/StreamThread.java | 4 ++--
5 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 9204571..4c4941a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -94,7 +94,7 @@ abstract class AssignedTasks<T extends Task> {
it.remove();
} catch (final LockException e) {
// made this trace as it will spam the logs in the poll loop.
- log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
+ log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.toString());
}
}
}
@@ -204,7 +204,7 @@ abstract class AssignedTasks<T extends Task> {
try {
task.close(false, true);
} catch (final RuntimeException e) {
- log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", taskTypeName, task.id(), e.getMessage());
+ log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", taskTypeName, task.id(), e.toString());
return e;
}
return null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 56e6bed..e8ec5e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -276,7 +276,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
} catch (final InvalidOffsetException recoverableException) {
log.warn("Restoring GlobalStore {} failed due to: {}. Deleting global store to recreate from scratch.",
storeName,
- recoverableException.getMessage());
+ recoverableException.toString());
reinitializeStateStoresForPartitions(recoverableException.partitions(), processorContext);
stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
@@ -318,7 +318,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
closeFailed.append("Failed to close global state store:")
.append(entry.getKey())
.append(". Reason: ")
- .append(e.getMessage())
+ .append(e.toString())
.append("\n");
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index aeff946..2ac37bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -130,9 +130,9 @@ public class InternalTopicManager {
"Will try again (remaining retries {}).", topicName, remainingRetries - 1);
} else if (cause instanceof TopicExistsException) {
createTopicNames.add(createTopicResult.getKey());
- log.info(String.format("Topic %s exist already: %s",
+ log.info("Topic {} exist already: {}",
topicName,
- couldNotCreateTopic.getMessage()));
+ couldNotCreateTopic.toString());
} else {
throw new StreamsException(String.format("Could not create topic %s.", topicName),
couldNotCreateTopic);
@@ -197,8 +197,8 @@ public class InternalTopicManager {
log.debug("Could not get number of partitions for topic {} due to timeout. " +
"Will try again (remaining retries {}).", topicFuture.getKey(), remainingRetries - 1);
} else {
- final String error = "Could not get number of partitions for topic {}.";
- log.debug(error, topicFuture.getKey(), cause.getMessage());
+ final String error = "Could not get number of partitions for topic {} due to {}";
+ log.debug(error, topicFuture.getKey(), cause.toString());
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 286cd81..f568048 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -130,7 +130,7 @@ public class RecordCollectorImpl implements RecordCollector {
value,
timestamp,
topic,
- exception.getMessage()),
+ exception.toString()),
exception);
}
@@ -163,7 +163,7 @@ public class RecordCollectorImpl implements RecordCollector {
} else {
if (sendException == null) {
if (exception instanceof ProducerFencedException) {
- log.warn(LOG_MESSAGE, key, value, timestamp, topic, exception.getMessage());
+ log.warn(LOG_MESSAGE, key, value, timestamp, topic, exception.toString());
sendException = new ProducerFencedException(
String.format(EXCEPTION_MESSAGE,
logPrefix,
@@ -172,7 +172,7 @@ public class RecordCollectorImpl implements RecordCollector {
value,
timestamp,
topic,
- exception.getMessage()));
+ exception.toString()));
} else {
if (productionExceptionIsFatal(exception)) {
recordSendError(key, value, timestamp, topic, exception);
@@ -201,7 +201,7 @@ public class RecordCollectorImpl implements RecordCollector {
value,
timestamp,
topic,
- uncaughtException.getMessage()),
+ uncaughtException.toString()),
uncaughtException);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 02a4bb9..ab96cce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -260,7 +260,7 @@ public class StreamThread extends Thread {
taskManager.createTasks(assignment);
} catch (final Throwable t) {
log.error("Error caught during partition assignment, " +
- "will abort the current process and re-throw at the end of rebalance: {}", t.getMessage());
+ "will abort the current process and re-throw at the end of rebalance: {}", t);
streamThread.setRebalanceException(t);
} finally {
log.info("partition assignment took {} ms.\n" +
@@ -291,7 +291,7 @@ public class StreamThread extends Thread {
taskManager.suspendTasksAndState();
} catch (final Throwable t) {
log.error("Error caught during partition revocation, " +
- "will abort the current process and re-throw at the end of rebalance: {}", t.getMessage());
+ "will abort the current process and re-throw at the end of rebalance: {}", t);
streamThread.setRebalanceException(t);
} finally {
streamThread.clearStandbyRecords();
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.