You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/11/06 21:08:44 UTC
svn commit: r1539450 - in
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment:
Segment.java SegmentStore.java SegmentWriter.java file/FileStore.java
memory/MemoryStore.java mongo/MongoStore.java
Author: jukka
Date: Wed Nov 6 20:08:43 2013
New Revision: 1539450
URL: http://svn.apache.org/r1539450
Log:
OAK-1036: SegmentMK: Auto-flushing SegmentNodeBuilder
Inline the referenced segment ids into each new segment directly in SegmentWriter, so that SegmentStore.writeSegment no longer takes them as a separate argument
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Wed Nov 6 20:08:43 2013
@@ -24,7 +24,6 @@ import static org.apache.jackrabbit.oak.
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -103,25 +102,6 @@ public class Segment {
this.data = checkNotNull(data);
}
- public Segment(
- SegmentStore store, UUID uuid, Collection<UUID> refids,
- byte[] buffer, int offset, int length) {
- this.store = checkNotNull(store);
- this.uuid = checkNotNull(uuid);
-
- checkNotNull(refids);
- checkNotNull(buffer);
- checkPositionIndexes(offset, offset + length, buffer.length);
-
- this.data = ByteBuffer.allocate(refids.size() * 16 + length);
- for (UUID refid : refids) {
- data.putLong(refid.getMostSignificantBits());
- data.putLong(refid.getLeastSignificantBits());
- }
- data.put(buffer, offset, length);
- data.rewind();
- }
-
/**
* Maps the given record offset to the respective position within the
* internal {@link #data} array. The validity of a record with the given
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java Wed Nov 6 20:08:43 2013
@@ -16,7 +16,6 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
-import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -32,14 +31,11 @@ public interface SegmentStore {
* Writes the given segment to the segment store.
*
* @param segmentId segment identifier
- * @param referencedSegmentIds identifiers of all the referenced segments
* @param bytes byte buffer that contains the raw contents of the segment
* @param offset start offset within the byte buffer
* @param length length of the segment
*/
- void writeSegment(
- UUID segmentId, Collection<UUID> referencedSegmentIds,
- byte[] bytes, int offset, int length);
+ void writeSegment(UUID segmentId, byte[] bytes, int offset, int length);
void deleteSegment(UUID segmentId);
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Nov 6 20:08:43 2013
@@ -23,6 +23,7 @@ import static com.google.common.base.Pre
import static com.google.common.base.Preconditions.checkPositionIndex;
import static com.google.common.base.Preconditions.checkPositionIndexes;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
import static com.google.common.collect.Maps.newHashMap;
import static java.util.Collections.emptyMap;
import static org.apache.jackrabbit.oak.api.Type.NAME;
@@ -33,7 +34,7 @@ import static org.apache.jackrabbit.oak.
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -115,17 +116,21 @@ public class SegmentWriter {
public SegmentWriter(SegmentStore store) {
this.store = store;
- this.dummySegment = new Segment(
- store, UUID.randomUUID(), Collections.<UUID>emptyList(),
- new byte[0], 0, 0);
+ this.dummySegment =
+ new Segment(store, UUID.randomUUID(), ByteBuffer.allocate(0));
}
public synchronized Segment getCurrentSegment(UUID id) {
if (equal(id, uuid)) {
if (currentSegment == null) {
- currentSegment = new Segment(
- store, uuid, uuids.keySet(),
- buffer, buffer.length - length, length);
+ ByteBuffer b = ByteBuffer.allocate(16 * uuids.size() + length);
+ for (UUID refid : uuids.keySet()) {
+ b.putLong(refid.getMostSignificantBits());
+ b.putLong(refid.getLeastSignificantBits());
+ }
+ b.put(buffer, buffer.length - length, length);
+ b.rewind();
+ currentSegment = new Segment(store, uuid, b);
}
return currentSegment;
} else {
@@ -139,9 +144,16 @@ public class SegmentWriter {
public synchronized void flush() {
if (length > 0) {
- store.writeSegment(
- uuid, uuids.keySet(),
- buffer, buffer.length - length, length);
+ length += 16 * uuids.size();
+
+ ByteBuffer b = ByteBuffer.wrap(
+ buffer, buffer.length - length, 16 * uuids.size());
+ for (UUID refid : uuids.keySet()) {
+ b.putLong(refid.getMostSignificantBits());
+ b.putLong(refid.getLeastSignificantBits());
+ }
+
+ store.writeSegment(uuid, buffer, buffer.length - length, length);
uuid = UUID.randomUUID();
uuids.clear();
@@ -167,13 +179,11 @@ public class SegmentWriter {
}
int fullSize = size + ids.size() * Segment.RECORD_ID_BYTES;
- checkArgument(fullSize > 0);
-
int alignment = Segment.RECORD_ALIGN_BYTES - 1;
int alignedSize = (fullSize + alignment) & ~alignment;
- int segmentReferenceCount = uuids.size() + segmentIds.size();
- if (length + alignedSize > buffer.length
- || segmentReferenceCount > Segment.SEGMENT_REFERENCE_LIMIT) {
+ int refs = uuids.size() + segmentIds.size();
+ if (refs * 16 + length + alignedSize > buffer.length - 1
+ || refs > Segment.SEGMENT_REFERENCE_LIMIT) {
flush();
}
@@ -516,60 +526,56 @@ public class SegmentWriter {
private RecordId internalWriteStream(InputStream stream)
throws IOException {
- // First read the head of the stream. This covers most small
- // values and the frequently accessed head of larger ones.
- // The head gets inlined in the current segment.
- byte[] head = new byte[Segment.MEDIUM_LIMIT];
- int headLength = ByteStreams.read(stream, head, 0, head.length);
+ byte[] data = new byte[Segment.MAX_SEGMENT_SIZE];
+ int n = ByteStreams.read(stream, data, 0, data.length);
- if (headLength < Segment.SMALL_LIMIT) {
+ // Special case for short binaries (up to about 16kB):
+ // store them directly as small- or medium-sized value records
+ if (n < Segment.MEDIUM_LIMIT) {
synchronized (this) {
- RecordId id = prepare(1 + headLength);
- buffer[position++] = (byte) headLength;
- System.arraycopy(head, 0, buffer, position, headLength);
- position += headLength;
- return id;
- }
- } else if (headLength < Segment.MEDIUM_LIMIT) {
- synchronized (this) {
- RecordId id = prepare(2 + headLength);
- int len = (headLength - Segment.SMALL_LIMIT) | 0x8000;
- buffer[position++] = (byte) (len >> 8);
- buffer[position++] = (byte) len;
- System.arraycopy(head, 0, buffer, position, headLength);
- position += headLength;
+ RecordId id;
+ if (n < Segment.SMALL_LIMIT) {
+ id = prepare(1 + n);
+ buffer[position++] = (byte) n;
+ } else {
+ id = prepare(2 + n);
+ int len = (n - Segment.SMALL_LIMIT) | 0x8000;
+ buffer[position++] = (byte) (len >> 8);
+ buffer[position++] = (byte) len;
+ }
+ System.arraycopy(data, 0, buffer, position, n);
+ position += n;
return id;
}
- } else {
- // If the stream filled the full head buffer, it's likely
- // that the bulk of the data is still to come. Read it
- // in larger chunks and save in separate segments.
-
- long length = 0;
- List<RecordId> blockIds = new ArrayList<RecordId>();
-
- byte[] bulk = new byte[Segment.MAX_SEGMENT_SIZE];
- System.arraycopy(head, 0, bulk, 0, headLength);
- int bulkLength = headLength + ByteStreams.read(
- stream, bulk, headLength, bulk.length - headLength);
- while (bulkLength > 0) {
- UUID segmentId = UUID.randomUUID();
- int align = Segment.RECORD_ALIGN_BYTES - 1;
- int bulkAlignLength = (bulkLength + align) & ~align;
- store.writeSegment(
- segmentId, Collections.<UUID>emptyList(),
- bulk, 0, bulkAlignLength);
- for (int pos = Segment.MAX_SEGMENT_SIZE - bulkAlignLength;
- pos < Segment.MAX_SEGMENT_SIZE;
- pos += BLOCK_SIZE) {
- blockIds.add(new RecordId(segmentId, pos));
- }
- length += bulkLength;
- bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+ }
+
+ long length = n;
+ List<RecordId> blockIds = newArrayListWithExpectedSize(n / 4096);
+
+ // Write full bulk segments
+ while (n == buffer.length) {
+ UUID id = UUID.randomUUID();
+ store.writeSegment(id, data, 0, data.length);
+
+ for (int i = 0; i < data.length; i += BLOCK_SIZE) {
+ blockIds.add(new RecordId(id, i));
}
- return writeValueRecord(length, writeList(blockIds));
+ n = ByteStreams.read(stream, data, 0, data.length);
+ length += n;
+ }
+
+
+ // Inline the remaining blocks in the current segments
+ for (int p = 0; p < n; p += BLOCK_SIZE) {
+ int size = Math.min(n - p, BLOCK_SIZE);
+ synchronized (this) {
+ blockIds.add(prepare(size));
+ System.arraycopy(data, p, buffer, position, size);
+ }
}
+
+ return writeValueRecord(length, writeList(blockIds));
}
private RecordId writeProperty(
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Nov 6 20:08:43 2013
@@ -26,7 +26,6 @@ import static org.apache.jackrabbit.oak.
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
@@ -139,24 +138,7 @@ public class FileStore extends AbstractS
@Override
public synchronized void writeSegment(
- UUID segmentId, Collection<UUID> referencedSegmentIds,
- byte[] data, int offset, int length) {
- if (!referencedSegmentIds.isEmpty()) {
- int size = 16 * referencedSegmentIds.size() + length;
- ByteBuffer buffer = ByteBuffer.allocate(size);
-
- for (UUID referencedSegmentId : referencedSegmentIds) {
- buffer.putLong(referencedSegmentId.getMostSignificantBits());
- buffer.putLong(referencedSegmentId.getLeastSignificantBits());
- }
-
- buffer.put(data, offset, length);
-
- data = buffer.array();
- offset = 0;
- length = size;
- }
-
+ UUID segmentId, byte[] data, int offset, int length) {
try {
writeEntry(segmentId, data, offset, length);
} catch (IOException e) {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java Wed Nov 6 20:08:43 2013
@@ -18,7 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import java.util.Collection;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
@@ -75,10 +75,11 @@ public class MemoryStore extends Abstrac
@Override
public void writeSegment(
- UUID segmentId, Collection<UUID> referencedSegmentIds,
- byte[] data, int offset, int length) {
- Segment segment = new Segment(
- this, segmentId, referencedSegmentIds, data, offset, length);
+ UUID segmentId, byte[] data, int offset, int length) {
+ ByteBuffer buffer = ByteBuffer.allocate(length);
+ buffer.put(data, offset, length);
+ buffer.rewind();
+ Segment segment = new Segment(this, segmentId, buffer);
if (segments.putIfAbsent(segment.getSegmentId(), segment) != null) {
throw new IllegalStateException(
"Segment override: " + segment.getSegmentId());
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java Wed Nov 6 20:08:43 2013
@@ -22,8 +22,7 @@ import static com.mongodb.ReadPreference
import static com.mongodb.ReadPreference.primary;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import java.util.Collection;
-import java.util.List;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
@@ -32,7 +31,6 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
@@ -83,17 +81,20 @@ public class MongoStore extends Abstract
@Override
public void writeSegment(
- UUID segmentId, Collection<UUID> referencedSegmentIds,
- byte[] data, int offset, int length) {
+ UUID segmentId, byte[] data, int offset, int length) {
byte[] d = new byte[length];
System.arraycopy(data, offset, d, 0, length);
- insertSegment(segmentId, d, referencedSegmentIds);
+
+ BasicDBObject segment = new BasicDBObject();
+ segment.put("_id", segmentId.toString());
+ segment.put("data", data);
+ segments.insert(segment, concern);
}
@Override
protected Segment loadSegment(UUID segmentId) {
DBObject id = new BasicDBObject("_id", segmentId.toString());
- DBObject fields = new BasicDBObject(of("data", 1, "uuids", 1));
+ DBObject fields = new BasicDBObject(of("data", 1));
DBObject segment = segments.findOne(id, fields, nearest());
if (segment == null) {
@@ -105,26 +106,7 @@ public class MongoStore extends Abstract
}
byte[] data = (byte[]) segment.get("data");
- List<?> list = (List<?>) segment.get("uuids");
- List<UUID> uuids = Lists.newArrayListWithCapacity(list.size());
- for (Object object : list) {
- uuids.add(UUID.fromString(object.toString()));
- }
- return new Segment(this, segmentId, uuids, data, 0, data.length);
- }
-
- private void insertSegment(
- UUID segmentId, byte[] data, Collection<UUID> uuids) {
- List<String> list = Lists.newArrayListWithCapacity(uuids.size());
- for (UUID uuid : uuids) {
- list.add(uuid.toString());
- }
-
- BasicDBObject segment = new BasicDBObject();
- segment.put("_id", segmentId.toString());
- segment.put("data", data);
- segment.put("uuids", list);
- segments.insert(segment, concern);
+ return new Segment(this, segmentId, ByteBuffer.wrap(data));
}
@Override