Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/24 14:10:31 UTC

[GitHub] [pulsar] hangc0276 commented on a change in pull request #12123: Fix the potential race condition in the BlobStore readhandler

hangc0276 commented on a change in pull request #12123:
URL: https://github.com/apache/pulsar/pull/12123#discussion_r715613361



##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
##########
@@ -94,56 +104,70 @@ public LedgerMetadata getLedgerMetadata() {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.submit(() -> {
+            List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+            try {
                 if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
                     promise.completeExceptionally(new BKException.BKIncorrectParameterException());
                     return;
                 }
                 long entriesToRead = (lastEntry - firstEntry) + 1;
-                List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
                 long nextExpectedId = firstEntry;
-                try {
-                    while (entriesToRead > 0) {
-                        int length = dataStream.readInt();
-                        if (length < 0) { // hit padding or new block
-                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        }
-                        long entryId = dataStream.readLong();
-
-                        if (entryId == nextExpectedId) {
-                            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
-                            entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
-                            int toWrite = length;
-                            while (toWrite > 0) {
-                                toWrite -= buf.writeBytes(dataStream, toWrite);
-                            }
-                            entriesToRead--;
-                            nextExpectedId++;
-                        } else if (entryId > nextExpectedId) {
-                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        } else if (entryId < nextExpectedId
-                                && !index.getIndexEntryForEntry(nextExpectedId).equals(
-                                index.getIndexEntryForEntry(entryId)))  {
-                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        } else if (entryId > lastEntry) {
-                            log.info("Expected to read {}, but read {}, which is greater than last entry {}",
-                                     nextExpectedId, entryId, lastEntry);
-                            throw new BKException.BKUnexpectedConditionException();
-                        } else {
-                            long ignored = inputStream.skip(length);
+
+                // Seek to the position of the first entry before reading. Otherwise the first read may return an
+                // unexpected entry ID that lies outside the requested range [firstEntry, lastEntry].
+                // For example, if a previous request read entries 1-10 and the next request asks for 2-9, the stream
+                // is not positioned at the expected entry. The loop below would seek to the correct position and
+                // continue reading, but the entry ID it reads first may exceed lastEntry, which makes it hard to
+                // detect the boundary of the requested range.
+                inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
+
+                while (entriesToRead > 0) {
+                    if (state == State.Closed) {
+                        log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
+                        throw new BKException.BKUnexpectedConditionException();
+                    }
+                    int length = dataStream.readInt();
+                    if (length < 0) { // hit padding or new block
+                        inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                        continue;
+                    }
+                    long entryId = dataStream.readLong();
+
+                    if (entryId == nextExpectedId) {
+                        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
+                        entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+                        int toWrite = length;
+                        while (toWrite > 0) {
+                            toWrite -= buf.writeBytes(dataStream, toWrite);
                         }
+                        entriesToRead--;
+                        nextExpectedId++;
+                    } else if (entryId > nextExpectedId && entryId < lastEntry) {

Review comment:
       The two `if` checks both lead to
   ```Java
   inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
   continue;
   ```
   Can we merge them into a single `if` check?
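A minimal sketch of the suggested merge, written as a pure decision function so it can be checked in isolation. `SeekDecision`, `shouldSeek`, and the `blockOf` parameter are hypothetical: `blockOf` stands in for `index.getIndexEntryForEntry(id)` and maps an entry ID to an identifier of the index block that contains it. Because the two branches are adjacent and perform the same seek-and-continue, OR-ing their conditions preserves the behavior of the PR's branch order.

```Java
import java.util.function.LongUnaryOperator;

final class SeekDecision {
    private SeekDecision() {}

    /**
     * Returns true when the reader should seek to the index offset of nextExpectedId.
     * blockOf is a hypothetical stand-in for index.getIndexEntryForEntry(id).
     */
    static boolean shouldSeek(long entryId, long nextExpectedId, long lastEntry,
                              LongUnaryOperator blockOf) {
        // First branch from the PR: the stream is ahead of the expected entry but inside the range.
        boolean aheadInRange = entryId > nextExpectedId && entryId < lastEntry;
        // Second branch from the PR: the stream is behind, and the expected entry is in another block.
        boolean behindInOtherBlock = entryId < nextExpectedId
                && blockOf.applyAsLong(nextExpectedId) != blockOf.applyAsLong(entryId);
        // Both branches perform the same seek-and-continue, so one condition suffices.
        return aheadInRange || behindInOtherBlock;
    }
}
```

In the read loop, the two branches would then collapse into one `if` whose body is `inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());` followed by `continue;`.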

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
##########
@@ -94,56 +104,70 @@ public LedgerMetadata getLedgerMetadata() {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.submit(() -> {
+            List<LedgerEntry> entries = new ArrayList<LedgerEntry>();

Review comment:
       We should move it after
   ```Java
   if (firstEntry > lastEntry
           || firstEntry < 0
           || lastEntry > getLastAddConfirmed()) {
       promise.completeExceptionally(new BKException.BKIncorrectParameterException());
       return;
   }
   ```
   so that we avoid allocating the list when the (firstEntry, lastEntry) check fails.
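A minimal sketch of the suggested ordering. `RangeReadSketch` is hypothetical; it uses `Long` IDs in place of `LedgerEntry` objects and `IllegalArgumentException` in place of `BKException.BKIncorrectParameterException`. The range is validated before anything is allocated, while the list is still declared ahead of the `try` so that a `catch` block could release partially read entries.

```Java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class RangeReadSketch {
    private RangeReadSketch() {}

    // Hypothetical simplified read: "reads" the entry IDs in [firstEntry, lastEntry].
    static CompletableFuture<List<Long>> read(long firstEntry, long lastEntry, long lastAddConfirmed) {
        CompletableFuture<List<Long>> promise = new CompletableFuture<>();
        // Reject an invalid range before the result list is allocated.
        if (firstEntry > lastEntry || firstEntry < 0 || lastEntry > lastAddConfirmed) {
            promise.completeExceptionally(new IllegalArgumentException(
                    "invalid range " + firstEntry + "-" + lastEntry));
            return promise;
        }
        // Allocated only for valid ranges, but declared outside the try block
        // so that the catch block can still see partially read entries.
        List<Long> entries = new ArrayList<>();
        try {
            for (long id = firstEntry; id <= lastEntry; id++) {
                entries.add(id);
            }
            promise.complete(entries);
        } catch (RuntimeException e) {
            entries.clear(); // stand-in for releasing the buffers of partially read entries
            promise.completeExceptionally(e);
        }
        return promise;
    }
}
```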

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java
##########
@@ -115,7 +115,8 @@ public int read(byte[] b, int off, int len) throws IOException {
     }
 
     @Override
-    public void seek(long position) {
+    public void seek(long position) throws IOException {
+        refillBufferIfNeeded();

Review comment:
       Why add `refillBufferIfNeeded()`?
   It would be better to check `buffer.readerIndex() == position` first; if the check passes, skip the remaining checks and return.
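One possible shape for the early return, sketched on a hypothetical in-memory `BufferedSeekableStream` rather than the real `BlobStoreBackedInputStreamImpl`. As an assumption of this sketch, the buffer does not necessarily start at offset 0, so it compares against the absolute position `bufferStart + readerIndex` instead of the raw reader index. `seek` returns immediately when nothing needs to move, repositions within the current buffer when it can, and otherwise defers the refill to the next `read()` rather than refilling inside `seek`. The `refills` counter exists only to make simulated remote fetches observable.

```Java
import java.io.InputStream;

// Hypothetical seekable stream over an in-memory blob. Each refill copies up to
// bufferSize bytes, standing in for a ranged read against the blob store.
final class BufferedSeekableStream extends InputStream {
    private final byte[] blob;
    private final int bufferSize;
    private byte[] buffer = new byte[0];
    private long bufferStart = 0; // absolute offset of buffer[0]
    private int readerIndex = 0;  // next byte to return, relative to bufferStart
    int refills = 0;              // number of simulated remote fetches

    BufferedSeekableStream(byte[] blob, int bufferSize) {
        this.blob = blob;
        this.bufferSize = bufferSize;
    }

    long position() {
        return bufferStart + readerIndex;
    }

    void seek(long position) {
        if (position < 0 || position > blob.length) {
            throw new IllegalArgumentException("position out of range: " + position);
        }
        if (position == position()) {
            return; // already at the target: no fetch, no bookkeeping
        }
        if (position >= bufferStart && position < bufferStart + buffer.length) {
            readerIndex = (int) (position - bufferStart); // target is inside the current buffer
        } else {
            buffer = new byte[0]; // target is outside: drop the buffer and refill lazily
            bufferStart = position;
            readerIndex = 0;
        }
    }

    @Override
    public int read() {
        if (readerIndex >= buffer.length) {
            long start = position();
            if (start >= blob.length) {
                return -1;
            }
            int len = (int) Math.min(bufferSize, blob.length - start);
            buffer = new byte[len];
            System.arraycopy(blob, (int) start, buffer, 0, len);
            bufferStart = start;
            readerIndex = 0;
            refills++;
        }
        return buffer[readerIndex++] & 0xFF;
    }
}
```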

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
##########
@@ -94,56 +104,70 @@ public LedgerMetadata getLedgerMetadata() {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.submit(() -> {
+            List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+            try {
                 if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
                     promise.completeExceptionally(new BKException.BKIncorrectParameterException());
                     return;
                 }
                 long entriesToRead = (lastEntry - firstEntry) + 1;
-                List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
                 long nextExpectedId = firstEntry;
-                try {
-                    while (entriesToRead > 0) {
-                        int length = dataStream.readInt();
-                        if (length < 0) { // hit padding or new block
-                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        }
-                        long entryId = dataStream.readLong();
-
-                        if (entryId == nextExpectedId) {
-                            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
-                            entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
-                            int toWrite = length;
-                            while (toWrite > 0) {
-                                toWrite -= buf.writeBytes(dataStream, toWrite);
-                            }
-                            entriesToRead--;
-                            nextExpectedId++;
-                        } else if (entryId > nextExpectedId) {
-                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        } else if (entryId < nextExpectedId
-                                && !index.getIndexEntryForEntry(nextExpectedId).equals(
-                                index.getIndexEntryForEntry(entryId)))  {
-                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        } else if (entryId > lastEntry) {
-                            log.info("Expected to read {}, but read {}, which is greater than last entry {}",
-                                     nextExpectedId, entryId, lastEntry);
-                            throw new BKException.BKUnexpectedConditionException();
-                        } else {
-                            long ignored = inputStream.skip(length);
+
+                // Seek to the position of the first entry before reading. Otherwise the first read may return an
+                // unexpected entry ID that lies outside the requested range [firstEntry, lastEntry].
+                // For example, if a previous request read entries 1-10 and the next request asks for 2-9, the stream
+                // is not positioned at the expected entry. The loop below would seek to the correct position and
+                // continue reading, but the entry ID it reads first may exceed lastEntry, which makes it hard to
+                // detect the boundary of the requested range.
+                inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
+
+                while (entriesToRead > 0) {
+                    if (state == State.Closed) {
+                        log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
+                        throw new BKException.BKUnexpectedConditionException();
+                    }
+                    int length = dataStream.readInt();
+                    if (length < 0) { // hit padding or new block
+                        inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                        continue;
+                    }
+                    long entryId = dataStream.readLong();
+
+                    if (entryId == nextExpectedId) {
+                        ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
+                        entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
+                        int toWrite = length;
+                        while (toWrite > 0) {
+                            toWrite -= buf.writeBytes(dataStream, toWrite);
                         }
+                        entriesToRead--;
+                        nextExpectedId++;
+                    } else if (entryId > nextExpectedId && entryId < lastEntry) {
+                        inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                        continue;
+                    } else if (entryId < nextExpectedId
+                        && !index.getIndexEntryForEntry(nextExpectedId).equals(
+                        index.getIndexEntryForEntry(entryId))) {
+                        inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
+                        continue;
+                    } else if (entryId > lastEntry) {
+                        log.info("Expected to read {}, but read {}, which is greater than last entry {}",
+                            nextExpectedId, entryId, lastEntry);
+                        throw new BKException.BKUnexpectedConditionException();
+                    } else {
+                        long ignored = inputStream.skip(length);

Review comment:
       The variable `ignored` is never used, so we can call `inputStream.skip(length);` directly.
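Dropping the unused variable is fine, but note that `InputStream.skip(n)` is allowed to skip fewer than `n` bytes (possibly 0), so its return value is the only signal of a short skip. If a full skip is required, a small helper such as the hypothetical `StreamSkips.skipFully` below loops until all bytes are consumed and uses `read()` to distinguish "no progress" from end of stream. On Java 12 and later, `InputStream.skipNBytes` offers similar behavior.

```Java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

final class StreamSkips {
    private StreamSkips() {}

    /**
     * Hypothetical helper: skips exactly n bytes or throws EOFException.
     * InputStream.skip may skip fewer bytes than requested, so keep looping,
     * and fall back to read() when skip() makes no progress.
     */
    static void skipFully(InputStream in, long n) throws IOException {
        long remaining = n;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped > 0) {
                remaining -= skipped;
            } else if (in.read() >= 0) {
                remaining--; // skip() made no progress, but read() consumed one byte
            } else {
                throw new EOFException(remaining + " bytes left to skip at end of stream");
            }
        }
    }
}
```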

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
##########
@@ -94,56 +104,70 @@ public LedgerMetadata getLedgerMetadata() {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.submit(() -> {
+            List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
+            try {
                 if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
                     promise.completeExceptionally(new BKException.BKIncorrectParameterException());
                     return;
                 }
                 long entriesToRead = (lastEntry - firstEntry) + 1;
-                List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
                 long nextExpectedId = firstEntry;
-                try {
-                    while (entriesToRead > 0) {
-                        int length = dataStream.readInt();
-                        if (length < 0) { // hit padding or new block
-                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        }
-                        long entryId = dataStream.readLong();
-
-                        if (entryId == nextExpectedId) {
-                            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
-                            entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
-                            int toWrite = length;
-                            while (toWrite > 0) {
-                                toWrite -= buf.writeBytes(dataStream, toWrite);
-                            }
-                            entriesToRead--;
-                            nextExpectedId++;
-                        } else if (entryId > nextExpectedId) {
-                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        } else if (entryId < nextExpectedId
-                                && !index.getIndexEntryForEntry(nextExpectedId).equals(
-                                index.getIndexEntryForEntry(entryId)))  {
-                            inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                            continue;
-                        } else if (entryId > lastEntry) {
-                            log.info("Expected to read {}, but read {}, which is greater than last entry {}",
-                                     nextExpectedId, entryId, lastEntry);
-                            throw new BKException.BKUnexpectedConditionException();
-                        } else {
-                            long ignored = inputStream.skip(length);
+
+                // Seek to the position of the first entry before reading. Otherwise the first read may return an
+                // unexpected entry ID that lies outside the requested range [firstEntry, lastEntry].
+                // For example, if a previous request read entries 1-10 and the next request asks for 2-9, the stream
+                // is not positioned at the expected entry. The loop below would seek to the correct position and
+                // continue reading, but the entry ID it reads first may exceed lastEntry, which makes it hard to
+                // detect the boundary of the requested range.
+                inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());

Review comment:
       Since we now seek to `firstEntry` before reading, we can simplify the checks that follow: if the entry ID read from `dataStream` is not equal to `nextExpectedId`, we can throw an exception.
   @codelipenghui, could you please help check this logic?
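A minimal sketch of the simplified loop, assuming a hypothetical record layout of `[int length][long entryId][payload]` in which a negative length marks padding before the next block, and a `Map` from entry ID to byte offset standing in for the offload index. After the initial seek to `firstEntry`, padding is the only tolerated detour; any other entry ID is treated as an error (`IOException` here, where the PR uses `BKException.BKUnexpectedConditionException`). `StrictRangeReader` and `openAt` are illustrative names, not Pulsar APIs.

```Java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class StrictRangeReader {
    private StrictRangeReader() {}

    // 'offsets' is a hypothetical stand-in for the offload index: entry ID -> record offset.
    static List<byte[]> read(byte[] data, Map<Long, Integer> offsets,
                             long firstEntry, long lastEntry) throws IOException {
        List<byte[]> entries = new ArrayList<>();
        long nextExpectedId = firstEntry;
        DataInputStream in = openAt(data, offsets.get(firstEntry)); // initial seek to firstEntry
        while (nextExpectedId <= lastEntry) {
            int length = in.readInt();
            if (length < 0) {
                // Padding or block boundary: jump to the next expected entry via the index.
                in = openAt(data, offsets.get(nextExpectedId));
                continue;
            }
            long entryId = in.readLong();
            if (entryId != nextExpectedId) {
                // After the initial seek, any other ID means the data or index is inconsistent.
                throw new IOException("expected entry " + nextExpectedId + " but read " + entryId);
            }
            byte[] payload = new byte[length];
            in.readFully(payload);
            entries.add(payload);
            nextExpectedId++;
        }
        return entries;
    }

    // Simulates inputStream.seek(offset) by opening a fresh stream at that offset.
    private static DataInputStream openAt(byte[] data, int offset) {
        return new DataInputStream(new ByteArrayInputStream(data, offset, data.length - offset));
    }
}
```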




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
