You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/03 07:54:57 UTC

[incubator-pulsar] branch master updated: PIP-17: provide DataBlockHeader and implementation (#1669)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d1e719c  PIP-17: provide DataBlockHeader and implementation (#1669)
d1e719c is described below

commit d1e719c9dd0190b5d28d3182fa9e2394a29476d7
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Thu May 3 15:54:55 2018 +0800

    PIP-17: provide DataBlockHeader and implementation (#1669)
    
    ### Motivation
    Provide the DataBlockHeader interface and its implementation. A block header has the format:
    `[ magic_word -- int ][ block_len -- int ][ first_entry_id  -- long][alignment padding]`
    
    Master issue #1511
    
    ### Modifications
    
    Add the DataBlockHeader interface, its implementation DataBlockHeaderImpl, and a unit test for it.
    
    ### Result
    
    Unit tests pass.
---
 .../pulsar/broker/s3offload/DataBlockHeader.java   |  57 ++++++++++
 .../broker/s3offload/impl/DataBlockHeaderImpl.java | 124 +++++++++++++++++++++
 .../pulsar/s3offload/DataBlockHeaderTest.java      |  83 ++++++++++++++
 3 files changed, 264 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
new file mode 100644
index 0000000..cae653b
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import java.io.InputStream;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * The data block header stored at the start of each data block in offloaded storage.
+ *
+ * <p>Currently, it is in format:
+ * <pre>
+ * [ magic_word -- int ][ block_len -- int ][ first_entry_id -- long ][ padding ]
+ * </pre>
+ * The fields take 4 + 4 + 8 = 16 bytes and are padded to 128 bytes in total.
+ */
+@Unstable
+public interface DataBlockHeader {
+
+    /**
+     * Get the length of the block in bytes, including the header.
+     *
+     * @return the block length in bytes
+     */
+    int getBlockLength();
+
+    /**
+     * Get the entry id of the first message stored in this data block.
+     *
+     * @return the first entry id
+     */
+    long getFirstEntryId();
+
+    /**
+     * Get the size of this header in bytes, padding included.
+     *
+     * @return the header size in bytes
+     */
+    int getHeaderSize();
+
+    /**
+     * Get the content of the data block header as an InputStream, in the current format.
+     *
+     * <p>The returned stream may hold resources until it is closed, so callers should close it.
+     *
+     * @return a stream yielding the serialized header
+     */
+    InputStream toStream();
+}
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
new file mode 100644
index 0000000..5413cf7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+
+/**
+ * Default implementation of {@link DataBlockHeader}: the fixed-size header stored at the start of
+ * each data block in offloaded storage.
+ *
+ * <p>Serialized layout (big-endian, as written by a Netty {@link ByteBuf} and read back with a
+ * {@link DataInputStream}):
+ * <pre>
+ * [ magic_word -- int ][ block_len -- int ][ first_entry_id -- long ][ zero padding ]
+ * </pre>
+ * The fields take 16 bytes and are zero-padded to 128 bytes, so the block payload always starts
+ * at {@link #getDataStartOffset()}.
+ *
+ * <p>Instances are immutable.
+ */
+public class DataBlockHeaderImpl implements DataBlockHeader {
+    // Magic word for data blocks: a fixed bit pattern used to recognize the start of a block.
+    private static final int MAGIC_WORD = 0xDBDBDBDB;
+    // Bigger than the header fields need, leaving room for alignment and future enhancement.
+    // The block payload starts at this offset.
+    private static final int HEADER_MAX_SIZE = 128;
+    // The size of the serialized header fields, excluding padding.
+    private static final int HEADER_SIZE = 4 /* magic word */
+        + 4 /* block length */
+        + 8 /* first entry id */;
+    // Zeros written after the fields to pad the header to HEADER_MAX_SIZE; never modified.
+    private static final byte[] PADDING = new byte[HEADER_MAX_SIZE - HEADER_SIZE];
+
+    private final int blockLength;
+    private final long firstEntryId;
+
+    /**
+     * Create a header.
+     *
+     * @param blockLength length of the data block in bytes, including this header
+     * @param firstEntryId entry id of the first message stored in the data block
+     */
+    public DataBlockHeaderImpl(int blockLength, long firstEntryId) {
+        this.blockLength = blockLength;
+        this.firstEntryId = firstEntryId;
+    }
+
+    /**
+     * Static factory equivalent to {@link #DataBlockHeaderImpl(int, long)}.
+     *
+     * @param blockLength length of the data block in bytes, including this header
+     * @param firstEntryId entry id of the first message stored in the data block
+     * @return a new header
+     */
+    public static DataBlockHeaderImpl of(int blockLength, long firstEntryId) {
+        return new DataBlockHeaderImpl(blockLength, firstEntryId);
+    }
+
+    /**
+     * Read a header in the format produced by {@link #toStream()}.
+     *
+     * <p>Consumes exactly {@link #getDataStartOffset()} bytes (fields plus padding), leaving
+     * {@code stream} positioned at the start of the block payload. The stream is not closed.
+     *
+     * @param stream stream positioned at the start of a data block header
+     * @return the decoded header
+     * @throws EOFException if the stream ends before the whole header has been read
+     * @throws IOException if the magic word does not match, or reading from the stream fails
+     */
+    public static DataBlockHeader fromStream(InputStream stream) throws IOException {
+        // DataInputStream does no read-ahead, so only the header bytes are taken from 'stream'.
+        // It is deliberately not closed: that would close the caller's stream.
+        DataInputStream dis = new DataInputStream(stream);
+        int magic = dis.readInt();
+        if (magic != MAGIC_WORD) {
+            throw new IOException(String.format(
+                "Data block header magic word not match. read: 0x%08X expected: 0x%08X", magic, MAGIC_WORD));
+        }
+
+        int blockLen = dis.readInt();
+        long firstEntryId = dis.readLong();
+
+        // Consume the padding. readFully throws EOFException on a short stream, whereas skipBytes
+        // may skip fewer bytes than requested even when the stream has not ended.
+        dis.readFully(new byte[PADDING.length]);
+
+        return new DataBlockHeaderImpl(blockLen, firstEntryId);
+    }
+
+    /**
+     * Get the magic word that every serialized data block header starts with.
+     *
+     * @return the magic word
+     */
+    public static int getBlockMagicWord() {
+        return MAGIC_WORD;
+    }
+
+    /**
+     * Get the offset of the block payload, i.e. the serialized header size including padding.
+     *
+     * @return the payload start offset in bytes
+     */
+    public static int getDataStartOffset() {
+        return HEADER_MAX_SIZE;
+    }
+
+    @Override
+    public int getBlockLength() {
+        return this.blockLength;
+    }
+
+    @Override
+    public long getFirstEntryId() {
+        return this.firstEntryId;
+    }
+
+    /**
+     * Get the padded header size, which equals {@link #getDataStartOffset()}.
+     *
+     * @return the header size in bytes
+     */
+    @Override
+    public int getHeaderSize() {
+        return HEADER_MAX_SIZE;
+    }
+
+    /**
+     * Get the content of the data block header as InputStream, in format:
+     *   [ magic_word -- int ][ block_len -- int ][ first_entry_id -- long ][ padding zeros ]
+     * for 128 bytes in total.
+     *
+     * <p>The stream is backed by a pooled Netty buffer that is only released when the stream is
+     * closed, so callers must close it.
+     *
+     * @return a stream yielding the serialized header
+     */
+    @Override
+    public InputStream toStream() {
+        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
+        out.writeInt(MAGIC_WORD)
+            .writeInt(blockLength)
+            .writeLong(firstEntryId)
+            .writeBytes(PADDING);
+
+        // true means the input stream will release the ByteBuf on close
+        return new ByteBufInputStream(out, true);
+    }
+}
+
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java
new file mode 100644
index 0000000..9c87455
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link DataBlockHeaderImpl}: accessors, the {@code toStream}/{@code fromStream} round trip,
+ * and rejection of junk or truncated input.
+ */
+public class DataBlockHeaderTest {
+
+    @Test
+    public void dataBlockHeaderImplTest() throws Exception {
+        int blockLength = 1024 * 1024;
+        long firstEntryId = 3333L;
+        int headerSize = DataBlockHeaderImpl.getDataStartOffset();
+
+        DataBlockHeaderImpl dataBlockHeader = DataBlockHeaderImpl.of(blockLength, firstEntryId);
+
+        // verify get methods
+        assertEquals(DataBlockHeaderImpl.getBlockMagicWord(), 0xDBDBDBDB);
+        assertEquals(dataBlockHeader.getBlockLength(), blockLength);
+        assertEquals(dataBlockHeader.getFirstEntryId(), firstEntryId);
+        assertEquals(dataBlockHeader.getHeaderSize(), headerSize);
+
+        // verify toStream and fromStream round trip
+        try (InputStream stream = dataBlockHeader.toStream()) {
+            DataBlockHeader rebuild = DataBlockHeaderImpl.fromStream(stream);
+            assertEquals(rebuild.getBlockLength(), blockLength);
+            assertEquals(rebuild.getFirstEntryId(), firstEntryId);
+            // fromStream must consume the whole serialized header, padding included
+            assertEquals(stream.read(), -1);
+        }
+
+        // capture the serialized bytes; toStream must produce exactly headerSize bytes
+        byte[] serialized = new byte[headerSize];
+        try (InputStream stream = dataBlockHeader.toStream()) {
+            new DataInputStream(stream).readFully(serialized);
+            assertEquals(stream.read(), -1);
+        }
+
+        // stream with all 0, simulate junk data, should throw exception for header magic not match.
+        try (InputStream junk = new ByteArrayInputStream(new byte[headerSize])) {
+            DataBlockHeaderImpl.fromStream(junk);
+            fail("Should throw IOException");
+        } catch (IOException e) {
+            assertTrue(e.getMessage().contains("Data block header magic word not match"));
+        }
+
+        // a valid header cut one byte short must be rejected with EOFException.
+        try (InputStream truncated = new ByteArrayInputStream(serialized, 0, headerSize - 1)) {
+            DataBlockHeaderImpl.fromStream(truncated);
+            fail("Should throw EOFException");
+        } catch (EOFException e) {
+            // expected
+        }
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.