You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/23 22:47:30 UTC

kafka git commit: KAFKA-3937; Kafka Clients Leak Native Memory For Longer Than Needed With Compressed Messages

Repository: kafka
Updated Branches:
  refs/heads/trunk cec2769e2 -> d85aaf95f


KAFKA-3937; Kafka Clients Leak Native Memory For Longer Than Needed With Compressed Messages

ijuma - Making the change against trunk based on your suggestions to have the stream closing handled in the private RecordIterator constructor, which I understand is to be used only if the block of message(s) is compressed.

Author: William Yu <wy...@unified.com>
Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #1760 from wiyu/compressor_memory_leak_in_fetcher


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d85aaf95
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d85aaf95
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d85aaf95

Branch: refs/heads/trunk
Commit: d85aaf95febca62cb03f4c2a2f84401cae4800d2
Parents: cec2769
Author: William Yu <wy...@unified.com>
Authored: Tue Aug 23 22:20:14 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Aug 23 23:46:41 2016 +0100

----------------------------------------------------------------------
 .../kafka/common/record/MemoryRecords.java      | 39 ++++++++++++--------
 .../org/apache/kafka/common/utils/Utils.java    | 13 +++++++
 2 files changed, 37 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d85aaf95/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 603f74b..3848ea9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Utils;
 
 /**
  * A {@link Records} implementation backed by a ByteBuffer.
@@ -245,30 +246,38 @@ public class MemoryRecords implements Records {
             this.shallow = true;
             this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
             long wrapperRecordOffset = entry.offset();
+
+            long wrapperRecordTimestamp = entry.record().timestamp();
+            this.logEntries = new ArrayDeque<>();
             // If relative offset is used, we need to decompress the entire message first to compute
-            // the absolute offset.
-            if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
-                this.logEntries = new ArrayDeque<>();
-                long wrapperRecordTimestamp = entry.record().timestamp();
+            // the absolute offset. For simplicity and because it's a format that is on its way out, we
+            // do the same for message format version 0
+            try {
                 while (true) {
                     try {
                         LogEntry logEntry = getNextEntryFromStream();
-                        Record recordWithTimestamp = new Record(logEntry.record().buffer(),
-                                                                wrapperRecordTimestamp,
-                                                                entry.record().timestampType());
-                        logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
+                        if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
+                            Record recordWithTimestamp = new Record(
+                                    logEntry.record().buffer(),
+                                    wrapperRecordTimestamp,
+                                    entry.record().timestampType()
+                            );
+                            logEntry = new LogEntry(logEntry.offset(), recordWithTimestamp);
+                        }
+                        logEntries.add(logEntry);
                     } catch (EOFException e) {
                         break;
-                    } catch (IOException e) {
-                        throw new KafkaException(e);
                     }
                 }
-                this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
-            } else {
-                this.logEntries = null;
-                this.absoluteBaseOffset = -1;
+                if (entry.record().magic() > Record.MAGIC_VALUE_V0)
+                    this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
+                else
+                    this.absoluteBaseOffset = -1;
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            } finally {
+                Utils.closeQuietly(stream, "records iterator stream");
             }
-
         }
 
         /*

http://git-wip-us.apache.org/repos/asf/kafka/blob/d85aaf95/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 4629baf..8d7014a 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -700,6 +700,19 @@ public class Utils {
     }
 
     /**
+     * Closes {@code closeable} unless it is null; anything thrown by {@code close()} is logged at WARN and not rethrown.
+     */
+    public static void closeQuietly(Closeable closeable, String name) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (Throwable t) {
+                log.warn("Failed to close " + name, t); // name only labels the resource in the log message
+            }
+        }
+    }
+
+    /**
      * A cheap way to deterministically convert a number to a positive value. When the input is
      * positive, the original value is returned. When the input number is negative, the returned
      * positive value is the original value bit AND against 0x7fffffff which is not its absolutely