You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/03 07:54:57 UTC
[incubator-pulsar] branch master updated: PIP-17: provide
DataBlockHeader and implementation (#1669)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d1e719c PIP-17: provide DataBlockHeader and implementation (#1669)
d1e719c is described below
commit d1e719c9dd0190b5d28d3182fa9e2394a29476d7
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Thu May 3 15:54:55 2018 +0800
PIP-17: provide DataBlockHeader and implementation (#1669)
### Motivation
Provide a DataBlockHeader implementation. A block header has the format:
`[ magic_word -- int ][ block_len -- int ][ first_entry_id -- long][alignment padding]`
Master issue #1511
### Modifications
Add the DataBlockHeader interface, its implementation DataBlockHeaderImpl, and a unit test.
### Result
Unit tests pass.
---
.../pulsar/broker/s3offload/DataBlockHeader.java | 57 ++++++++++
.../broker/s3offload/impl/DataBlockHeaderImpl.java | 124 +++++++++++++++++++++
.../pulsar/s3offload/DataBlockHeaderTest.java | 83 ++++++++++++++
3 files changed, 264 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
new file mode 100644
index 0000000..cae653b
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload;
+
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * The header stored at the start of each data block in offloaded (tiered) storage.
+ *
+ * <p>Currently, it is in format:
+ * [ magic_word -- int ][ block_len -- int ][ first_entry_id -- long ][ padding ]
+ *
+ * <p>The fields occupy 4 + 4 + 8 = 16 bytes; zero padding extends the header to
+ * 128 bytes in total.
+ */
+@Unstable
+public interface DataBlockHeader {
+
+    /**
+     * Get the length of the block in bytes, including the header.
+     */
+    int getBlockLength();
+
+    /**
+     * Get the entry id of the first message stored in this data block.
+     */
+    long getFirstEntryId();
+
+    /**
+     * Get the size in bytes of this header as serialized by {@link #toStream()},
+     * padding included.
+     */
+    int getHeaderSize();
+
+    /**
+     * Get the content of the data block header as an InputStream, serialized in the
+     * format described above. Callers should close the returned stream when done, since
+     * an implementation may hold resources (e.g. a pooled buffer) until it is closed.
+     */
+    InputStream toStream();
+}
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
new file mode 100644
index 0000000..5413cf7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.s3offload.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+
+/**
+ * Default implementation of {@link DataBlockHeader}, the header stored at the start of each
+ * data block in offloaded (tiered) storage.
+ *
+ * <p>The serialized form always occupies {@code HEADER_MAX_SIZE} (128) bytes:
+ * {@code [ magic_word -- int ][ block_len -- int ][ first_entry_id -- long ][ zero padding ]}.
+ * Instances are immutable.
+ */
+public class DataBlockHeaderImpl implements DataBlockHeader {
+    // Magic word for a data block.
+    // It is a sequence of bytes used to identify the start of a block.
+    private static final int MAGIC_WORD = 0xDBDBDBDB;
+    // This is bigger than the header size, leaving some room for alignment and future
+    // enhancement. The payload uses this as its start offset.
+    private static final int HEADER_MAX_SIZE = 128;
+    // The number of bytes occupied by the encoded header fields.
+    private static final int HEADER_SIZE = 4 /* magic word */
+        + 4 /* block length */
+        + 8 /* first entry id */;
+    // Zero bytes appended after the fields so the header fills HEADER_MAX_SIZE bytes.
+    // Only ever read from, never written to.
+    private static final byte[] PADDING = new byte[HEADER_MAX_SIZE - HEADER_SIZE];
+
+    /**
+     * Create a header for a block of {@code blockLength} bytes whose first entry is
+     * {@code firstEntryId}.
+     */
+    public static DataBlockHeaderImpl of(int blockLength, long firstEntryId) {
+        return new DataBlockHeaderImpl(blockLength, firstEntryId);
+    }
+
+    /**
+     * Construct a DataBlockHeader from an InputStream, consuming exactly
+     * {@link #getDataStartOffset()} bytes (fields plus padding).
+     *
+     * @param stream stream positioned at the start of a serialized data block header
+     * @return the decoded header
+     * @throws EOFException if the stream ends before a complete header has been read
+     * @throws IOException if the magic word does not match, or reading the stream fails
+     */
+    public static DataBlockHeader fromStream(InputStream stream) throws IOException {
+        // Deliberately not closed: closing a DataInputStream closes the caller's stream.
+        DataInputStream dis = new DataInputStream(stream);
+        int magic = dis.readInt();
+        if (magic != MAGIC_WORD) {
+            throw new IOException("Data block header magic word not match. read: 0x"
+                + Integer.toHexString(magic) + " expected: 0x" + Integer.toHexString(MAGIC_WORD));
+        }
+
+        int blockLen = dis.readInt();
+        long firstEntryId = dis.readLong();
+
+        // Consume the padding. Unlike skipBytes(), which may legally skip fewer bytes even when
+        // the stream is not exhausted, readFully() keeps reading until every padding byte has
+        // been consumed, and throws EOFException only if the stream really is truncated.
+        dis.readFully(new byte[PADDING.length]);
+
+        return new DataBlockHeaderImpl(blockLen, firstEntryId);
+    }
+
+    private final int blockLength;
+    private final long firstEntryId;
+
+    /**
+     * Get the magic word that every serialized data block header starts with.
+     */
+    public static int getBlockMagicWord() {
+        return MAGIC_WORD;
+    }
+
+    /**
+     * Get the offset within a data block at which the payload starts, i.e. the full
+     * serialized header size including padding.
+     */
+    public static int getDataStartOffset() {
+        return HEADER_MAX_SIZE;
+    }
+
+    @Override
+    public int getBlockLength() {
+        return this.blockLength;
+    }
+
+    @Override
+    public long getFirstEntryId() {
+        return this.firstEntryId;
+    }
+
+    @Override
+    public int getHeaderSize() {
+        return HEADER_MAX_SIZE;
+    }
+
+    public DataBlockHeaderImpl(int blockLength, long firstEntryId) {
+        this.blockLength = blockLength;
+        this.firstEntryId = firstEntryId;
+    }
+
+    /**
+     * Get the content of the data block header as InputStream.
+     * Read out in format:
+     * [ magic_word -- int ][ block_len -- int ][ first_entry_id -- long] [padding zeros]
+     *
+     * <p>The returned stream is backed by a pooled buffer which is released when the stream
+     * is closed, so callers must close it.
+     */
+    @Override
+    public InputStream toStream() {
+        ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE);
+        out.writeInt(MAGIC_WORD)
+            .writeInt(blockLength)
+            .writeLong(firstEntryId)
+            .writeBytes(PADDING);
+
+        // true means the input stream will release the ByteBuf on close
+        return new ByteBufInputStream(out, true);
+    }
+}
+
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java
new file mode 100644
index 0000000..9c87455
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/s3offload/DataBlockHeaderTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.s3offload;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.s3offload.DataBlockHeader;
+import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class DataBlockHeaderTest {
+
+    /**
+     * Round-trips a header through toStream/fromStream, then checks that junk data and a
+     * truncated header are rejected with the expected exceptions.
+     */
+    @Test
+    public void dataBlockHeaderImplTest() throws Exception {
+        int blockLength = 1024 * 1024;
+        long firstEntryId = 3333L;
+        int headerSize = DataBlockHeaderImpl.getDataStartOffset();
+
+        DataBlockHeaderImpl dataBlockHeader = DataBlockHeaderImpl.of(blockLength, firstEntryId);
+
+        // verify get methods
+        assertEquals(DataBlockHeaderImpl.getBlockMagicWord(), 0xDBDBDBDB);
+        assertEquals(dataBlockHeader.getBlockLength(), blockLength);
+        assertEquals(dataBlockHeader.getFirstEntryId(), firstEntryId);
+        assertEquals(dataBlockHeader.getHeaderSize(), headerSize);
+
+        // Serialized header bytes, captured from toStream() for the truncation check below.
+        byte[] streamContent = new byte[headerSize];
+
+        // verify toStream and fromStream; closing the stream releases its backing buffer
+        try (InputStream stream = dataBlockHeader.toStream()) {
+            // readlimit must cover everything read before reset()
+            stream.mark(headerSize);
+            DataBlockHeader rebuild = DataBlockHeaderImpl.fromStream(stream);
+            assertEquals(rebuild.getBlockLength(), blockLength);
+            assertEquals(rebuild.getFirstEntryId(), firstEntryId);
+            // verify InputStream reach end: fromStream must consume the padding too
+            assertEquals(stream.read(), -1);
+
+            stream.reset();
+            assertEquals(stream.read(streamContent), headerSize);
+        }
+
+        // stream with all 0, simulate junk data, should throw exception for header magic not match.
+        byte[] junk = new byte[headerSize];
+        try (InputStream stream2 = new ByteArrayInputStream(junk)) {
+            DataBlockHeaderImpl.fromStream(stream2);
+            fail("Should throw IOException");
+        } catch (IOException e) {
+            assertTrue(e.getMessage().contains("Data block header magic word not match"));
+        }
+
+        // simulate read header too small, throw EOFException.
+        try (InputStream stream3 = new ByteArrayInputStream(streamContent, 0, headerSize - 1)) {
+            DataBlockHeaderImpl.fromStream(stream3);
+            fail("Should throw EOFException");
+        } catch (java.io.EOFException e) {
+            // expected: the last padding byte is missing
+        }
+    }
+
+}
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.