Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/03 08:35:45 UTC

[GitHub] [pulsar] congbobo184 commented on a change in pull request #12449: [Transaction] Offload transaction data

congbobo184 commented on a change in pull request #12449:
URL: https://github.com/apache/pulsar/pull/12449#discussion_r741641947



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1848,8 +1851,10 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob
     }
 
     private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {
-
-        if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
+        //entries that have been offloaded do not need to be restricted by TransactionBuffer's maxReadPosition
+        LedgerInfo info = ledgers.get(ledger.getId());
+        if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0

Review comment:
       We can only offload entries smaller than maxPosition, can't we? Why do we need to add this logic?
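The reviewer's point is that the existing clamp, `min(opReadEntry.maxPosition.getEntryId(), lastEntryInLedger)`, already bounds reads, so if only data below `maxPosition` can be offloaded, a special case for offloaded ledgers may add nothing. A minimal, hypothetical Java sketch of that clamp in isolation follows. `MaxPositionClamp`, `Pos` and `lastReadableEntry` are illustrative names, not Pulsar's API, and returning -1 for "nothing to read yet" is an assumed convention.

```java
/** Hypothetical sketch of the maxPosition clamp; not Pulsar's actual code. */
final class MaxPositionClamp {

    /** Illustrative stand-in for a (ledgerId, entryId) position. */
    record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
        @Override
        public int compareTo(Pos other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    /**
     * Last entry id that may be read from {@code ledgerId}, or -1 (assumed
     * convention) when the read position is already past maxPos.
     */
    static long lastReadableEntry(long ledgerId, long lastEntryInLedger, Pos readPos, Pos maxPos) {
        if (readPos.compareTo(maxPos) > 0) {
            return -1; // nothing is readable yet; the caller would wait
        }
        if (ledgerId == maxPos.ledgerId()) {
            // Only the ledger that holds maxPosition is capped.
            return Math.min(maxPos.entryId(), lastEntryInLedger);
        }
        return lastEntryInLedger; // earlier ledgers are readable to their end
    }
}
```

Because the cap applies only to the ledger containing `maxPosition`, a ledger that lies entirely below it is never restricted, whether or not it has been offloaded.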

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
##########
@@ -153,18 +155,35 @@ public LedgerMetadata getLedgerMetadata() {
                 long entriesToRead = (groupedReader.lastEntry - groupedReader.firstEntry) + 1;
                 long nextExpectedId = groupedReader.firstEntry;
                 try {
+                    groupedReader.inputStream
+                            .seek(groupedReader.index
+                                    .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                    .getDataOffset());
                     while (entriesToRead > 0) {
-                        int length = groupedReader.dataStream.readInt();
-                        if (length < 0) { // hit padding or new block
-                            groupedReader.inputStream
-                                    .seek(groupedReader.index
-                                            .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
-                                            .getDataOffset());
-                            continue;
+
+                        long entryId;
+                        int length;
+                        try {
+                            length = groupedReader.dataStream.readInt();
+                            if (length < 0) { // hit padding or new block
+                                groupedReader.inputStream
+                                        .seek(groupedReader.index
+                                                .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                                .getDataOffset());
+                                continue;
+                            }
+                            entryId = groupedReader.dataStream.readLong();
+                        } catch (EOFException ioException) {
+                            //Some entries in this ledger may be filtered. If the last entry in this ledger was filtered,

Review comment:
       Please add a test that covers the case where an EOFException is thrown.
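A test for the EOF path needs a block whose trailing entries were filtered out, so the stream ends before `lastEntry` is reached. The following is a minimal, hypothetical sketch of such a reader and its test fixture. It assumes a simplified record layout modelled on the reads in the diff (`readInt()` for the length, then `readLong()` for the entry id, then the payload) and ignores padding and block headers. `FilteredBlockReader`, `readEntryIds` and `block` are illustrative names only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: reading [length][entryId][payload] records until EOF. */
final class FilteredBlockReader {

    /** Returns ids in [firstEntry, lastEntry]; stops early if the stream ends between records. */
    static List<Long> readEntryIds(DataInputStream in, long firstEntry, long lastEntry) {
        List<Long> ids = new ArrayList<>();
        try {
            while (true) {
                int length;
                long entryId;
                try {
                    length = in.readInt();
                    entryId = in.readLong();
                } catch (EOFException e) {
                    // Trailing entries were filtered out at offload time: no more records.
                    break;
                }
                if (entryId > lastEntry) {
                    break;
                }
                in.readFully(new byte[length]); // consume the payload; EOF here is a real error
                if (entryId >= firstEntry) {
                    ids.add(entryId);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // keeps the sketch's signature simple
        }
        return ids;
    }

    /** Test fixture: encodes records for the given ids, each with a 1-byte payload. */
    static DataInputStream block(long... entryIds) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (long id : entryIds) {
                out.writeInt(1);
                out.writeLong(id);
                out.writeByte((int) id);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    }
}
```

For example, a block holding entries 0, 1 and 2 (entry 3 filtered out) read with `lastEntry = 3` should return the three surviving ids instead of failing.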

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -1870,7 +1875,9 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
 
         // can read max position entryId
         if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) {
-            lastEntryInLedger = min(opReadEntry.maxPosition.getEntryId(), lastEntryInLedger);
+            if(!info.getOffloadContext().getComplete()){

Review comment:
       same as above

##########
File path: tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
##########
@@ -105,30 +105,22 @@ public LedgerMetadata getLedgerMetadata() {
                 promise.completeExceptionally(new BKException.BKIncorrectParameterException());
                 return;
             }
-            long entriesToRead = (lastEntry - firstEntry) + 1;
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
-            long nextExpectedId = firstEntry;
             LongWritable key = new LongWritable();
             BytesWritable value = new BytesWritable();
             try {
-                key.set(nextExpectedId - 1);
+                key.set(firstEntry - 1);
                 reader.seek(key);
-                while (entriesToRead > 0) {
-                    reader.next(key, value);
+                reader.next(key, value);
+                long entryId;
+                do {
                     int length = value.getLength();
-                    long entryId = key.get();
-                    if (entryId == nextExpectedId) {
-                        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
-                        entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
-                        buf.writeBytes(value.copyBytes());
-                        entriesToRead--;
-                        nextExpectedId++;
-                    } else if (entryId > lastEntry) {
-                        log.info("Expected to read {}, but read {}, which is greater than last entry {}",
-                                nextExpectedId, entryId, lastEntry);
-                        throw new BKException.BKUnexpectedConditionException();
-                    }
-            }
+                    entryId = key.get();
+                    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);

Review comment:
       We can't return an entry whose length is 0; this is the same issue as in BlockAwareSegmentInputStream.java above.
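One possible way to honour this constraint is to drop zero-length values while assembling the result, so an empty payload can never reach a consumer. This is a hypothetical sketch, not the PR's code: `NonEmptyEntries` and its `Entry` record stand in for the `LedgerEntryImpl` objects built in `FileStoreBackedReadHandleImpl`, and whether to skip such entries or treat them as an error is left open by the review.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: never hand zero-length entries back to the caller. */
final class NonEmptyEntries {

    /** Illustrative stand-in for a ledger entry. */
    record Entry(long entryId, byte[] data) {}

    /** Keeps entries in [firstEntry, lastEntry] that actually carry data. */
    static List<Entry> collect(List<Entry> raw, long firstEntry, long lastEntry) {
        List<Entry> result = new ArrayList<>();
        for (Entry e : raw) {
            if (e.entryId() < firstEntry || e.entryId() > lastEntry) {
                continue; // outside the requested range
            }
            if (e.data().length == 0) {
                continue; // an empty entry would reach the consumer with no payload
            }
            result.add(e);
        }
        return result;
    }
}
```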

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -200,7 +204,7 @@ public String getOffloadDriverName() {
 
                     try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl(
                         readHandle, startEntry, blockSize)) {
-
+                        blockStream.setOffloadFilter(offloadFilter);

Review comment:
       Would it be better to pass it in through the constructor?
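Passing the filter as a constructor argument, alongside the existing `readHandle, startEntry, blockSize` parameters, lets the field be `final` and guarantees it is set before any data is read. With a setter, a caller could forget `setOffloadFilter(...)` or call it after reading has started. The following hypothetical sketch shows the pattern. `FilteringBlockStream` is an illustrative name, and a `LongPredicate` on entry ids stands in for Pulsar's `OffloadFilter`, which in the diff inspects a `LedgerEntry`.

```java
import java.util.Objects;
import java.util.function.LongPredicate;

/** Hypothetical sketch: the filter is a constructor argument, not a setter. */
final class FilteringBlockStream {
    private final long startEntry;
    private final int blockSize;
    private final LongPredicate offloadFilter; // final: fixed for the stream's lifetime

    FilteringBlockStream(long startEntry, int blockSize, LongPredicate offloadFilter) {
        this.startEntry = startEntry;
        this.blockSize = blockSize;
        // Fail fast instead of discovering a missing filter mid-offload.
        this.offloadFilter = Objects.requireNonNull(offloadFilter, "offloadFilter");
    }

    /** Whether the entry with this id would be written to the offloaded block. */
    boolean shouldWrite(long entryId) {
        return entryId >= startEntry && offloadFilter.test(entryId);
    }

    int blockSize() {
        return blockSize;
    }
}
```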

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
##########
@@ -196,6 +209,10 @@ public LedgerMetadata getLedgerMetadata() {
                             val skipped = groupedReader.inputStream.skip(length);
                         }
                     }
+                    if (entries.isEmpty()) {
+                        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(0, 0);
+                        entries.add(LedgerEntryImpl.create(ledgerId, 0, 0, buf));

Review comment:
       We can't do this, because the entry created by `LedgerEntryImpl.create(ledgerId, 0, 0, buf)` would be sent to the consumer even though it doesn't contain any data.
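Instead of fabricating a placeholder entry when every entry in the range was filtered out, the read could report that no data is available. This hypothetical sketch completes the future exceptionally in that case. `EmptyReadHandling` and `NoEntriesException` are illustrative names. The real code would use an existing BookKeeper or Pulsar exception type, or let the caller advance to the next ledger; the review does not settle which.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: report "no data" instead of fabricating a placeholder entry. */
final class EmptyReadHandling {

    /** Illustrative exception type; the real code would use an existing one. */
    static final class NoEntriesException extends Exception {
        NoEntriesException(String message) {
            super(message);
        }
    }

    static CompletableFuture<List<byte[]>> complete(List<byte[]> entries, long ledgerId) {
        CompletableFuture<List<byte[]>> promise = new CompletableFuture<>();
        if (entries.isEmpty()) {
            // A fake (ledgerId, 0) entry with no payload would be dispatched to consumers.
            promise.completeExceptionally(
                    new NoEntriesException("no readable entries in ledger " + ledgerId));
        } else {
            promise.complete(entries);
        }
        return promise;
    }
}
```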

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OffloadFilterImp.java
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.mledger.OffloadFilter;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
+
+public class OffloadFilterImp implements OffloadFilter {
+    PersistentTopic persistentTopic;
+    public OffloadFilterImp(PersistentTopic persistentTopic) {
+        this.persistentTopic = persistentTopic;
+    }
+
+    @Override
+    public boolean checkIfNeedOffload(LedgerEntry ledgerEntry) {
+        MessageMetadata messageMetadata = Commands.parseMessageMetadata(ledgerEntry.getEntryBuffer());
+
+        if (messageMetadata.hasTxnidLeastBits() && messageMetadata.hasTxnidMostBits()){
+            return !persistentTopic.isTxnAborted(new TxnID(messageMetadata.getTxnidMostBits(),
+                    messageMetadata.getTxnidLeastBits())) && !Markers.isTxnMarker(messageMetadata);
+        }
+        return true;
+    }
+
+    @Override
+    public boolean checkIfLedgerIdCanOffload(long ledgerId) {
+        return ledgerId <= persistentTopic.getMaxReadPosition().getLedgerId();
+    }
+
+    @Override
+    public boolean checkFilterIsReady() {
+        return persistentTopic.getBrokerService().getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
+                && !"Initializing".equals(persistentTopic.getTransactionBufferStats().state);

Review comment:
       Only the 'Ready' and 'NoSnapshot' states should allow offloading, right?
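As written, `checkFilterIsReady()` treats every transaction buffer state other than `"Initializing"` as ready. Any state not explicitly considered would therefore also allow offloading. An allow-list makes the reviewer's suggestion explicit. This is a hypothetical sketch: `OffloadReadiness` and `isReady` are illustrative names, and the state strings are taken from the review comment rather than from Pulsar's source. The null check matters because `Set.of(...).contains(null)` throws `NullPointerException`.

```java
import java.util.Set;

/** Hypothetical sketch of an offload readiness check restricted to two states. */
final class OffloadReadiness {
    // State names as quoted in the review comment.
    private static final Set<String> OFFLOADABLE_STATES = Set.of("Ready", "NoSnapshot");

    static boolean isReady(boolean transactionCoordinatorEnabled, String transactionBufferState) {
        return transactionCoordinatorEnabled
                && transactionBufferState != null // Set.of(...).contains(null) would throw
                && OFFLOADABLE_STATES.contains(transactionBufferState);
    }
}
```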

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
##########
@@ -153,18 +155,35 @@ public LedgerMetadata getLedgerMetadata() {
                 long entriesToRead = (groupedReader.lastEntry - groupedReader.firstEntry) + 1;
                 long nextExpectedId = groupedReader.firstEntry;
                 try {
+                    groupedReader.inputStream
+                            .seek(groupedReader.index
+                                    .getIndexEntryForEntry(groupedReader.ledgerId, nextExpectedId)
+                                    .getDataOffset());

Review comment:
       Commenting this out seems to have no effect on the tests.
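One possible explanation is that the read loop skips records whose id does not match `nextExpectedId`, as the `groupedReader.inputStream.skip(length)` branch suggests. If so, a missing initial seek changes only how much data is scanned, not which entries are returned, so tests that check only the returned entries stay green. The following hypothetical sketch models this with an in-memory block. `SeekBeforeRead`, `ReadResult` and `dataOffset` are illustrative names, and `dataOffset` stands in for `index.getIndexEntryForEntry(...).getDataOffset()`. A test that also asserts on the number of records decoded can tell the two paths apart.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a test that can tell whether the initial seek happened. */
final class SeekBeforeRead {
    static final int RECORD_SIZE = 4 + 8 + 1; // length, entryId, 1-byte payload

    /** Entries returned by one read and how many records were decoded to find them. */
    record ReadResult(List<Long> entryIds, int recordsScanned) {}

    /** Builds an in-memory block holding entries 0..entries-1. */
    static ByteBuffer block(int entries) {
        ByteBuffer buf = ByteBuffer.allocate(entries * RECORD_SIZE);
        for (int id = 0; id < entries; id++) {
            buf.putInt(1).putLong(id).put((byte) id);
        }
        buf.flip();
        return buf;
    }

    /** Stand-in for the index lookup of an entry's data offset. */
    static int dataOffset(long entryId) {
        return (int) (entryId * RECORD_SIZE);
    }

    static ReadResult read(ByteBuffer block, long firstEntry, long lastEntry, boolean seekFirst) {
        ByteBuffer in = block.duplicate(); // independent position, shared content
        if (seekFirst) {
            in.position(dataOffset(firstEntry));
        }
        List<Long> ids = new ArrayList<>();
        int scanned = 0;
        long nextExpectedId = firstEntry;
        while (nextExpectedId <= lastEntry && in.hasRemaining()) {
            int length = in.getInt();
            long entryId = in.getLong();
            scanned++;
            if (entryId == nextExpectedId) {
                ids.add(entryId);
                nextExpectedId++;
            }
            in.position(in.position() + length); // skip the payload either way
        }
        return new ReadResult(ids, scanned);
    }
}
```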




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
