Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/02/06 19:47:21 UTC

svn commit: r1443120 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java

Author: jukka
Date: Wed Feb  6 18:47:21 2013
New Revision: 1443120

URL: http://svn.apache.org/viewvc?rev=1443120&view=rev
Log:
OAK-593: Segment-based MK

Improved handling of value records
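
As a minimal usage sketch of the reworked API (mirroring the calls in the
updated RecordTest below; the writer and reader instances are not constructed
in this diff and are assumed to be set up as in the test class):

    RecordId stringId = writer.writeString("Hello, World!");
    RecordId streamId = writer.writeStream(new ByteArrayInputStream(data));
    writer.flush();
    InputStream in = reader.readStream(streamId); // read the value back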

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1443120&r1=1443119&r2=1443120&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Feb  6 18:47:21 2013
@@ -32,6 +32,7 @@ import java.util.UUID;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
 
 public class SegmentWriter {
 
@@ -39,7 +40,9 @@ public class SegmentWriter {
 
     static final int BLOCK_SIZE = 1 << 12; // 4kB
 
-    static final int INLINE_LIMIT = 16 * BLOCK_SIZE; // 64kB
+    static final int INLINE_BLOCKS = 16;
+
+    static final int INLINE_SIZE = INLINE_BLOCKS * BLOCK_SIZE; // 64kB
 
     private final SegmentStore store;
 
@@ -159,35 +162,63 @@ public class SegmentWriter {
      * @param length number of bytes in the value
      * @return value record identifier
      */
-    public synchronized RecordId writeValue(
-            byte[] bytes, int offset, int length) {
+    public RecordId writeString(String string) {
+        byte[] data = string.getBytes(Charsets.UTF_8);
         List<RecordId> blockIds = new ArrayList<RecordId>();
-        int remaining = length;
 
-        // First create block segments for a large bulk of the value
-        while (remaining > INLINE_LIMIT) {
-            int len = Math.min(remaining, blockSegmentSize);
-            UUID segmentId = UUID.randomUUID();
-            store.createSegment(segmentId, bytes, offset, len);
-            for (int position = 0; position < len; position += BLOCK_SIZE) {
-                blockIds.add(new RecordId(segmentId, position));
+        int headLength = Math.min(data.length, INLINE_SIZE);
+        writeInlineBlocks(blockIds, data, 0, headLength);
+        if (data.length > headLength) {
+            int offset = headLength;
+            while (offset + INLINE_SIZE <= data.length) {
+                int bulkLength =
+                    Math.min(data.length - offset, blockSegmentSize);
+                writeBulkSegment(blockIds, data, offset, bulkLength);
+                offset += bulkLength;
+            }
+            if (offset < data.length) {
+                writeInlineBlocks(blockIds, data, offset, data.length - offset);
             }
-            offset += len;
-            remaining -= len;
         }
 
-        // Then inline any remaining full blocks
-        while (remaining > BLOCK_SIZE) {
-            blockIds.add(writeBlock(bytes, offset, BLOCK_SIZE));
-            offset += BLOCK_SIZE;
-            remaining -= BLOCK_SIZE;
-        }
+        return writeValueRecord(data.length, blockIds);
+    }
+
+    public RecordId writeStream(InputStream stream) throws IOException {
+        List<RecordId> blockIds = new ArrayList<RecordId>();
 
-        // Finally add the last partial block (if any)
-        if (remaining > 0) {
-            blockIds.add(writeBlock(bytes, offset, remaining));
+        // First read the head of the stream. This covers most small
+        // binaries and the frequently accessed head of larger ones.
+        // The head gets inlined in the current segment.
+        byte[] head = new byte[INLINE_SIZE];
+        int headLength = ByteStreams.read(stream, head, 0, head.length);
+
+        writeInlineBlocks(blockIds, head, 0, headLength);
+        long length = headLength;
+
+        // If the stream filled the full head buffer, it's likely that
+        // the bulk of the data is still to come. Read it in larger
+        // chunks and store them in separate segments.
+        if (headLength == head.length) {
+            byte[] bulk = new byte[blockSegmentSize];
+            int bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+            while (bulkLength > INLINE_SIZE) {
+                writeBulkSegment(blockIds, bulk, 0, bulkLength);
+                length += bulkLength;
+                bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+            }
+            // The tail chunk of the stream is too small to put in a separate
+            // segment, so we inline it as well.
+            if (bulkLength > 0) {
+                writeInlineBlocks(blockIds, bulk, 0, bulkLength);
+                length += bulkLength;
+            }
         }
 
+        return writeValueRecord(length, blockIds);
+    }
+
+    private RecordId writeValueRecord(long length, List<RecordId> blockIds) {
         // Store the list of blocks along with the length of the value
         RecordId listId = writeList(blockIds);
         RecordId valueId = prepare(8, Collections.singleton(listId));
@@ -196,13 +227,26 @@ public class SegmentWriter {
         return valueId;
     }
 
-    public RecordId writeString(String string) {
-        byte[] data = string.getBytes(Charsets.UTF_8);
-        return writeValue(data, 0, data.length);
+    private void writeInlineBlocks(
+            List<RecordId> blockIds, byte[] buffer, int offset, int length) {
+        int begin = offset;
+        int end = offset + length;
+        while (begin + BLOCK_SIZE <= end) {
+            blockIds.add(writeBlock(buffer, begin, BLOCK_SIZE));
+            begin += BLOCK_SIZE;
+        }
+        if (begin < end) {
+            blockIds.add(writeBlock(buffer, begin, end - begin));
+        }
     }
 
-    public RecordId writeStream(InputStream stream) throws IOException {
-        throw new UnsupportedOperationException(); // TODO
+    private void writeBulkSegment(
+            List<RecordId> blockIds, byte[] buffer, int offset, int length) {
+        UUID segmentId = UUID.randomUUID();
+        store.createSegment(segmentId, buffer, offset, length);
+        for (int position = 0; position < length; position += BLOCK_SIZE) {
+            blockIds.add(new RecordId(segmentId, position));
+        }
     }
 
     private void writeRecordId(RecordId id) {

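The new writeStream() logic splits a value into a 64kB head of inlined 4kB
blocks, a middle section stored as separate bulk segments, and a tail that is
inlined again. The following standalone sketch (not part of the commit) only
illustrates that split; the bulk segment size used here is an assumption for
the example, as the actual blockSegmentSize field is not shown in this diff:

    public class ChunkingSketch {
        static final int BLOCK_SIZE = 1 << 12;          // 4kB, as in SegmentWriter
        static final int INLINE_SIZE = 16 * BLOCK_SIZE; // 64kB, as in SegmentWriter
        static final int BULK_SEGMENT_SIZE = 1 << 20;   // 1MB, assumed for illustration

        public static void main(String[] args) {
            long length = 3L * 1024 * 1024 + 12345;       // example: a ~3MB value
            long inlined = Math.min(length, INLINE_SIZE); // head, written as 4kB blocks
            long remaining = length - inlined;
            int bulkSegments = 0;
            // Chunks larger than the inline limit become separate bulk segments.
            while (remaining > INLINE_SIZE) {
                remaining -= Math.min(remaining, BULK_SEGMENT_SIZE);
                bulkSegments++;
            }
            inlined += remaining;                         // the tail is inlined as well
            System.out.println("inlined bytes: " + inlined
                    + ", bulk segments: " + bulkSegments);
        }
    }
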
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java?rev=1443120&r1=1443119&r2=1443120&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java Wed Feb  6 18:47:21 2013
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugin
 import static org.apache.jackrabbit.oak.plugins.segment.ListRecord.LEVEL_SIZE;
 import static org.junit.Assert.assertEquals;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
@@ -105,8 +106,8 @@ public class RecordTest {
         checkRandomStreamRecord(1);
         checkRandomStreamRecord(SegmentWriter.BLOCK_SIZE);
         checkRandomStreamRecord(SegmentWriter.BLOCK_SIZE + 1);
-        checkRandomStreamRecord(SegmentWriter.INLINE_LIMIT);
-        checkRandomStreamRecord(SegmentWriter.INLINE_LIMIT + 1);
+        checkRandomStreamRecord(SegmentWriter.INLINE_SIZE);
+        checkRandomStreamRecord(SegmentWriter.INLINE_SIZE + 1);
         checkRandomStreamRecord(store.getMaxSegmentSize());
         checkRandomStreamRecord(store.getMaxSegmentSize() + 1);
         checkRandomStreamRecord(store.getMaxSegmentSize() * 2);
@@ -117,7 +118,7 @@ public class RecordTest {
         byte[] source = new byte[size];
         random.nextBytes(source);
 
-        RecordId valueId = writer.writeValue(source, 0, size);
+        RecordId valueId = writer.writeStream(new ByteArrayInputStream(source));
         writer.flush();
 
         InputStream stream = reader.readStream(valueId);
@@ -144,7 +145,7 @@ public class RecordTest {
         RecordId hello = writer.writeString("Hello, World!");
 
         StringBuilder builder = new StringBuilder();
-        for (int i = 0; i < 1000; i++) {
+        for (int i = 0; i < 100000; i++) {
             builder.append((char) ('0' + i % 10));
         }
         RecordId large = writer.writeString(builder.toString());