You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/02/06 19:47:21 UTC
svn commit: r1443120 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java
Author: jukka
Date: Wed Feb 6 18:47:21 2013
New Revision: 1443120
URL: http://svn.apache.org/viewvc?rev=1443120&view=rev
Log:
OAK-593: Segment-based MK
Improved handling of value records
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1443120&r1=1443119&r2=1443120&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Feb 6 18:47:21 2013
@@ -32,6 +32,7 @@ import java.util.UUID;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
public class SegmentWriter {
@@ -39,7 +40,9 @@ public class SegmentWriter {
static final int BLOCK_SIZE = 1 << 12; // 4kB
- static final int INLINE_LIMIT = 16 * BLOCK_SIZE; // 64kB
+ static final int INLINE_BLOCKS = 16;
+
+ static final int INLINE_SIZE = INLINE_BLOCKS * BLOCK_SIZE; // 64kB
private final SegmentStore store;
@@ -159,35 +162,63 @@ public class SegmentWriter {
* @param length number of bytes in the value
* @return value record identifier
*/
- public synchronized RecordId writeValue(
- byte[] bytes, int offset, int length) {
+ public RecordId writeString(String string) {
+ byte[] data = string.getBytes(Charsets.UTF_8);
List<RecordId> blockIds = new ArrayList<RecordId>();
- int remaining = length;
- // First create block segments for a large bulk of the value
- while (remaining > INLINE_LIMIT) {
- int len = Math.min(remaining, blockSegmentSize);
- UUID segmentId = UUID.randomUUID();
- store.createSegment(segmentId, bytes, offset, len);
- for (int position = 0; position < len; position += BLOCK_SIZE) {
- blockIds.add(new RecordId(segmentId, position));
+ int headLength = Math.min(data.length, INLINE_SIZE);
+ writeInlineBlocks(blockIds, data, 0, headLength);
+ if (data.length > headLength) {
+ int offset = headLength;
+ while (offset + INLINE_SIZE <= data.length) {
+ int bulkLength =
+ Math.min(data.length - offset, blockSegmentSize);
+ writeBulkSegment(blockIds, data, offset, bulkLength);
+ offset += bulkLength;
+ }
+ if (offset < data.length) {
+ writeInlineBlocks(blockIds, data, offset, data.length - offset);
}
- offset += len;
- remaining -= len;
}
- // Then inline any remaining full blocks
- while (remaining > BLOCK_SIZE) {
- blockIds.add(writeBlock(bytes, offset, BLOCK_SIZE));
- offset += BLOCK_SIZE;
- remaining -= BLOCK_SIZE;
- }
+ return writeValueRecord(data.length, blockIds);
+ }
+
+ public RecordId writeStream(InputStream stream) throws IOException {
+ List<RecordId> blockIds = new ArrayList<RecordId>();
- // Finally add the last partial block (if any)
- if (remaining > 0) {
- blockIds.add(writeBlock(bytes, offset, remaining));
+ // First read the head of the stream. This covers most small
+ // binaries and the frequently accessed head of larger ones.
+ // The head gets inlined in the current segment.
+ byte[] head = new byte[INLINE_SIZE];
+ int headLength = ByteStreams.read(stream, head, 0, head.length);
+
+ writeInlineBlocks(blockIds, head, 0, headLength);
+ long length = headLength;
+
+ // If the stream filled the full head buffer, it's likely that
+ // the bulk of the data is still to come. Read it in larger
+ // chunks and save in separate segments.
+ if (headLength == head.length) {
+ byte[] bulk = new byte[blockSegmentSize];
+ int bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+ while (bulkLength > INLINE_SIZE) {
+ writeBulkSegment(blockIds, bulk, 0, bulkLength);
+ length += bulkLength;
+ bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+ }
+ // The tail chunk of the stream is too small to put in a separate
+ // segment, so we inline also it.
+ if (bulkLength > 0) {
+ writeInlineBlocks(blockIds, bulk, 0, bulkLength);
+ length += bulkLength;
+ }
}
+ return writeValueRecord(length, blockIds);
+ }
+
+ private RecordId writeValueRecord(long length, List<RecordId> blockIds) {
// Store the list of blocks along with the length of the value
RecordId listId = writeList(blockIds);
RecordId valueId = prepare(8, Collections.singleton(listId));
@@ -196,13 +227,26 @@ public class SegmentWriter {
return valueId;
}
- public RecordId writeString(String string) {
- byte[] data = string.getBytes(Charsets.UTF_8);
- return writeValue(data, 0, data.length);
+ private void writeInlineBlocks(
+ List<RecordId> blockIds, byte[] buffer, int offset, int length) {
+ int begin = offset;
+ int end = offset + length;
+ while (begin + BLOCK_SIZE <= end) {
+ blockIds.add(writeBlock(buffer, begin, BLOCK_SIZE));
+ begin += BLOCK_SIZE;
+ }
+ if (begin < end) {
+ blockIds.add(writeBlock(buffer, begin, end - begin));
+ }
}
- public RecordId writeStream(InputStream stream) throws IOException {
- throw new UnsupportedOperationException(); // TODO
+ private void writeBulkSegment(
+ List<RecordId> blockIds, byte[] buffer, int offset, int length) {
+ UUID segmentId = UUID.randomUUID();
+ store.createSegment(segmentId, buffer, offset, length);
+ for (int position = 0; position < length; position += BLOCK_SIZE) {
+ blockIds.add(new RecordId(segmentId, position));
+ }
}
private void writeRecordId(RecordId id) {
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java?rev=1443120&r1=1443119&r2=1443120&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java Wed Feb 6 18:47:21 2013
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugin
import static org.apache.jackrabbit.oak.plugins.segment.ListRecord.LEVEL_SIZE;
import static org.junit.Assert.assertEquals;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
@@ -105,8 +106,8 @@ public class RecordTest {
checkRandomStreamRecord(1);
checkRandomStreamRecord(SegmentWriter.BLOCK_SIZE);
checkRandomStreamRecord(SegmentWriter.BLOCK_SIZE + 1);
- checkRandomStreamRecord(SegmentWriter.INLINE_LIMIT);
- checkRandomStreamRecord(SegmentWriter.INLINE_LIMIT + 1);
+ checkRandomStreamRecord(SegmentWriter.INLINE_SIZE);
+ checkRandomStreamRecord(SegmentWriter.INLINE_SIZE + 1);
checkRandomStreamRecord(store.getMaxSegmentSize());
checkRandomStreamRecord(store.getMaxSegmentSize() + 1);
checkRandomStreamRecord(store.getMaxSegmentSize() * 2);
@@ -117,7 +118,7 @@ public class RecordTest {
byte[] source = new byte[size];
random.nextBytes(source);
- RecordId valueId = writer.writeValue(source, 0, size);
+ RecordId valueId = writer.writeStream(new ByteArrayInputStream(source));
writer.flush();
InputStream stream = reader.readStream(valueId);
@@ -144,7 +145,7 @@ public class RecordTest {
RecordId hello = writer.writeString("Hello, World!");
StringBuilder builder = new StringBuilder();
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 100000; i++) {
builder.append((char) ('0' + i % 10));
}
RecordId large = writer.writeString(builder.toString());