Posted to commits@pulsar.apache.org by li...@apache.org on 2022/04/29 02:02:17 UTC

[pulsar] branch master updated: [doc][tiered storage] read data from filesystem (#15239)

This is an automated email from the ASF dual-hosted git repository.

liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 47e57386870 [doc][tiered storage] read data from filesystem (#15239)
47e57386870 is described below

commit 47e5738687010c350b454ce384c3000ef0432bfb
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Apr 29 10:02:08 2022 +0800

    [doc][tiered storage] read data from filesystem (#15239)
---
 site2/docs/tiered-storage-filesystem.md | 63 ++++++++++++++++++++++++++++++++-
 1 file changed, 62 insertions(+), 1 deletion(-)

diff --git a/site2/docs/tiered-storage-filesystem.md b/site2/docs/tiered-storage-filesystem.md
index 2e96a48928d..b9dca2d1dad 100644
--- a/site2/docs/tiered-storage-filesystem.md
+++ b/site2/docs/tiered-storage-filesystem.md
@@ -520,4 +520,65 @@ Execute the following commands in the repository where you download Pulsar tarba
 
     And the **Capacity Used** is changed from 4 KB to 116.46 KB.
 
-    ![](assets/FileSystem-8.png)
\ No newline at end of file
+    ![](assets/FileSystem-8.png)
+
+## Read offloaded data from filesystem
+
+* Offloaded data is stored as a Hadoop `MapFile` at the following path in the filesystem:
+  ```java
+  path = storageBasePath + "/" + managedLedgerName + "/" + ledgerId + "-" + uuid.toString();
+  ```
+  * `storageBasePath` is the value of `hadoop.tmp.dir`, which is configured in `broker.conf` or `filesystem_offload_core_site.xml`.
+  * `managedLedgerName` is the name of the managed ledger that stores the persistent topic. For example, the `managedLedgerName` of `persistent://public/default/topics-name` is `public/default/persistent/topics-name`.
+
+    You can get `managedLedgerName` with `TopicName#getPersistenceNamingEncoding()`:
+    ```java
+    import org.apache.pulsar.common.naming.TopicName;
+
+    String managedLedgerName = TopicName.get("persistent://public/default/topics-name").getPersistenceNamingEncoding();
+    ```
+
+To read offloaded data back from the filesystem as ledger entries, complete the following steps.
+
+1. Create a `MapFile.Reader` from the data file path (`dataFilePath`, built as described above) and the Hadoop `Configuration` of the filesystem.
+   ```java
+   import org.apache.hadoop.fs.Path;
+   import org.apache.hadoop.io.MapFile;
+
+   MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath), configuration);
+   ```
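+2. Read an entry from the `MapFile` and wrap it as a BookKeeper `LedgerEntry`.
+   ```java
+   import io.netty.buffer.ByteBuf;
+   import io.netty.buffer.PooledByteBufAllocator;
+   import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+   import org.apache.hadoop.io.BytesWritable;
+   import org.apache.hadoop.io.LongWritable;
+
+   // ledgerId and nextExpectedId (the entry ID to read) are defined by the caller.
+   // Keys are entry IDs; values are the raw entry bytes.
+   LongWritable key = new LongWritable();
+   BytesWritable value = new BytesWritable();
+   // Position the reader at key nextExpectedId - 1, then read forward with next().
+   key.set(nextExpectedId - 1);
+   reader.seek(key);
+   reader.next(key, value);
+   int length = value.getLength();
+   long entryId = key.get();
+   // Copy the value into a pooled buffer sized to the entry length.
+   ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+   buf.writeBytes(value.copyBytes());
+   LedgerEntryImpl ledgerEntry = LedgerEntryImpl.create(ledgerId, entryId, length, buf);
+   ```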
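+3. Deserialize the `LedgerEntry` into a `Message`.
+   ```java
+   import io.netty.buffer.ByteBuf;
+   import java.util.Map;
+   import java.util.TreeMap;
+   import org.apache.bookkeeper.mledger.impl.PositionImpl;
+   import org.apache.pulsar.client.api.Schema;
+   import org.apache.pulsar.client.impl.MessageImpl;
+   import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+   import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
+   import org.apache.pulsar.common.api.proto.MessageMetadata;
+   import org.apache.pulsar.common.compression.CompressionCodec;
+   import org.apache.pulsar.common.compression.CompressionCodecProvider;
+   import org.apache.pulsar.common.protocol.Commands;
+
+   String topic = "persistent://public/default/topics-name";
+   // The entry bytes hold the optional broker entry metadata, the message metadata, and the payload.
+   ByteBuf metadataAndPayload = ledgerEntry.getEntryBuffer();
+   long totalSize = metadataAndPayload.readableBytes();
+   BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
+   MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
+
+   Map<String, String> properties = new TreeMap<>();
+   properties.put("X-Pulsar-batch-size", String.valueOf(totalSize - metadata.getSerializedSize()));
+   properties.put("TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
+   properties.put("CHUNK-ID", Integer.toString(metadata.getChunkId()));
+
+   // Decompress the payload if needed (decode throws IOException).
+   CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
+   ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
+   // Copy into a heap buffer for output stream compatibility.
+   ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
+           uncompressedPayload.readableBytes());
+   data.writeBytes(uncompressedPayload);
+   uncompressedPayload.release();
+
+   // The message ID string has the form "ledgerId:entryId".
+   String messageId = PositionImpl.get(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId()).toString();
+   MessageImpl<byte[]> message = new MessageImpl<>(topic, messageId, properties, data, Schema.BYTES, metadata);
+   message.setBrokerEntryMetadata(brokerEntryMetadata);
+   ```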
\ No newline at end of file
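
The offload path layout in the new "Read offloaded data from filesystem" section can be checked with a small plain-Java sketch. It is a hypothetical illustration, not Pulsar code: `OffloadPathSketch`, `managedLedgerNameOf`, `dataFilePath`, and the `/pulsar/offload` base path are made up. The name derivation only approximates what `TopicName.get(...).getPersistenceNamingEncoding()` returns for V2 topic names (`domain://tenant/namespace/topic`), assuming the local topic name is URL-encoded; real code should call `TopicName` itself. Requires Java 10 or later for `URLEncoder.encode(String, Charset)`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helpers illustrating the offload path layout; not part of the Pulsar API.
final class OffloadPathSketch {

    // Approximates TopicName#getPersistenceNamingEncoding() for V2 names only:
    // "domain://tenant/namespace/local" -> "tenant/namespace/domain/<URL-encoded local>".
    static String managedLedgerNameOf(String topic) {
        int sep = topic.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("Expected domain://tenant/namespace/topic: " + topic);
        }
        String domain = topic.substring(0, sep);
        String[] parts = topic.substring(sep + 3).split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected a V2 topic name: " + topic);
        }
        String encodedLocalName = URLEncoder.encode(parts[2], StandardCharsets.UTF_8);
        return parts[0] + "/" + parts[1] + "/" + domain + "/" + encodedLocalName;
    }

    // Same concatenation as the documented path expression.
    static String dataFilePath(String storageBasePath, String managedLedgerName, long ledgerId, UUID uuid) {
        return storageBasePath + "/" + managedLedgerName + "/" + ledgerId + "-" + uuid.toString();
    }

    public static void main(String[] args) {
        String name = managedLedgerNameOf("persistent://public/default/topics-name");
        UUID uuid = UUID.fromString("00000000-0000-0000-0000-000000000001");
        System.out.println(name);
        System.out.println(dataFilePath("/pulsar/offload", name, 42L, uuid));
    }
}
```

Running `main` prints `public/default/persistent/topics-name` and then the full `MapFile` path for ledger 42 under the sample base path.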
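
Step 2 of the new section reads a single entry. One way to extend it to a range of entries is to seek to the key before the first wanted entry and keep reading forward, skipping any key below the expected ID and stopping once the last wanted entry is read. The sketch below models that loop in plain Java, with a `TreeMap` standing in for the `MapFile`: `tailMap(key, true)` starts at the named key or the first key after it, which roughly mirrors `MapFile.Reader.seek`. `RangeReadSketch` and `readRange` are hypothetical; real code would call `reader.next(key, value)` and build `LedgerEntryImpl` objects as in step 2.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of reading entries firstEntry..lastEntry from a MapFile.
// The TreeMap stands in for MapFile.Reader; keys are entry IDs, values are entry bytes.
final class RangeReadSketch {

    static List<String> readRange(NavigableMap<Long, byte[]> file, long firstEntry, long lastEntry) {
        if (firstEntry < 0 || firstEntry > lastEntry) {
            throw new IllegalArgumentException("Invalid range " + firstEntry + ".." + lastEntry);
        }
        List<String> entries = new ArrayList<>();
        long nextExpectedId = firstEntry;
        // Like seeking to key nextExpectedId - 1: start at that key or the first key after it.
        for (Map.Entry<Long, byte[]> e : file.tailMap(nextExpectedId - 1, true).entrySet()) {
            long entryId = e.getKey();
            if (entryId < nextExpectedId) {
                continue; // The key just before the range; skip it.
            }
            if (entryId > nextExpectedId) {
                throw new IllegalStateException("Entry " + nextExpectedId + " is missing");
            }
            entries.add(entryId + ":" + new String(e.getValue(), StandardCharsets.UTF_8));
            if (entryId == lastEntry) {
                return entries;
            }
            nextExpectedId++;
        }
        throw new IllegalStateException("Data ends before entry " + lastEntry);
    }

    public static void main(String[] args) {
        NavigableMap<Long, byte[]> file = new TreeMap<>();
        for (long id = 0; id < 5; id++) {
            file.put(id, ("entry-" + id).getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(readRange(file, 1, 3)); // [1:entry-1, 2:entry-2, 3:entry-3]
    }
}
```

Failing fast on a gap or on a short file keeps the sketch from silently returning fewer entries than requested.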