You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/02/14 22:05:56 UTC

svn commit: r1446326 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java

Author: jukka
Date: Thu Feb 14 21:05:56 2013
New Revision: 1446326

URL: http://svn.apache.org/r1446326
Log:
OAK-593: Segment-based MK

Store large binaries entirely in separate bulk segments to make normal segments more useful for caching.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1446326&r1=1446325&r2=1446326&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Thu Feb 14 21:05:56 2013
@@ -53,10 +53,6 @@ public class SegmentWriter {
 
     static final int BLOCK_SIZE = 1 << 12; // 4kB
 
-    static final int INLINE_BLOCKS = 16;
-
-    static final int INLINE_SIZE = INLINE_BLOCKS * BLOCK_SIZE; // 64kB
-
     private final SegmentStore store;
 
     private final int blocksPerSegment;
@@ -137,28 +133,6 @@ public class SegmentWriter {
         buffer.putInt(index << 24 | id.getOffset());
     }
 
-    private void writeInlineBlocks(
-            List<RecordId> blockIds, byte[] buffer, int offset, int length) {
-        int begin = offset;
-        int end = offset + length;
-        while (begin + BLOCK_SIZE <= end) {
-            blockIds.add(writeBlock(buffer, begin, BLOCK_SIZE));
-            begin += BLOCK_SIZE;
-        }
-        if (begin < end) {
-            blockIds.add(writeBlock(buffer, begin, end - begin));
-        }
-    }
-
-    private void writeBulkSegment(
-            List<RecordId> blockIds, byte[] buffer, int offset, int length) {
-        UUID segmentId = UUID.randomUUID();
-        store.createSegment(segmentId, buffer, offset, length);
-        for (int position = 0; position < length; position += BLOCK_SIZE) {
-            blockIds.add(new RecordId(segmentId, position));
-        }
-    }
-
     private synchronized RecordId writeListBucket(List<RecordId> bucket) {
         RecordId bucketId = prepare(0, bucket);
         for (RecordId id : bucket) {
@@ -355,49 +329,7 @@ public class SegmentWriter {
         RecordId id = SegmentStream.getRecordIdIfAvailable(stream);
         if (id == null) {
             try {
-                List<RecordId> blockIds = new ArrayList<RecordId>();
-
-                // First read the head of the stream. This covers most small
-                // values and the frequently accessed head of larger ones.
-                // The head gets inlined in the current segment.
-                byte[] head = new byte[INLINE_SIZE];
-                int headLength = ByteStreams.read(stream, head, 0, head.length);
-
-                if (headLength < 0x80) {
-                    id = prepare(1 + headLength);
-                    buffer.put((byte) headLength);
-                    buffer.put(head, 0, headLength);
-                } else if (headLength - 0x80 < 0x4000) {
-                    id = prepare(2 + headLength);
-                    buffer.putShort((short) ((headLength - 0x80) | 0x8000));
-                    buffer.put(head, 0, headLength);
-                } else {
-                    writeInlineBlocks(blockIds, head, 0, headLength);
-                    long length = headLength;
-
-                    // If the stream filled the full head buffer, it's likely
-                    // that the bulk of the data is still to come. Read it
-                    // in larger chunks and save in separate segments.
-                    if (headLength == head.length) {
-                        byte[] bulk = new byte[blockSegmentSize];
-                        int bulkLength = ByteStreams.read(
-                                stream, bulk, 0, bulk.length);
-                        while (bulkLength > INLINE_SIZE) {
-                            writeBulkSegment(blockIds, bulk, 0, bulkLength);
-                            length += bulkLength;
-                            bulkLength = ByteStreams.read(
-                                    stream, bulk, 0, bulk.length);
-                        }
-                        // The tail chunk of the stream is too small to put in
-                        // a separate segment, so we inline also it.
-                        if (bulkLength > 0) {
-                            writeInlineBlocks(blockIds, bulk, 0, bulkLength);
-                            length += bulkLength;
-                        }
-                    }
-
-                    id = writeValueRecord(length, writeList(blockIds));
-                }
+                id = internalWriteStream(stream);
             } finally {
                 stream.close();
             }
@@ -405,6 +337,50 @@ public class SegmentWriter {
         return id;
     }
 
+    private RecordId internalWriteStream(InputStream stream)
+            throws IOException {
+        // First read the head of the stream. A value shorter than 0x4080
+        // bytes fits entirely in this buffer and is inlined in the current
+        // segment; anything longer is stored in bulk segments (see below).
+        byte[] head = new byte[0x4080];
+        int headLength = ByteStreams.read(stream, head, 0, head.length);
+
+        if (headLength < 0x80) {
+            RecordId id = prepare(1 + headLength);
+            buffer.put((byte) headLength);
+            buffer.put(head, 0, headLength);
+            return id;
+        } else if (headLength < 0x4080) {
+            RecordId id = prepare(2 + headLength);
+            buffer.putShort((short) ((headLength - 0x80) | 0x8000));
+            buffer.put(head, 0, headLength);
+            return id;
+        } else {
+            // The stream filled the whole head buffer, so the value is too
+            // long for the inline encodings above. Store all of it, starting
+            // with the already-read head, in separate bulk segments.
+
+            long length = 0;
+            List<RecordId> blockIds = new ArrayList<RecordId>();
+
+            byte[] bulk = new byte[blockSegmentSize];
+            System.arraycopy(head, 0, bulk, 0, headLength);
+            int bulkLength = headLength + ByteStreams.read(
+                    stream, bulk, headLength, bulk.length - headLength);
+            while (bulkLength > 0) {
+                UUID segmentId = UUID.randomUUID();
+                store.createSegment(segmentId, bulk, 0, bulkLength);
+                for (int pos = 0; pos < bulkLength; pos += BLOCK_SIZE) {
+                    blockIds.add(new RecordId(segmentId, pos));
+                }
+                length += bulkLength;
+                bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+            }
+
+            return writeValueRecord(length, writeList(blockIds));
+        }
+    }
+
     private RecordId writeProperty(PropertyState state) {
         Type<?> type = state.getType();
         int count = state.count();

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java?rev=1446326&r1=1446325&r2=1446326&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java Thu Feb 14 21:05:56 2013
@@ -115,10 +115,12 @@ public class RecordTest {
     public void testStreamRecord() throws IOException {
         checkRandomStreamRecord(0);
         checkRandomStreamRecord(1);
+        checkRandomStreamRecord(0x79);
+        checkRandomStreamRecord(0x80);
+        checkRandomStreamRecord(0x4079);
+        checkRandomStreamRecord(0x4080);
         checkRandomStreamRecord(SegmentWriter.BLOCK_SIZE);
         checkRandomStreamRecord(SegmentWriter.BLOCK_SIZE + 1);
-        checkRandomStreamRecord(SegmentWriter.INLINE_SIZE);
-        checkRandomStreamRecord(SegmentWriter.INLINE_SIZE + 1);
         checkRandomStreamRecord(store.getMaxSegmentSize());
         checkRandomStreamRecord(store.getMaxSegmentSize() + 1);
         checkRandomStreamRecord(store.getMaxSegmentSize() * 2);