You are viewing a plain text version of this content. The canonical link for it is here.
Posted to distributedlog-issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/10/24 11:57:09 UTC

[GitHub] jiazhai closed pull request #230: Issue 226: ByteBuf.release() was not called before it's garbage-collected

jiazhai closed pull request #230: Issue 226: ByteBuf.release() was not called before it's garbage-collected
URL: https://github.com/apache/distributedlog/pull/230
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
index 1d65d077..8584780c 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.DistributedLogConstants.INVALID_TXID;
 import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
 import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
 
@@ -47,6 +48,7 @@
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.distributedlog.Entry.Writer;
 import org.apache.distributedlog.common.concurrent.FutureEventListener;
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.common.stats.OpStatsListener;
@@ -71,11 +73,8 @@
 import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
 import org.apache.distributedlog.logsegment.LogSegmentWriter;
 import org.apache.distributedlog.util.FailpointUtils;
-
-
 import org.apache.distributedlog.util.OrderedScheduler;
 import org.apache.distributedlog.util.SimplePermitLimiter;
-
 import org.apache.distributedlog.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +103,54 @@
 class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Sizable {
     static final Logger LOG = LoggerFactory.getLogger(BKLogSegmentWriter.class);
 
+    final Writer REJECT_WRITES_WRITER = new Writer() {
+        @Override
+        public void writeRecord(LogRecord record, CompletableFuture<DLSN> transmitPromise)
+            throws LogRecordTooLongException, WriteException {
+            throw new WriteException(getFullyQualifiedLogSegment(), "Write record is cancelled.");
+        }
+
+        @Override
+        public boolean hasUserRecords() {
+            return false;
+        }
+
+        @Override
+        public int getNumRecords() {
+            return 0;
+        }
+
+        @Override
+        public int getNumBytes() {
+            return 0;
+        }
+
+        @Override
+        public long getMaxTxId() {
+            return INVALID_TXID;
+        }
+
+        @Override
+        public ByteBuf getBuffer() throws InvalidEnvelopedEntryException, IOException {
+            throw new IOException("GetBuffer is not supported.");
+        }
+
+        @Override
+        public DLSN finalizeTransmit(long lssn, long entryId) {
+            return new DLSN(lssn, entryId, -1L);
+        }
+
+        @Override
+        public void completeTransmit(long lssn, long entryId) {
+            // no-op
+        }
+
+        @Override
+        public void abortTransmit(Throwable reason) {
+            // no-op
+        }
+    };
+
     private final String fullyQualifiedLogSegment;
     private final String streamName;
     private final int logSegmentMetadataVersion;
@@ -120,8 +167,8 @@
     private final boolean isDurableWriteEnabled;
     private DLSN lastDLSN = DLSN.InvalidDLSN;
     private final long startTxId;
-    private long lastTxId = DistributedLogConstants.INVALID_TXID;
-    private long lastTxIdAcknowledged = DistributedLogConstants.INVALID_TXID;
+    private long lastTxId = INVALID_TXID;
+    private long lastTxIdAcknowledged = INVALID_TXID;
     private long outstandingBytes = 0;
     private long numFlushesSinceRestart = 0;
     private long numBytes = 0;
@@ -555,7 +602,7 @@ private void abortTransmitPacketOnClose(final boolean abort,
         synchronized (this) {
             packetPreviousSaved = packetPrevious;
             packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
-            recordSetWriter = newRecordSetWriter();
+            recordSetWriter = REJECT_WRITES_WRITER;
         }
 
         // Once the last packet been transmitted, apply any remaining promises asynchronously


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services