You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/02 17:03:04 UTC

[GitHub] sijie closed pull request #1593: PIP-17: the part of index block for offload.

sijie closed pull request #1593: PIP-17:  the part of index block for offload.
URL: https://github.com/apache/incubator-pulsar/pull/1593
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
new file mode 100644
index 0000000000..8f9d3cecb1
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * The index block abstraction used when offloading a ledger to long term storage.
+ *
+ * <p>An index block exposes the ledger metadata together with a set of
+ * {@link OffloadIndexEntry} items that can be looked up by message entry id.
+ */
+@Unstable
+public interface OffloadIndexBlock extends Closeable {
+
+    /**
+     * Get the content of the index block as InputStream.
+     * Read out in format:
+     *   | index_magic_header | index_block_len | segment_metadata_length |
+     *   | index_entry_count | segment metadata | index entries |
+     *
+     * @return a stream over the serialized index block
+     * @throws IOException if the index block cannot be serialized
+     */
+    InputStream toStream() throws IOException;
+
+    /**
+     * Get the related OffloadIndexEntry that contains the given messageEntryId.
+     *
+     * @param messageEntryId
+     *                      the entry id of message
+     * @return the offload index entry
+     * @throws IOException if the index entry cannot be looked up
+     */
+    OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws IOException;
+
+    /**
+     * Get the number of index entries contained in this index block.
+     *
+     * @return the index entry count
+     */
+    int getEntryCount();
+
+    /**
+     * Get the metadata of the ledger that this index block belongs to.
+     *
+     * @return the ledger metadata
+     */
+    LedgerMetadata getLedgerMetadata();
+
+}
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
new file mode 100644
index 0000000000..8ec0395498
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockBuilderImpl;
+
+/**
+ * Builder of the index block used when offloading a ledger to long term storage.
+ *
+ * <p>Obtain an instance through {@link #create()}.
+ */
+@Unstable
+@LimitedPrivate
+public interface OffloadIndexBlockBuilder {
+
+    /**
+     * Build index block with the passed in ledger metadata.
+     *
+     * @param metadata the ledger metadata
+     * @return this builder
+     */
+    OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata);
+
+    /**
+     * Add the information of one payload block to the index block.
+     * It contains the first entryId in the payload block, the payload block Id,
+     * and the payload block size.
+     * This information is used to form one index entry in the OffloadIndexBlock.
+     *
+     * @param firstEntryId the first entryId in payload block
+     * @param partId the payload block Id
+     * @param blockSize the payload block size
+     * @return this builder
+     */
+    OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize);
+
+    /**
+     * Finalize the immutable OffloadIndexBlock.
+     *
+     * @return the built index block
+     */
+    OffloadIndexBlock build();
+
+    /**
+     * Construct an OffloadIndexBlock from an InputStream.
+     *
+     * @param is the stream holding a serialized index block
+     * @return the index block read from the stream
+     * @throws IOException if the stream cannot be read or parsed
+     */
+    OffloadIndexBlock fromStream(InputStream is) throws IOException;
+
+    /**
+     * Create an OffloadIndexBlockBuilder.
+     *
+     * @return a new builder backed by {@link OffloadIndexBlockBuilderImpl}
+     */
+    static OffloadIndexBlockBuilder create() {
+        return new OffloadIndexBlockBuilderImpl();
+    }
+
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
new file mode 100644
index 0000000000..03927d31e8
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexEntry.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * The index entry in an OffloadIndexBlock.
+ *
+ * <p>It consists of a message entry id, the part id of the long term storage block
+ * for this message entry, and the offset in long term storage for this message entry.
+ */
+@Unstable
+@LimitedPrivate
+public interface OffloadIndexEntry {
+
+    /**
+     * Get the entryId that this entry contains.
+     *
+     * @return the message entry id
+     */
+    long getEntryId();
+
+    /**
+     * Get the block part id in long term storage.
+     *
+     * @return the block part id
+     */
+    int getPartId();
+
+    /**
+     * Get the offset of this message entry in long term storage.
+     *
+     * @return the offset
+     */
+    long getOffset();
+}
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
new file mode 100644
index 0000000000..ced3bf37c6
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
+
+/**
+ * Default {@link OffloadIndexBlockBuilder} implementation.
+ *
+ * <p>Payload blocks must be added in order through {@link #addBlock(long, int, int)},
+ * starting with the block whose first entry id is 0. Each added block becomes one
+ * index entry whose offset is the sum of the sizes of all blocks added before it.
+ */
+public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
+
+    private LedgerMetadata ledgerMetadata;
+    private final List<OffloadIndexEntryImpl> entries;
+    // Size of the most recently added block; the next block starts right after it.
+    private int lastBlockSize;
+
+    public OffloadIndexBlockBuilderImpl() {
+        this.entries = Lists.newArrayList();
+    }
+
+    /**
+     * Set the ledger metadata to store in the index block.
+     *
+     * @param metadata the ledger metadata
+     * @return this builder
+     */
+    @Override
+    public OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata) {
+        this.ledgerMetadata = metadata;
+        return this;
+    }
+
+    /**
+     * Record one payload block as an index entry.
+     *
+     * @param firstEntryId the first entryId in the payload block; 0 for the first block added
+     * @param partId the payload block Id
+     * @param blockSize the size of the payload block being added
+     * @return this builder
+     * @throws IllegalStateException if the first block does not start at entry 0,
+     *         or a block starting at entry 0 is added after other blocks
+     */
+    @Override
+    public OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize) {
+        // Blocks must be added one by one, in order.
+        long offset;
+        if (firstEntryId == 0) {
+            checkState(entries.isEmpty(), "The block starting at entry 0 must be added first");
+            offset = 0;
+        } else {
+            checkState(!entries.isEmpty(), "The first block added must start at entry 0");
+            // A block begins where the previous block ends, so advance by the
+            // previous block's size, not by the size of the block being added.
+            offset = entries.get(entries.size() - 1).getOffset() + lastBlockSize;
+        }
+        lastBlockSize = blockSize;
+
+        this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, offset));
+        return this;
+    }
+
+    /**
+     * Read an index block from its serialized form.
+     *
+     * @param is the stream holding a serialized index block
+     * @return the parsed index block
+     * @throws IOException if the stream cannot be read or parsed
+     */
+    @Override
+    public OffloadIndexBlock fromStream(InputStream is) throws IOException {
+        return OffloadIndexBlockImpl.get(is);
+    }
+
+    /**
+     * Build the index block from the metadata and blocks collected so far.
+     *
+     * @return the built index block
+     * @throws IllegalStateException if no metadata was set or no block was added
+     */
+    @Override
+    public OffloadIndexBlock build() {
+        checkState(ledgerMetadata != null, "Ledger metadata must be set before build");
+        checkState(!entries.isEmpty(), "At least one block must be added before build");
+        return OffloadIndexBlockImpl.get(ledgerMetadata, entries);
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
new file mode 100644
index 0000000000..31058b4f29
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats;
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
+import org.apache.bookkeeper.shaded.com.google.protobuf.ByteString;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OffloadIndexBlockImpl implements OffloadIndexBlock {
+    private static final Logger log = LoggerFactory.getLogger(OffloadIndexBlockImpl.class);
+
+    // Marker written as the first int of a serialized index block and verified when one is read back.
+    private static final int INDEX_MAGIC_WORD = 0xDE47DE47;
+
+    // Metadata of the ledger this index block describes.
+    private LedgerMetadata segmentMetadata;
+    // Index entries keyed by entry id; sorted so that lookups can use floorEntry().
+    private TreeMap<Long, OffloadIndexEntryImpl> indexEntries;
+
+    // Handle used to give this instance back to RECYCLER.
+    private final Handle<OffloadIndexBlockImpl> recyclerHandle;
+
+    // Source of instances for the static get(...) factories; newObject() binds each new instance to its handle.
+    private static final Recycler<OffloadIndexBlockImpl> RECYCLER = new Recycler<OffloadIndexBlockImpl>() {
+        @Override
+        protected OffloadIndexBlockImpl newObject(Recycler.Handle<OffloadIndexBlockImpl> handle) {
+            return new OffloadIndexBlockImpl(handle);
+        }
+    };
+
+    // Private: instances are only created through RECYCLER.
+    private OffloadIndexBlockImpl(Handle<OffloadIndexBlockImpl> recyclerHandle) {
+        this.recyclerHandle = recyclerHandle;
+    }
+
+    /**
+     * Obtain an index block holding the given ledger metadata and index entries.
+     *
+     * @param metadata the ledger metadata to store
+     * @param entries the index entries; entry ids must be unique
+     * @return an index block populated with the given content
+     * @throws IllegalStateException if two entries share the same entry id
+     */
+    public static OffloadIndexBlockImpl get(LedgerMetadata metadata, List<OffloadIndexEntryImpl> entries) {
+        final OffloadIndexBlockImpl instance = RECYCLER.get();
+        final TreeMap<Long, OffloadIndexEntryImpl> byEntryId = new TreeMap<>();
+        instance.indexEntries = byEntryId;
+        for (OffloadIndexEntryImpl indexEntry : entries) {
+            byEntryId.putIfAbsent(indexEntry.getEntryId(), indexEntry);
+        }
+        // A size mismatch means at least one entry id appeared more than once.
+        checkState(byEntryId.size() == entries.size());
+        instance.segmentMetadata = metadata;
+        return instance;
+    }
+
+    /**
+     * Obtain an index block populated from its serialized form.
+     *
+     * @param stream the stream holding a serialized index block
+     * @return the index block read from the stream
+     * @throws IOException if the stream cannot be read or parsed
+     */
+    public static OffloadIndexBlockImpl get(InputStream stream) throws IOException {
+        final OffloadIndexBlockImpl parsed = RECYCLER.get();
+        parsed.indexEntries = new TreeMap<>();
+        parsed.fromStream(stream);
+        return parsed;
+    }
+
+    /**
+     * Clear the content of this index block and give the instance back to the recycler.
+     *
+     * <p>Calling this method again on an already recycled instance has no effect.
+     */
+    public void recycle() {
+        if (indexEntries == null) {
+            // Already recycled: there is nothing left to clear, and clearing again
+            // would dereference the null entry map.
+            return;
+        }
+        segmentMetadata = null;
+        indexEntries.clear();
+        indexEntries = null;
+        if (recyclerHandle != null) {
+            recyclerHandle.recycle(this);
+        }
+    }
+
+    /**
+     * Find the index entry with the greatest entry id that is less than or equal to
+     * the given message entry id.
+     *
+     * @param messageEntryId the entry id of the message
+     * @return the matching index entry
+     * @throws IndexOutOfBoundsException if messageEntryId is beyond the last entry id of
+     *         the ledger, or precedes the first indexed entry
+     */
+    @Override
+    public OffloadIndexEntry getIndexEntryForEntry(long messageEntryId) throws IOException {
+        final long lastEntryId = segmentMetadata.getLastEntryId();
+        if (messageEntryId > lastEntryId) {
+            log.warn("Try to get entry: {}, which beyond lastEntryId {}", messageEntryId, lastEntryId);
+            throw new IndexOutOfBoundsException("Entry index: " + messageEntryId
+                + " beyond lastEntryId: " + lastEntryId);
+        }
+        // find the greatest mapping Id whose entryId <= messageEntryId
+        final Map.Entry<Long, OffloadIndexEntryImpl> floor = this.indexEntries.floorEntry(messageEntryId);
+        if (floor == null) {
+            // No index entry starts at or before the requested id (e.g. a negative id).
+            throw new IndexOutOfBoundsException("Entry index: " + messageEntryId
+                + " precedes the first indexed entry");
+        }
+        return floor.getValue();
+    }
+
+    /**
+     * @return the number of index entries held by this block
+     */
+    @Override
+    public int getEntryCount() {
+        return indexEntries.size();
+    }
+
+    /**
+     * @return the ledger metadata held by this block
+     */
+    @Override
+    public LedgerMetadata getLedgerMetadata() {
+        return segmentMetadata;
+    }
+
+    private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
+        LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
+        builder.setQuorumSize(metadata.getWriteQuorumSize())
+            .setAckQuorumSize(metadata.getAckQuorumSize())
+            .setEnsembleSize(metadata.getEnsembleSize())
+            .setLength(metadata.getLength())
+            .setState(metadata.isClosed() ? LedgerMetadataFormat.State.CLOSED : LedgerMetadataFormat.State.OPEN)
+            .setLastEntryId(metadata.getLastEntryId())
+            .setCtime(metadata.getCtime())
+            .setDigestType(BookKeeper.DigestType.toProtoDigestType(
+                BookKeeper.DigestType.fromApiDigestType(metadata.getDigestType())));
+
+        for (Map.Entry<String, byte[]> e : metadata.getCustomMetadata().entrySet()) {
+            builder.addCustomMetadataBuilder()
+                .setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue()));
+        }
+
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> e : metadata.getAllEnsembles().entrySet()) {
+            builder.addSegmentBuilder()
+                .setFirstEntryId(e.getKey())
+                .addAllEnsembleMember(e.getValue().stream().map(a -> a.toString()).collect(Collectors.toList()));
+        }
+
+        return builder.build().toByteArray();
+    }
+
+    /**
+     * Get the content of the index block as InputStream.
+     * Read out in format:
+     *   | index_magic_header | index_block_len | segment_metadata_len |
+     *   | index_entry_count | segment metadata | index entries |
+     * where each index entry is | entry_id (8 bytes) | part_id (4 bytes) | offset (8 bytes) |.
+     *
+     * <p>The returned stream is created with release-on-close enabled for its backing
+     * buffer, so callers should close it when done.
+     *
+     * @return a stream over the serialized index block
+     */
+    @Override
+    public InputStream toStream() throws IOException {
+        final int indexEntryCount = this.indexEntries.size();
+
+        final byte[] ledgerMetadataByte = buildLedgerMetadataFormat(this.segmentMetadata);
+        final int segmentMetadataLength = ledgerMetadataByte.length;
+
+        final int indexBlockLength = 4 /* magic header */
+            + 4 /* index block length */
+            + 4 /* segment metadata length */
+            + 4 /* index entry count */
+            + segmentMetadataLength
+            + indexEntryCount * (8 + 4 + 8); /* messageEntryId + blockPartId + blockOffset */
+
+        final ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
+        try {
+            // header, in the exact order fromStream() reads it back
+            out.writeInt(INDEX_MAGIC_WORD)
+                .writeInt(indexBlockLength)
+                .writeInt(segmentMetadataLength)
+                .writeInt(indexEntryCount);
+
+            // write metadata
+            out.writeBytes(ledgerMetadataByte);
+
+            // write entries, in ascending entry id order
+            for (OffloadIndexEntryImpl entry : this.indexEntries.values()) {
+                out.writeLong(entry.getEntryId())
+                    .writeInt(entry.getPartId())
+                    .writeLong(entry.getOffset());
+            }
+        } catch (RuntimeException e) {
+            // The buffer has not been handed to a stream yet, so nobody else would release it.
+            out.release();
+            throw e;
+        }
+
+        return new ByteBufInputStream(out, true);
+    }
+
+    /**
+     * {@link LedgerMetadata} implementation whose values are copied out of a
+     * {@link LedgerMetadataFormat} message at construction time.
+     */
+    private static class InternalLedgerMetadata implements LedgerMetadata {
+        private final int ensembleSize;
+        private final int writeQuorumSize;
+        private final int ackQuorumSize;
+        private final long lastEntryId;
+        private final long length;
+        private final DataFormats.LedgerMetadataFormat.DigestType digestType;
+        private final long ctime;
+        private final State state;
+        private final Map<String, byte[]> customMetadata = Maps.newHashMap();
+        private final TreeMap<Long, ArrayList<BookieSocketAddress>> ensembles = new TreeMap<>();
+
+        /**
+         * Copy every field of the given message into this object.
+         *
+         * <p>An ensemble member that cannot be turned into a BookieSocketAddress is
+         * logged and left out of its ensemble.
+         */
+        InternalLedgerMetadata(LedgerMetadataFormat format) {
+            ensembleSize = format.getEnsembleSize();
+            writeQuorumSize = format.getQuorumSize();
+            ackQuorumSize = format.getAckQuorumSize();
+            lastEntryId = format.getLastEntryId();
+            length = format.getLength();
+            digestType = format.getDigestType();
+            ctime = format.getCtime();
+            state = format.getState();
+
+            // Iterating an empty list is a no-op, so no count check is needed.
+            format.getCustomMetadataList().forEach(
+                custom -> customMetadata.put(custom.getKey(), custom.getValue().toByteArray()));
+
+            format.getSegmentList().forEach(segment -> {
+                final ArrayList<BookieSocketAddress> bookies = new ArrayList<>();
+                for (String member : segment.getEnsembleMemberList()) {
+                    try {
+                        bookies.add(new BookieSocketAddress(member));
+                    } catch (IOException e) {
+                        log.error("Exception when create BookieSocketAddress. ", e);
+                    }
+                }
+                ensembles.put(segment.getFirstEntryId(), bookies);
+            });
+        }
+
+        @Override
+        public int getEnsembleSize() {
+            return ensembleSize;
+        }
+
+        @Override
+        public int getWriteQuorumSize() {
+            return writeQuorumSize;
+        }
+
+        @Override
+        public int getAckQuorumSize() {
+            return ackQuorumSize;
+        }
+
+        @Override
+        public long getLastEntryId() {
+            return lastEntryId;
+        }
+
+        @Override
+        public long getLength() {
+            return length;
+        }
+
+        /**
+         * Translate the stored protobuf digest type into the client API digest type.
+         *
+         * @throws IllegalArgumentException if the stored digest type has no API counterpart
+         */
+        @Override
+        public DigestType getDigestType() {
+            final DigestType apiType;
+            switch (digestType) {
+                case HMAC:
+                    apiType = DigestType.MAC;
+                    break;
+                case CRC32:
+                    apiType = DigestType.CRC32;
+                    break;
+                case CRC32C:
+                    apiType = DigestType.CRC32C;
+                    break;
+                case DUMMY:
+                    apiType = DigestType.DUMMY;
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unable to convert digest type " + digestType);
+            }
+            return apiType;
+        }
+
+        @Override
+        public long getCtime() {
+            return ctime;
+        }
+
+        @Override
+        public boolean isClosed() {
+            return State.CLOSED == state;
+        }
+
+        @Override
+        public Map<String, byte[]> getCustomMetadata() {
+            return customMetadata;
+        }
+
+        /**
+         * Return the ensemble whose first entry id is the greatest one not above entryId.
+         */
+        @Override
+        public List<BookieSocketAddress> getEnsembleAt(long entryId) {
+            // headMap is exclusive of its bound, hence entryId + 1.
+            final Long firstEntryOfEnsemble = ensembles.headMap(entryId + 1).lastKey();
+            return ensembles.get(firstEntryOfEnsemble);
+        }
+
+        @Override
+        public NavigableMap<Long, ? extends List<BookieSocketAddress>> getAllEnsembles() {
+            return ensembles;
+        }
+    }
+
+    private static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException {
+        LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
+        builder.mergeFrom(bytes);
+        return new InternalLedgerMetadata(builder.build());
+    }
+
+    /**
+     * Populate this index block from its serialized form.
+     * Expected format:
+     *   | index_magic_header | index_block_len | segment_metadata_len |
+     *   | index_entry_count | segment metadata | index entries |
+     *
+     * @param stream the stream holding a serialized index block
+     * @return this index block
+     * @throws IOException if the magic word or a header length is invalid, or the
+     *         stream ends before the whole block has been read
+     */
+    private OffloadIndexBlock fromStream(InputStream stream) throws IOException {
+        DataInputStream dis = new DataInputStream(stream);
+        int magic = dis.readInt();
+        if (magic != INDEX_MAGIC_WORD) {
+            throw new IOException("Invalid MagicWord. read: " + magic + " expected: " + INDEX_MAGIC_WORD);
+        }
+        // The total block length is not needed for parsing but must be consumed.
+        dis.readInt();
+        int segmentMetadataLength = dis.readInt();
+        int indexEntryCount = dis.readInt();
+        if (segmentMetadataLength < 0 || indexEntryCount < 0) {
+            throw new IOException("Invalid index block header. segmentMetadataLength: "
+                + segmentMetadataLength + " indexEntryCount: " + indexEntryCount);
+        }
+
+        byte[] metadataBytes = new byte[segmentMetadataLength];
+        try {
+            // read(byte[]) may return fewer bytes than requested even when more are
+            // still to come; readFully keeps reading until the array is filled.
+            dis.readFully(metadataBytes);
+        } catch (java.io.EOFException e) {
+            log.error("Read ledgerMetadata from bytes failed");
+            throw new IOException("Read ledgerMetadata from bytes failed", e);
+        }
+        this.segmentMetadata = parseLedgerMetadata(metadataBytes);
+
+        for (int i = 0; i < indexEntryCount; i++) {
+            long entryId = dis.readLong();
+            int partId = dis.readInt();
+            long offset = dis.readLong();
+            this.indexEntries.putIfAbsent(entryId, OffloadIndexEntryImpl.of(entryId, partId, offset));
+        }
+
+        return this;
+    }
+
+    /**
+     * @return the magic word that starts every serialized index block
+     */
+    public static int getIndexMagicWord() {
+        return OffloadIndexBlockImpl.INDEX_MAGIC_WORD;
+    }
+
+    /**
+     * Close this index block by recycling it.
+     */
+    @Override
+    public void close() {
+        this.recycle();
+    }
+
+}
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
new file mode 100644
index 0000000000..d74ba93792
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+
+/**
+ * Immutable {@link OffloadIndexEntry} holding an entry id, a block part id and an offset.
+ */
+public class OffloadIndexEntryImpl implements OffloadIndexEntry {
+
+    private final long entryId;
+    private final int partId;
+    private final long offset;
+
+    /**
+     * @param entryId the message entry id
+     * @param partId the block part id
+     * @param offset the offset of the entry
+     */
+    public OffloadIndexEntryImpl(long entryId, int partId, long offset) {
+        this.entryId = entryId;
+        this.partId = partId;
+        this.offset = offset;
+    }
+
+    /**
+     * Static factory equivalent to calling the constructor.
+     *
+     * @param entryId the message entry id
+     * @param partId the block part id
+     * @param offset the offset of the entry
+     * @return a new index entry with the given values
+     */
+    public static OffloadIndexEntryImpl of(long entryId, int partId, long offset) {
+        return new OffloadIndexEntryImpl(entryId, partId, offset);
+    }
+
+    @Override
+    public long getEntryId() {
+        return this.entryId;
+    }
+
+    @Override
+    public int getPartId() {
+        return this.partId;
+    }
+
+    @Override
+    public long getOffset() {
+        return this.offset;
+    }
+}
+
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/OffloadIndexTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/OffloadIndexTest.java
new file mode 100644
index 0000000000..aeecbee686
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/OffloadIndexTest.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlock;
+import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
+import org.apache.pulsar.broker.s3offload.OffloadIndexEntry;
+import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockImpl;
+import org.apache.pulsar.broker.s3offload.impl.OffloadIndexEntryImpl;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class OffloadIndexTest {
+
+    /** Checks that the {@code of} factory stores its arguments and that the getters return them. */
+    @Test
+    public void offloadIndexEntryImplTest() {
+        // entry whose id and offset are both zero
+        OffloadIndexEntryImpl zeroEntry = OffloadIndexEntryImpl.of(0, 2, 0);
+        assertEquals(zeroEntry.getEntryId(), 0L);
+        assertEquals(zeroEntry.getPartId(), 2);
+        assertEquals(zeroEntry.getOffset(), 0L);
+
+        // entry with a non-zero id and offset
+        OffloadIndexEntryImpl nonZeroEntry = OffloadIndexEntryImpl.of(100, 3, 1234);
+        assertEquals(nonZeroEntry.getEntryId(), 100L);
+        assertEquals(nonZeroEntry.getPartId(), 3);
+        assertEquals(nonZeroEntry.getOffset(), 1234L);
+    }
+
+
+    // use mock to setLastEntryId
+    class LedgerMetadataMock extends org.apache.bookkeeper.client.LedgerMetadata {
+        long lastId = 0;
+        public LedgerMetadataMock(int ensembleSize, int writeQuorumSize, int ackQuorumSize, org.apache.bookkeeper.client.BookKeeper.DigestType digestType, byte[] password, Map<String, byte[]> customMetadata, boolean storeSystemtimeAsLedgerCreationTime) {
+            super(ensembleSize, writeQuorumSize, ackQuorumSize, digestType, password, customMetadata, storeSystemtimeAsLedgerCreationTime);
+        }
+
+        @Override
+        public long getLastEntryId(){
+            return  lastId;
+        }
+
+        public void setLastEntryId(long lastId) {
+            this.lastId = lastId;
+        }
+    }
+
+    /**
+     * Builds the metadata fixture: a LedgerMetadataMock (ensemble 3, write quorum 3, ack quorum 2,
+     * CRC32C) with two custom keys, three bookies added via addEnsemble(0, ...), last entry id 5000.
+     */
+    private LedgerMetadata createLedgerMetadata() throws Exception {
+        Map<String, byte[]> customMetadata = Maps.newHashMap();
+        customMetadata.put("key1", "value1".getBytes(UTF_8));
+        customMetadata.put("key7", "value7".getBytes(UTF_8));
+
+        // three bookies, appended in order
+        ArrayList<BookieSocketAddress> ensemble = Lists.newArrayList();
+        ensemble.add(new BookieSocketAddress("127.0.0.1:3181"));
+        ensemble.add(new BookieSocketAddress("127.0.0.2:3181"));
+        ensemble.add(new BookieSocketAddress("127.0.0.3:3181"));
+
+        LedgerMetadataMock mock = new LedgerMetadataMock(3, 3, 2, DigestType.CRC32C,
+            "password".getBytes(UTF_8), customMetadata, false);
+        mock.addEnsemble(0, ensemble);
+        mock.setLastEntryId(5000);
+        return mock;
+    }
+
+    /**
+     * Builds an OffloadIndexBlock through the builder, then verifies its getters, the bytes
+     * produced by toStream(), and that fromStream() reads them back and rejects bad input.
+     */
+    @Test
+    public void offloadIndexBlockImplTest() throws Exception {
+        OffloadIndexBlockBuilder blockBuilder = OffloadIndexBlockBuilder.create();
+        LedgerMetadata metadata = createLedgerMetadata();
+        log.debug("created metadata: {}", metadata.toString());
+
+        blockBuilder.withMetadata(metadata);
+
+        blockBuilder.addBlock(0, 2, 0);
+        blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024);
+        blockBuilder.addBlock(2000, 4, 64 * 1024 * 1024);
+        OffloadIndexBlock indexBlock = blockBuilder.build();
+
+        // verify getEntryCount and getLedgerMetadata
+        assertEquals(indexBlock.getEntryCount(), 3);
+        assertEquals(indexBlock.getLedgerMetadata(), metadata);
+
+        // verify getIndexEntryForEntry, both on exact first-entry ids and on ids in between
+        OffloadIndexEntry entry1 = indexBlock.getIndexEntryForEntry(0);
+        assertEquals(entry1.getEntryId(), 0);
+        assertEquals(entry1.getPartId(), 2);
+        assertEquals(entry1.getOffset(), 0);
+
+        OffloadIndexEntry entry11 = indexBlock.getIndexEntryForEntry(500);
+        assertEquals(entry11, entry1);
+
+        OffloadIndexEntry entry2 = indexBlock.getIndexEntryForEntry(1000);
+        assertEquals(entry2.getEntryId(), 1000);
+        assertEquals(entry2.getPartId(), 3);
+        assertEquals(entry2.getOffset(), 64 * 1024 * 1024);
+
+        OffloadIndexEntry entry22 = indexBlock.getIndexEntryForEntry(1300);
+        assertEquals(entry22, entry2);
+
+        OffloadIndexEntry entry3 = indexBlock.getIndexEntryForEntry(2000);
+        assertEquals(entry3.getEntryId(), 2000);
+        assertEquals(entry3.getPartId(), 4);
+        assertEquals(entry3.getOffset(), 2 * 64 * 1024 * 1024);
+
+        OffloadIndexEntry entry33 = indexBlock.getIndexEntryForEntry(3000);
+        assertEquals(entry33, entry3);
+
+        try {
+            indexBlock.getIndexEntryForEntry(6000);
+            fail("Should throw IndexOutOfBoundsException.");
+        } catch (IndexOutOfBoundsException e) {
+            assertEquals(e.getMessage(), "Entry index: 6000 beyond lastEntryId: 5000");
+        }
+
+        // verify toStream
+        byte[] b = new byte[1024];
+        int readoutLen;
+        try (InputStream out = indexBlock.toStream()) {
+            readoutLen = out.read(b);
+        }
+        ByteBuf wrapper = Unpooled.wrappedBuffer(b);
+        int magic = wrapper.readInt();
+        int indexBlockLength = wrapper.readInt();
+        int segmentMetadataLength = wrapper.readInt();
+        int indexEntryCount = wrapper.readInt();
+
+        // verify header fields
+        assertEquals(magic, OffloadIndexBlockImpl.getIndexMagicWord());
+        assertEquals(indexBlockLength, readoutLen);
+        assertEquals(indexEntryCount, 3);
+
+        // skipBytes, not readBytes(int): the latter returns a new ByteBuf that must be released
+        wrapper.skipBytes(segmentMetadataLength);
+        log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}",
+            magic, indexBlockLength, segmentMetadataLength, indexEntryCount);
+
+        // verify the serialized entries match the ones returned by the index block
+        OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(), wrapper.readLong());
+        OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(), wrapper.readLong());
+        OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), wrapper.readInt(), wrapper.readLong());
+
+        assertEquals(e1.getEntryId(), entry1.getEntryId());
+        assertEquals(e1.getPartId(), entry1.getPartId());
+        assertEquals(e1.getOffset(), entry1.getOffset());
+        assertEquals(e2.getEntryId(), entry2.getEntryId());
+        assertEquals(e2.getPartId(), entry2.getPartId());
+        assertEquals(e2.getOffset(), entry2.getOffset());
+        assertEquals(e3.getEntryId(), entry3.getEntryId());
+        assertEquals(e3.getPartId(), entry3.getPartId());
+        assertEquals(e3.getOffset(), entry3.getOffset());
+        wrapper.release();
+
+        // verify building an OffloadIndexBlock from an InputStream
+        InputStream out2 = indexBlock.toStream();
+        int streamLength = out2.available();
+        out2.mark(0);
+        OffloadIndexBlock indexBlock2 = blockBuilder.fromStream(out2);
+        // 1. verify the metadata read back from the input stream matches the original
+        LedgerMetadata metadata2 = indexBlock2.getLedgerMetadata();
+        log.debug("built metadata: {}", metadata2.toString());
+        assertEquals(metadata2.getAckQuorumSize(), metadata.getAckQuorumSize());
+        assertEquals(metadata2.getEnsembleSize(), metadata.getEnsembleSize());
+        assertEquals(metadata2.getDigestType(), metadata.getDigestType());
+        assertEquals(metadata2.getAllEnsembles().entrySet(), metadata.getAllEnsembles().entrySet());
+        // 2. verify all the entries were read
+        assertEquals(indexBlock2.getEntryCount(), indexBlock.getEntryCount());
+        // 3. verify the stream was consumed to its end
+        assertEquals(out2.read(), -1);
+
+        out2.reset();
+        byte[] streamContent = new byte[streamLength];
+        // an all-zero stream simulates junk data: the header magic word must not match
+        try (InputStream stream3 = new ByteArrayInputStream(streamContent, 0, streamLength)) {
+            blockBuilder.fromStream(stream3);
+            fail("Should throw IOException");
+        } catch (IOException e) {
+            assertTrue(e.getMessage().contains("Invalid MagicWord"));
+        }
+
+        // fill the array from the reset stream, then feed it back one byte short
+        out2.read(streamContent);
+        try (InputStream stream4 =
+                new ByteArrayInputStream(streamContent, 0, streamLength - 1)) {
+            blockBuilder.fromStream(stream4);
+            fail("Should throw EOFException");
+        } catch (java.io.EOFException expected) {
+            // expected: the stream is one byte shorter than the serialized index block
+        }
+
+        out2.close();
+        indexBlock.close();
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services