You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/23 22:47:30 UTC
kafka git commit: KAFKA-3937; Kafka Clients Leak Native Memory For Longer Than Needed With Compressed Messages
Repository: kafka
Updated Branches:
refs/heads/trunk cec2769e2 -> d85aaf95f
KAFKA-3937; Kafka Clients Leak Native Memory For Longer Than Needed With Compressed Messages
ijuma - Making the change against trunk based on your suggestions to have the stream closing handled in the private RecordIterator constructor, which I understand is to be used only if the block of message(s) is compressed.
Author: William Yu <wy...@unified.com>
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #1760 from wiyu/compressor_memory_leak_in_fetcher
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d85aaf95
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d85aaf95
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d85aaf95
Branch: refs/heads/trunk
Commit: d85aaf95febca62cb03f4c2a2f84401cae4800d2
Parents: cec2769
Author: William Yu <wy...@unified.com>
Authored: Tue Aug 23 22:20:14 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Aug 23 23:46:41 2016 +0100
----------------------------------------------------------------------
.../kafka/common/record/MemoryRecords.java | 39 ++++++++++++--------
.../org/apache/kafka/common/utils/Utils.java | 13 +++++++
2 files changed, 37 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d85aaf95/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 603f74b..3848ea9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Utils;
/**
* A {@link Records} implementation backed by a ByteBuffer.
@@ -245,30 +246,38 @@ public class MemoryRecords implements Records {
this.shallow = true;
this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
long wrapperRecordOffset = entry.offset();
+
+ long wrapperRecordTimestamp = entry.record().timestamp();
+ this.logEntries = new ArrayDeque<>();
// If relative offset is used, we need to decompress the entire message first to compute
- // the absolute offset.
- if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
- this.logEntries = new ArrayDeque<>();
- long wrapperRecordTimestamp = entry.record().timestamp();
+ // the absolute offset. For simplicity and because it's a format that is on its way out, we
+ // do the same for message format version 0
+ try {
while (true) {
try {
LogEntry logEntry = getNextEntryFromStream();
- Record recordWithTimestamp = new Record(logEntry.record().buffer(),
- wrapperRecordTimestamp,
- entry.record().timestampType());
- logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
+ if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
+ Record recordWithTimestamp = new Record(
+ logEntry.record().buffer(),
+ wrapperRecordTimestamp,
+ entry.record().timestampType()
+ );
+ logEntry = new LogEntry(logEntry.offset(), recordWithTimestamp);
+ }
+ logEntries.add(logEntry);
} catch (EOFException e) {
break;
- } catch (IOException e) {
- throw new KafkaException(e);
}
}
- this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
- } else {
- this.logEntries = null;
- this.absoluteBaseOffset = -1;
+ if (entry.record().magic() > Record.MAGIC_VALUE_V0)
+ this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
+ else
+ this.absoluteBaseOffset = -1;
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ } finally {
+ Utils.closeQuietly(stream, "records iterator stream");
}
-
}
/*
http://git-wip-us.apache.org/repos/asf/kafka/blob/d85aaf95/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 4629baf..8d7014a 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -700,6 +700,19 @@ public class Utils {
}
/**
+ * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
+ */
+ public static void closeQuietly(Closeable closeable, String name) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (Throwable t) {
+ log.warn("Failed to close " + name, t);
+ }
+ }
+ }
+
+ /**
* A cheap way to deterministically convert a number to a positive value. When the input is
* positive, the original value is returned. When the input number is negative, the returned
* positive value is the original value bit AND against 0x7fffffff which is not its absolutely