Posted to commits@pulsar.apache.org by li...@apache.org on 2022/04/29 02:02:17 UTC
[pulsar] branch master updated: [doc][tiered storage] read data from filesystem (#15239)
This is an automated email from the ASF dual-hosted git repository.
liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 47e57386870 [doc][tiered storage] read data from filesystem (#15239)
47e57386870 is described below
commit 47e5738687010c350b454ce384c3000ef0432bfb
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Apr 29 10:02:08 2022 +0800
[doc][tiered storage] read data from filesystem (#15239)
---
site2/docs/tiered-storage-filesystem.md | 63 ++++++++++++++++++++++++++++++++-
1 file changed, 62 insertions(+), 1 deletion(-)
diff --git a/site2/docs/tiered-storage-filesystem.md b/site2/docs/tiered-storage-filesystem.md
index 2e96a48928d..b9dca2d1dad 100644
--- a/site2/docs/tiered-storage-filesystem.md
+++ b/site2/docs/tiered-storage-filesystem.md
@@ -520,4 +520,65 @@ Execute the following commands in the repository where you download Pulsar tarba
And the **Capacity Used** is changed from 4 KB to 116.46 KB.
- ![](assets/FileSystem-8.png)
\ No newline at end of file
+ ![](assets/FileSystem-8.png)
+
+## Read offloaded data from filesystem
+
+* The offloaded data is stored as a `MapFile` at the following path in the filesystem:
+ ```java
+ path = storageBasePath + "/" + managedLedgerName + "/" + ledgerId + "-" + uuid.toString();
+ ```
+ * `storageBasePath` is the value of `hadoop.tmp.dir`, which is configured in `broker.conf` or `filesystem_offload_core_site.xml`.
+ * `managedLedgerName` is the name of the managed ledger that backs the persistent topic.
+ ```text
+ managedLedgerName of persistent://public/default/topics-name is public/default/persistent/topics-name.
+ ```
+ You can use the following method to get `managedLedgerName`:
+ ```java
+ String managedLedgerName = TopicName.get("persistent://public/default/topics-name").getPersistenceNamingEncoding();
+ ```
+
+To read data out as ledger entries from the filesystem, complete the following steps.
+1. Create a `MapFile.Reader` for the offloaded `MapFile`, passing its path (`dataFilePath`) and the `configuration` of the filesystem.
+ ```java
+ MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath), configuration);
+ ```
+2. Read the data as `LedgerEntry` from the filesystem.
+ ```java
+ LongWritable key = new LongWritable();
+ BytesWritable value = new BytesWritable();
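+ key.set(nextExpectedId - 1); // nextExpectedId is the ID of the entry to read
+ reader.seek(key); // position the reader at the preceding key
+ reader.next(key, value); // advance to the next entry, filling key and value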
+ int length = value.getLength();
+ long entryId = key.get();
+ ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
+ buf.writeBytes(value.copyBytes());
+ LedgerEntryImpl ledgerEntry = LedgerEntryImpl.create(ledgerId, entryId, length, buf);
+ ```
+3. Deserialize the `LedgerEntry` into a `Message`.
+ ```java
+ ByteBuf metadataAndPayload = ledgerEntry.getDataBuffer();
+ long totalSize = metadataAndPayload.readableBytes();
+ BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
+ MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
+
+ Map<String, String> properties = new TreeMap<>();
+ properties.put("X-Pulsar-batch-size", String.valueOf(totalSize
+ - metadata.getSerializedSize()));
+ properties.put("TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
+ properties.put("CHUNK-ID", Integer.toString(metadata.getChunkId()));
+
+ // Decode if needed
+ CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
+ ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());
+ // Copy into a heap buffer for output stream compatibility
+ ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
+ uncompressedPayload.readableBytes());
+ data.writeBytes(uncompressedPayload);
+ uncompressedPayload.release();
+
+ MessageImpl message = new MessageImpl(topic, ((PositionImpl)ledgerEntry.getPosition()).toString(), properties,
+ data, Schema.BYTES, metadata);
+ message.setBrokerEntryMetadata(brokerEntryMetadata);
+ ```
\ No newline at end of file
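
Note on the path layout documented in this change: the sketch below, in plain Java with no Pulsar or Hadoop dependency, shows how the pieces of the offload path fit together. `OffloadPathSketch`, its two helper methods, the `/pulsar` base path, and the ledger ID and UUID values are all hypothetical. In particular, `managedLedgerName` here is only a simplified stand-in for `TopicName.getPersistenceNamingEncoding()`: it handles plain `domain://tenant/namespace/topic` names and does no escaping of the topic name.

```java
import java.util.UUID;

// Illustrative only: mirrors the path formula from the doc, not Pulsar's own code.
class OffloadPathSketch {

    // Simplified stand-in for TopicName.getPersistenceNamingEncoding().
    // Assumes a name of the form domain://tenant/namespace/localName.
    public static String managedLedgerName(String topic) {
        int sep = topic.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("expected domain://tenant/namespace/topic: " + topic);
        }
        String domain = topic.substring(0, sep);
        String[] parts = topic.substring(sep + 3).split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected domain://tenant/namespace/topic: " + topic);
        }
        // tenant/namespace/domain/localName
        return parts[0] + "/" + parts[1] + "/" + domain + "/" + parts[2];
    }

    // path = storageBasePath + "/" + managedLedgerName + "/" + ledgerId + "-" + uuid
    public static String dataFilePath(String storageBasePath, String managedLedgerName,
                                      long ledgerId, UUID uuid) {
        return storageBasePath + "/" + managedLedgerName + "/" + ledgerId + "-" + uuid.toString();
    }

    public static void main(String[] args) {
        String name = managedLedgerName("persistent://public/default/topics-name");
        // Hypothetical values for the base path, ledger ID, and UUID.
        UUID uuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        System.out.println(dataFilePath("/pulsar", name, 42L, uuid));
    }
}
```

In a real reader, the resulting string would be the `dataFilePath` passed to `new Path(...)` in step 1, and the managed ledger name should come from `TopicName` rather than from a hand-rolled parser like this one.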