You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/02/06 18:06:19 UTC
svn commit: r1443071 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/segment/
test/java/org/apache/jackrabbit/oak/plugins/segment/
Author: jukka
Date: Wed Feb 6 17:06:18 2013
New Revision: 1443071
URL: http://svn.apache.org/viewvc?rev=1443071&view=rev
Log:
OAK-593: Segment-based MK
Implement basic record types and a simple in-memory segment store for testing
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
- copied, changed from r1443031, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CacheStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/RecordId.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentReader.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java Wed Feb 6 17:06:18 2013
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+
+/**
+ * A record consisting of a single block of raw bytes.
+ */
+class BlockRecord extends Record {
+
+    /** Number of bytes in this block. */
+    private final int size;
+
+    /**
+     * Creates a view over the block stored at the given record identifier.
+     *
+     * @param reader reader used to fetch the bytes of this block
+     * @param id identifier of the block record
+     * @param size number of bytes in the block
+     */
+    BlockRecord(SegmentReader reader, RecordId id, int size) {
+        super(reader, id);
+        this.size = size;
+    }
+
+    /**
+     * Reads bytes from this block. Up to the given number of bytes are
+     * read starting from the given position within this block. The number
+     * of bytes read is returned.
+     *
+     * @param position position within this block
+     * @param buffer target buffer
+     * @param offset offset within the target buffer
+     * @param length maximum number of bytes to read
+     * @return number of bytes that could be read
+     * @throws IndexOutOfBoundsException if the position is not a valid
+     *         index within this block, or if the offset and length do
+     *         not describe a valid range of the target buffer
+     * @throws NullPointerException if the target buffer is {@code null}
+     */
+    public int read(int position, byte[] buffer, int offset, int length) {
+        checkElementIndex(position, size);
+        checkNotNull(buffer);
+        checkPositionIndexes(offset, offset + length, buffer.length);
+
+        // Clamp against the number of bytes left in the block instead of
+        // evaluating position + length, which could overflow an int and
+        // then skip the clamp entirely.
+        int remaining = size - position;
+        if (length > remaining) {
+            length = remaining;
+        }
+        if (length > 0) {
+            getReader().readBytes(
+                    getRecordId(), position, buffer, offset, length);
+        }
+        return length;
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CacheStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CacheStore.java?rev=1443071&r1=1443070&r2=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CacheStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CacheStore.java Wed Feb 6 17:06:18 2013
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.jackrabbit.oak.plugins.segment;
import java.util.UUID;
@@ -6,43 +22,33 @@ import java.util.concurrent.ExecutionExc
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.cache.Weigher;
public class CacheStore implements SegmentStore {
private final SegmentStore store;
- private final LoadingCache<UUID, byte[]> cache;
+ private final LoadingCache<UUID, Segment> cache;
public CacheStore(final SegmentStore store, long cacheSize) {
this.store = store;
this.cache = CacheBuilder.newBuilder()
.maximumWeight(cacheSize)
- .weigher(new Weigher<UUID, byte[]>() {
+ .weigher(Segment.weigher())
+ .build(new CacheLoader<UUID, Segment>() {
@Override
- public int weigh(UUID key, byte[] value) {
- return value.length;
- }
- }).build(new CacheLoader<UUID, byte[]>() {
- @Override
- public byte[] load(UUID key) throws Exception {
+ public Segment load(UUID key) throws Exception {
return store.readSegment(key);
}
});
}
@Override
- public RecordId getHead(String journal) {
- return store.getHead(journal);
- }
-
- @Override
- public boolean updateHead(String journal, RecordId base, RecordId head) {
- return store.updateHead(journal, base, head);
+ public int getMaxSegmentSize() {
+ return store.getMaxSegmentSize();
}
@Override
- public byte[] readSegment(UUID segmentId) {
+ public Segment readSegment(UUID segmentId) {
try {
return cache.get(segmentId);
} catch (ExecutionException e) {
@@ -52,6 +58,12 @@ public class CacheStore implements Segme
}
@Override
+ public void createSegment(Segment segment) {
+ store.createSegment(segment);
+ cache.put(segment.getSegmentId(), segment);
+ }
+
+ @Override
public void createSegment(
UUID segmentId, byte[] data, int offset, int length) {
store.createSegment(segmentId, data, offset, length);
@@ -60,6 +72,6 @@ public class CacheStore implements Segme
@Override
public void deleteSegment(UUID segmentId) {
store.deleteSegment(segmentId);
+ cache.invalidate(segmentId);
}
-
}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java Wed Feb 6 17:06:18 2013
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkElementIndex;
+
+/**
+ * A list of record identifiers. A list with exactly one entry is
+ * represented by that entry itself; larger lists are stored as a tree of
+ * buckets with up to {@link #LEVEL_SIZE} children per level, each child
+ * reference taking four bytes.
+ */
+class ListRecord extends Record {
+
+    static final int LEVEL_SIZE = 1 << 8; // 256
+
+    /** Total number of entries in this list. */
+    private final int size;
+
+    /** Number of entries covered by each direct child bucket. */
+    private final int bucketSize;
+
+    /**
+     * Creates a view over the list stored at the given record identifier.
+     *
+     * @param reader reader used to resolve record identifiers
+     * @param id identifier of the list record
+     * @param size number of entries in the list, must not be negative
+     * @throws IllegalArgumentException if the size is negative
+     */
+    ListRecord(SegmentReader reader, RecordId id, int size) {
+        super(reader, id);
+        checkArgument(size >= 0);
+        this.size = size;
+
+        // Use long arithmetic: with an int, bs * LEVEL_SIZE wraps around to
+        // zero once bs reaches 1 << 24, and the loop would never terminate
+        // for lists with more than 1 << 24 entries.
+        long bs = 1;
+        while (bs * LEVEL_SIZE < size) {
+            bs *= LEVEL_SIZE;
+        }
+        this.bucketSize = (int) bs; // at most 1 << 24, always fits an int
+    }
+
+    /**
+     * Returns the number of entries in this list.
+     *
+     * @return list size
+     */
+    public int size() {
+        return size;
+    }
+
+    /**
+     * Returns the record identifier stored at the given list index.
+     *
+     * @param index index of the entry, from zero up to but not including
+     *              the {@link #size() size} of this list
+     * @return identifier of the entry at the given index
+     * @throws IndexOutOfBoundsException if the index is out of range
+     */
+    public RecordId getEntry(int index) {
+        checkElementIndex(index, size);
+
+        if (size == 1) {
+            // A single-entry list is the entry itself.
+            return getRecordId();
+        }
+
+        int bucketIndex = index / bucketSize;
+        int bucketOffset = index % bucketSize;
+        RecordId bucketId = readRecordId(bucketIndex * 4);
+
+        // The last bucket may hold fewer than bucketSize entries. The child
+        // must be created with its real entry count, because both its own
+        // bucket size and the single-entry shortcut above are derived from
+        // that count.
+        int bucketEntries =
+                Math.min(bucketSize, size - bucketIndex * bucketSize);
+        ListRecord bucket =
+                new ListRecord(getReader(), bucketId, bucketEntries);
+        return bucket.getEntry(bucketOffset);
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java Wed Feb 6 17:06:18 2013
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Simple {@link SegmentStore} that keeps all segments in an in-memory map.
+ */
+public class MemoryStore implements SegmentStore {
+
+    private static final int MAX_SEGMENT_SIZE = 1 << 20; // 1MB
+
+    /** All stored segments, keyed by segment identifier. */
+    private final Map<UUID, Segment> segments =
+        Collections.synchronizedMap(new HashMap<UUID, Segment>());
+
+    /**
+     * Returns the maximum segment size of this store.
+     *
+     * @return maximum segment size in bytes
+     */
+    @Override
+    public int getMaxSegmentSize() {
+        return MAX_SEGMENT_SIZE;
+    }
+
+    /**
+     * Returns the segment with the given identifier.
+     *
+     * @param segmentId segment identifier
+     * @return the stored segment
+     * @throws IllegalStateException if no such segment is stored
+     */
+    @Override
+    public Segment readSegment(UUID segmentId) {
+        Segment segment = segments.get(segmentId);
+        if (segment != null) {
+            return segment;
+        } else {
+            throw new IllegalStateException("Segment not found: " + segmentId);
+        }
+    }
+
+    /**
+     * Stores the given segment. An already stored segment with the same
+     * identifier is left untouched.
+     *
+     * @param segment segment to store
+     * @throws IllegalStateException if a segment with the same identifier
+     *         is already stored
+     */
+    @Override
+    public void createSegment(Segment segment) {
+        UUID segmentId = segment.getSegmentId();
+        // Check and insert atomically under the synchronized map's own
+        // lock. A plain put() followed by a check of its return value
+        // would already have replaced the existing segment by the time
+        // the conflict is reported.
+        synchronized (segments) {
+            if (segments.containsKey(segmentId)) {
+                throw new IllegalStateException(
+                        "Segment override: " + segmentId);
+            }
+            segments.put(segmentId, segment);
+        }
+    }
+
+    /**
+     * Stores a copy of the given byte range as a new segment that has no
+     * references to other segments.
+     *
+     * @param segmentId identifier of the new segment
+     * @param data array that contains the segment bytes
+     * @param offset start of the segment bytes within the array
+     * @param length number of segment bytes
+     * @throws IllegalStateException if a segment with the same identifier
+     *         is already stored
+     */
+    @Override
+    public void createSegment(
+            UUID segmentId, byte[] data, int offset, int length) {
+        byte[] segment = new byte[length];
+        System.arraycopy(data, offset, segment, 0, length);
+        createSegment(new Segment(segmentId, segment, new UUID[0]));
+    }
+
+    /**
+     * Removes the segment with the given identifier.
+     *
+     * @param segmentId segment identifier
+     * @throws IllegalStateException if no such segment is stored
+     */
+    @Override
+    public void deleteSegment(UUID segmentId) {
+        if (segments.remove(segmentId) == null) {
+            throw new IllegalStateException("Missing segment: " + segmentId);
+        }
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java (from r1443031, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java&r1=1443031&r2=1443071&rev=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java Wed Feb 6 17:06:18 2013
@@ -1,33 +1,42 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.segment;
-
-import java.util.UUID;
-
-public interface SegmentStore {
-
- RecordId getHead(String journal);
-
- boolean updateHead(String journal, RecordId base, RecordId head);
-
- byte[] readSegment(UUID segmentId);
-
- void createSegment(UUID segmentId, byte[] data, int offset, int length);
-
- void deleteSegment(UUID segmentId);
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+/**
+ * Base class of record views. A record is identified by a
+ * {@link RecordId} and accessed through a {@link SegmentReader}.
+ */
+class Record {
+
+    /** Identifier of this record. */
+    private final RecordId id;
+
+    /** Reader through which the content of this record is accessed. */
+    private final SegmentReader reader;
+
+    /**
+     * Creates a record view.
+     *
+     * @param reader reader used to access the record
+     * @param id identifier of the record
+     */
+    protected Record(SegmentReader reader, RecordId id) {
+        this.id = id;
+        this.reader = reader;
+    }
+
+    /**
+     * Returns the identifier of this record.
+     *
+     * @return record identifier
+     */
+    public RecordId getRecordId() {
+        return this.id;
+    }
+
+    /**
+     * Returns the reader used to access this record.
+     *
+     * @return segment reader
+     */
+    protected SegmentReader getReader() {
+        return this.reader;
+    }
+
+    /**
+     * Reads the record identifier stored at the given position within
+     * this record.
+     *
+     * @param position byte position relative to the start of this record
+     * @return the record identifier found at that position
+     */
+    protected RecordId readRecordId(int position) {
+        return this.reader.readRecordId(this.id, position);
+    }
+
+}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/RecordId.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/RecordId.java?rev=1443071&r1=1443070&r2=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/RecordId.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/RecordId.java Wed Feb 6 17:06:18 2013
@@ -22,6 +22,8 @@ import com.google.common.base.Preconditi
public final class RecordId {
+ public static RecordId[] EMPTY_ARRAY = new RecordId[0];
+
private final UUID segmentId;
private final int offset;
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Wed Feb 6 17:06:18 2013
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import com.google.common.cache.Weigher;
+
+/**
+ * An immutable chunk of record data together with the identifiers of the
+ * segments that its records refer to.
+ */
+class Segment {
+
+    static final int SMALL_LIMIT = 1 << 7;
+
+    // The parentheses are required: '+' binds tighter than '<<', so the
+    // unparenthesized form shifts by (14 + SMALL_LIMIT) and yields 1 << 14.
+    static final int MEDIUM_LIMIT = (1 << 14) + SMALL_LIMIT;
+
+    /** Identifier of this segment. */
+    private final UUID uuid;
+
+    /** Raw content of this segment. */
+    private final byte[] data;
+
+    /** Segment identifiers that record identifiers in the data index into. */
+    private final UUID[] uuids;
+
+    /**
+     * Creates a segment. The given arrays are used as such, not copied.
+     *
+     * @param uuid identifier of this segment
+     * @param data raw content of this segment
+     * @param uuids identifiers of the segments referenced from the content
+     */
+    Segment(UUID uuid, byte[] data, UUID[] uuids) {
+        this.uuid = uuid;
+        this.data = data;
+        this.uuids = uuids;
+    }
+
+    /**
+     * Returns the identifier of this segment.
+     *
+     * @return segment identifier
+     */
+    public UUID getSegmentId() {
+        return uuid;
+    }
+
+    /**
+     * Returns the size of this segment.
+     *
+     * @return number of data bytes in this segment
+     */
+    public int size() {
+        return data.length;
+    }
+
+    /**
+     * Reads the given number of bytes starting from the given position
+     * in this segment.
+     *
+     * @param position position within segment
+     * @param buffer target buffer
+     * @param offset offset within target buffer
+     * @param length number of bytes to read
+     * @throws IndexOutOfBoundsException if either byte range is invalid
+     * @throws NullPointerException if the target buffer is {@code null}
+     */
+    public void readBytes(int position, byte[] buffer, int offset, int length) {
+        checkPositionIndexes(position, position + length, data.length);
+        checkNotNull(buffer);
+        checkPositionIndexes(offset, offset + length, buffer.length);
+
+        System.arraycopy(data, position, buffer, offset, length);
+    }
+
+    /**
+     * Reads the four-byte record identifier stored at the given offset.
+     * The first byte is an index to the array of referenced segment
+     * identifiers and the following three bytes are the big-endian
+     * offset of the record.
+     *
+     * @param offset offset within this segment
+     * @return the record identifier stored at the given offset
+     * @throws IndexOutOfBoundsException if the four bytes starting at the
+     *         given offset do not fit within this segment
+     */
+    RecordId readRecordId(int offset) {
+        // Same bounds check as the other read methods of this class.
+        checkPositionIndexes(offset, offset + 4, data.length);
+        return new RecordId(
+                uuids[data[offset] & 0xff],
+                (data[offset + 1] & 0xff) << 16
+                | (data[offset + 2] & 0xff) << 8
+                | (data[offset + 3] & 0xff));
+    }
+
+    /**
+     * Reads the eight-byte big-endian length value stored at the given
+     * offset.
+     *
+     * @param offset offset within this segment
+     * @return the length value stored at the given offset
+     * @throws IndexOutOfBoundsException if the eight bytes starting at the
+     *         given offset do not fit within this segment
+     */
+    public long readLength(int offset) {
+        checkPositionIndexes(offset, offset + 8, data.length);
+        return ByteBuffer.wrap(data).getLong(offset);
+    }
+
+    /**
+     * Returns a cache weigher that weighs each segment by its
+     * {@link #size() size}.
+     *
+     * @return segment weigher
+     */
+    public static Weigher<UUID, Segment> weigher() {
+        return new Weigher<UUID, Segment>() {
+            @Override
+            public int weigh(UUID key, Segment value) {
+                return value.size();
+            }
+        };
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentReader.java?rev=1443071&r1=1443070&r2=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentReader.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentReader.java Wed Feb 6 17:06:18 2013
@@ -16,14 +16,89 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
-import java.io.InputStream;
-
-public interface SegmentReader {
-
- String readString(RecordId recordId);
-
- InputStream readStream(RecordId recordId);
-
- void readBytes(RecordId recordId, byte[] bytes, int offset, int length);
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE;
+
+import java.io.IOException;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.ByteStreams;
+
+/**
+ * Reads records from the segments of a {@link SegmentStore}.
+ */
+public class SegmentReader {
+
+    /** Store from which segments are loaded. */
+    private final SegmentStore store;
+
+    /**
+     * Creates a reader on top of the given store.
+     *
+     * @param store store from which segments are loaded
+     */
+    public SegmentReader(SegmentStore store) {
+        this.store = store;
+    }
+
+    /**
+     * Reads the value record with the given identifier and decodes all
+     * of its bytes as a UTF-8 string.
+     *
+     * @param recordId identifier of the value record
+     * @return the decoded string
+     * @throws IllegalStateException if the value is longer than
+     *         {@link Integer#MAX_VALUE} bytes or could not be read
+     */
+    public String readString(RecordId recordId) {
+        SegmentStream stream = readStream(recordId);
+        try {
+            long length = stream.getLength();
+            if (length > Integer.MAX_VALUE) {
+                throw new IllegalStateException("Too long value: " + length);
+            }
+            byte[] bytes = new byte[(int) length];
+            ByteStreams.readFully(stream, bytes);
+            return new String(bytes, Charsets.UTF_8);
+        } catch (IOException e) {
+            throw new IllegalStateException("Unexpected IOException", e);
+        } finally {
+            stream.close();
+        }
+    }
+
+    /**
+     * Opens a stream over the value record with the given identifier.
+     * The record starts with an eight-byte length, followed by the
+     * identifier of the list of blocks that hold the value.
+     *
+     * @param recordId identifier of the value record
+     * @return stream over the bytes of the value
+     */
+    public SegmentStream readStream(RecordId recordId) {
+        Segment segment = store.readSegment(recordId.getSegmentId());
+        int base = recordId.getOffset();
+
+        long length = segment.readLength(base);
+        // Number of blocks needed for the value, rounded up.
+        int blockCount = (int) ((length + BLOCK_SIZE - 1) / BLOCK_SIZE);
+
+        RecordId listId = segment.readRecordId(base + 8);
+        ListRecord blocks = new ListRecord(this, listId, blockCount);
+        return new SegmentStream(this, blocks, length);
+    }
+
+    /**
+     * Copies bytes of the given record to the given buffer.
+     *
+     * @param recordId identifier of the record to read from
+     * @param position position within the record, must not be negative
+     * @param buffer target buffer
+     * @param offset offset within the target buffer
+     * @param length number of bytes to read
+     */
+    public void readBytes(
+            RecordId recordId, int position,
+            byte[] buffer, int offset, int length) {
+        checkNotNull(recordId);
+        checkArgument(position >= 0);
+        checkNotNull(buffer);
+        checkPositionIndexes(offset, offset + length, buffer.length);
+
+        int start = recordId.getOffset() + position;
+        store.readSegment(recordId.getSegmentId())
+                .readBytes(start, buffer, offset, length);
+    }
+
+    /**
+     * Reads the record identifier stored at the given position within
+     * the given record.
+     *
+     * @param recordId identifier of the record to read from
+     * @param position position within the record, must not be negative
+     * @return the record identifier stored at that position
+     */
+    public RecordId readRecordId(RecordId recordId, int position) {
+        checkNotNull(recordId);
+        checkArgument(position >= 0);
+
+        int start = recordId.getOffset() + position;
+        return store.readSegment(recordId.getSegmentId()).readRecordId(start);
+    }
+
+    /**
+     * Returns the list referenced from the given record. For a non-empty
+     * list the record is expected to contain the identifier of the list;
+     * an empty list is not dereferenced.
+     *
+     * @param recordId identifier of the record that refers to the list
+     * @param numberOfEntries number of list entries, must not be negative
+     * @return view over the list
+     */
+    public ListRecord readList(RecordId recordId, int numberOfEntries) {
+        checkNotNull(recordId);
+        checkArgument(numberOfEntries >= 0);
+
+        RecordId listId = recordId;
+        if (numberOfEntries > 0) {
+            Segment segment = store.readSegment(recordId.getSegmentId());
+            listId = segment.readRecordId(recordId.getOffset());
+        }
+        return new ListRecord(this, listId, numberOfEntries);
+    }
+
+    /**
+     * Returns a view over the block record with the given identifier.
+     *
+     * @param recordId identifier of the block record
+     * @param size size of the block in bytes, must be positive
+     * @return view over the block
+     */
+    public BlockRecord readBlock(RecordId recordId, int size) {
+        checkNotNull(recordId);
+        checkArgument(size > 0);
+        return new BlockRecord(this, recordId, size);
+    }
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java?rev=1443071&r1=1443070&r2=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java Wed Feb 6 17:06:18 2013
@@ -20,13 +20,13 @@ import java.util.UUID;
public interface SegmentStore {
- RecordId getHead(String journal);
+ int getMaxSegmentSize();
- boolean updateHead(String journal, RecordId base, RecordId head);
+ Segment readSegment(UUID segmentId);
- byte[] readSegment(UUID segmentId);
+ void createSegment(Segment segment);
- void createSegment(UUID segmentId, byte[] data, int offset, int length);
+ void createSegment(UUID segmentId, byte[] bytes, int offset, int len);
void deleteSegment(UUID segmentId);
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java Wed Feb 6 17:06:18 2013
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Input stream over a value whose bytes are stored as a list of
+ * fixed-size blocks.
+ */
+public class SegmentStream extends InputStream {
+
+    /** Reader used to access the blocks. */
+    private final SegmentReader reader;
+
+    /** Identifiers of the blocks that hold the value, in order. */
+    private final ListRecord blocks;
+
+    /** Total length of the value in bytes. */
+    private final long length;
+
+    /** Current read position within the value. */
+    private long position = 0;
+
+    /** Position to return to on {@link #reset()}. */
+    private long mark = 0;
+
+    /**
+     * Creates a stream over the given blocks.
+     *
+     * @param reader reader used to access the blocks
+     * @param blocks list of block record identifiers
+     * @param length total length of the value in bytes
+     */
+    SegmentStream(SegmentReader reader, ListRecord blocks, long length) {
+        this.reader = reader;
+        this.blocks = blocks;
+        this.length = length;
+    }
+
+    /**
+     * Returns the total length of the value.
+     *
+     * @return length in bytes
+     */
+    public long getLength() {
+        return length;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+
+    /**
+     * Remembers the current position. The read limit is ignored.
+     */
+    @Override
+    public synchronized void mark(int readlimit) {
+        mark = position;
+    }
+
+    /**
+     * Returns to the most recently marked position, or to the start of
+     * the stream if no position has been marked.
+     */
+    @Override
+    public synchronized void reset() {
+        position = mark;
+    }
+
+    @Override
+    public int read() throws IOException {
+        byte[] b = new byte[1];
+        if (read(b) != -1) {
+            return b[0] & 0xff;
+        } else {
+            return -1;
+        }
+    }
+
+    /**
+     * Reads bytes from the current position. A single call never reads
+     * past the end of the current block, so fewer than {@code len} bytes
+     * may be returned even before the end of the stream.
+     *
+     * @return number of bytes read, or -1 at the end of the stream
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        checkNotNull(b);
+        checkPositionIndexes(off, off + len, b.length);
+        if (len == 0) {
+            return 0;
+        } else if (position == length) {
+            return -1;
+        } else {
+            int blockIndex = (int) (position / BLOCK_SIZE);
+            int blockOffset = (int) (position % BLOCK_SIZE);
+
+            // Limit the read to the rest of the current block...
+            if (blockOffset + len > BLOCK_SIZE) {
+                len = BLOCK_SIZE - blockOffset;
+            }
+            // ...and to the rest of the value.
+            if (position + len > length) {
+                len = (int) (length - position);
+            }
+
+            BlockRecord block =
+                    reader.readBlock(blocks.getEntry(blockIndex), BLOCK_SIZE);
+            len = block.read(blockOffset, b, off, len);
+            position += len;
+            return len;
+        }
+    }
+
+    /**
+     * Moves the current position by up to the given number of bytes.
+     * The position never moves past the end of the stream. A negative
+     * argument moves the position backwards, at most to the start of the
+     * stream.
+     *
+     * @param n number of bytes to skip
+     * @return number of bytes by which the position was moved
+     */
+    @Override
+    public long skip(long n) {
+        // Compare n against the available distance rather than computing
+        // position + n, which overflows for large arguments such as
+        // Long.MAX_VALUE and would then rewind the stream to the start.
+        if (n > length - position) {
+            n = length - position;
+        } else if (n < -position) {
+            n = -position;
+        }
+        position += n;
+        return n;
+    }
+
+    /**
+     * Moves the position to the end of the stream so that further reads
+     * report the end of the stream.
+     */
+    @Override
+    public void close() {
+        position = length;
+    }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1443071&r1=1443070&r2=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Feb 6 17:06:18 2013
@@ -16,15 +16,203 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+
import java.io.IOException;
import java.io.InputStream;
-
-public interface SegmentWriter {
-
- RecordId writeString(String string);
-
- RecordId writeStream(InputStream stream) throws IOException;
-
- RecordId writeBytes(byte[] bytes, int offset, int length);
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+public class SegmentWriter {
+
+ private static final int INITIAL_BUFFER_SIZE = 1 << 12; // 4kB
+
+ static final int BLOCK_SIZE = 1 << 12; // 4kB
+
+ static final int INLINE_LIMIT = 16 * BLOCK_SIZE; // 64kB
+
+ private final SegmentStore store;
+
+ private final int blocksPerSegment;
+
+ private final int blockSegmentSize;
+
+ private UUID uuid = UUID.randomUUID();
+
+ private List<UUID> uuids = new ArrayList<UUID>(255);
+
+ private ByteBuffer buffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+
+ /**
+  * Creates a writer that stores the segments it produces in the given
+  * store.
+  *
+  * @param store segment store that receives the written segments
+  */
+ public SegmentWriter(SegmentStore store) {
+     this.store = store;
+     // Number of whole blocks that fit in a segment of the maximum size.
+     int wholeBlocks = store.getMaxSegmentSize() / BLOCK_SIZE;
+     this.blocksPerSegment = wholeBlocks;
+     this.blockSegmentSize = wholeBlocks * BLOCK_SIZE;
+ }
+
+ /**
+  * Writes the records buffered so far to the store as a new segment and
+  * then starts a fresh segment with a new random identifier. Does
+  * nothing when no bytes have been buffered.
+  * <p>
+  * Synchronized like the write methods, because it replaces the segment
+  * identifier and clears the buffer and the list of referenced segments
+  * that those methods update. The lock is reentrant, so the call made
+  * from within the synchronized prepare method is unaffected.
+  */
+ public synchronized void flush() {
+     if (buffer.position() > 0) {
+         // Copy out exactly the bytes written so far.
+         byte[] data = new byte[buffer.position()];
+         buffer.flip();
+         buffer.get(data);
+
+         store.createSegment(new Segment(
+                 uuid, data, uuids.toArray(new UUID[0])));
+
+         // Reset all per-segment state for the next segment.
+         uuid = UUID.randomUUID();
+         uuids.clear();
+         buffer.clear();
+     }
+ }
+
+ /**
+ * Prepares the buffer for a record of the given size that contains no
+ * references to other records.
+ *
+ * @param size number of bytes the record will occupy
+ * @return identifier of the record about to be written
+ */
+ private RecordId prepare(int size) {
+ return prepare(size, Collections.<RecordId>emptyList());
+ }
+
+ /**
+  * Makes room in the buffer for a record with {@code size} bytes of data
+  * followed by one 4-byte reference per entry in {@code ids}. The current
+  * segment is flushed first if the record would not fit within the
+  * maximum segment size, and the buffer is enlarged when needed.
+  *
+  * @param size number of data bytes in the record, excluding references
+  * @param ids identifiers of the records referenced by the new record
+  * @return identifier of the record about to be written, i.e. the current
+  *         segment and the current buffer position
+  */
+ private synchronized RecordId prepare(int size, Collection<RecordId> ids) {
+     // NOTE(review): the set of not yet referenced segments is collected
+     // but not used by anything below; presumably it is meant to guard
+     // the size of the referenced segment list. Confirm and wire it up.
+     Set<UUID> segmentIds = new HashSet<UUID>();
+     for (RecordId id : ids) {
+         UUID segmentId = id.getSegmentId();
+         if (!uuid.equals(segmentId) && !uuids.contains(segmentId)) {
+             segmentIds.add(segmentId);
+         }
+     }
+
+     int maxSize = store.getMaxSegmentSize();
+     int fullSize = size + 4 * ids.size();
+     if (buffer.position() + fullSize > maxSize) {
+         flush();
+     }
+     if (fullSize > buffer.remaining()) {
+         // Keep doubling until the record fits (capped at the maximum
+         // segment size). Doubling only once is not enough for a record
+         // larger than twice the current capacity.
+         int required = buffer.position() + fullSize;
+         int n = buffer.capacity();
+         while (n < required && n < maxSize) {
+             n = (int) Math.min(2L * n, maxSize);
+         }
+         ByteBuffer newBuffer = ByteBuffer.allocate(n);
+         buffer.flip();
+         newBuffer.put(buffer);
+         buffer = newBuffer;
+     }
+     return new RecordId(uuid, buffer.position());
+ }
+
+ /**
+  * Appends the given range of bytes to the current segment as a block
+  * record.
+  *
+  * @param bytes array holding the bytes to write
+  * @param offset index of the first byte to write
+  * @param length number of bytes to write
+  * @return identifier of the written block record
+  */
+ public synchronized RecordId writeBlock(
+         byte[] bytes, int offset, int length) {
+     checkNotNull(bytes);
+     checkPositionIndexes(offset, offset + length, bytes.length);
+
+     // The record starts at the buffer position reserved by prepare().
+     final RecordId id = prepare(length);
+     buffer.put(bytes, offset, length);
+     return id;
+ }
+
+ /**
+  * Writes the given record identifiers as a list record. An empty list
+  * yields an empty record and a single-entry list yields that entry
+  * itself. Longer lists are written level by level in buckets of at most
+  * {@code ListRecord.LEVEL_SIZE} identifiers until one bucket remains.
+  *
+  * @param ids record identifiers to store
+  * @return identifier of the list record
+  */
+ public synchronized RecordId writeList(List<RecordId> ids) {
+     checkNotNull(ids);
+
+     if (ids.isEmpty()) {
+         return prepare(0);
+     }
+
+     // Each pass replaces the current level with the identifiers of the
+     // buckets written for it; a single remaining identifier is the root.
+     List<RecordId> level = ids;
+     while (level.size() > 1) {
+         List<RecordId> parents = new ArrayList<RecordId>();
+         for (List<RecordId> bucket
+                 : Lists.partition(level, ListRecord.LEVEL_SIZE)) {
+             RecordId bucketId = prepare(0, bucket);
+             for (RecordId entry : bucket) {
+                 writeRecordId(entry);
+             }
+             parents.add(bucketId);
+         }
+         level = parents;
+     }
+     return level.iterator().next();
+ }
+
+ /**
+  * Writes a value record containing the given sequence of bytes. The
+  * bytes are split into blocks: while more than {@code INLINE_LIMIT}
+  * bytes remain they are stored in separate block segments, and the rest
+  * is written as block records in the current segment. The value record
+  * itself holds the length of the value and a reference to the list of
+  * blocks.
+  *
+  * @param bytes source buffer
+  * @param offset offset within the source buffer
+  * @param length number of bytes in the value
+  * @return value record identifier
+  * @throws NullPointerException if {@code bytes} is {@code null}
+  * @throws IndexOutOfBoundsException if the given range is not within
+  *         {@code bytes}
+  */
+ public synchronized RecordId writeValue(
+         byte[] bytes, int offset, int length) {
+     // Same argument checks as writeBlock(): fail before anything is
+     // written rather than storing a record with a bogus length or
+     // creating segments for a range that turns out to be invalid.
+     checkNotNull(bytes);
+     checkPositionIndexes(offset, offset + length, bytes.length);
+
+     List<RecordId> blockIds = new ArrayList<RecordId>();
+     int remaining = length;
+
+     // First create block segments for a large bulk of the value
+     while (remaining > INLINE_LIMIT) {
+         int len = Math.min(remaining, blockSegmentSize);
+         UUID segmentId = UUID.randomUUID();
+         store.createSegment(segmentId, bytes, offset, len);
+         for (int position = 0; position < len; position += BLOCK_SIZE) {
+             blockIds.add(new RecordId(segmentId, position));
+         }
+         offset += len;
+         remaining -= len;
+     }
+
+     // Then inline any remaining full blocks
+     while (remaining > BLOCK_SIZE) {
+         blockIds.add(writeBlock(bytes, offset, BLOCK_SIZE));
+         offset += BLOCK_SIZE;
+         remaining -= BLOCK_SIZE;
+     }
+
+     // Finally add the last partial block (if any)
+     if (remaining > 0) {
+         blockIds.add(writeBlock(bytes, offset, remaining));
+     }
+
+     // Store the list of blocks along with the length of the value
+     RecordId listId = writeList(blockIds);
+     RecordId valueId = prepare(8, Collections.singleton(listId));
+     buffer.putLong(length);
+     writeRecordId(listId);
+     return valueId;
+ }
+
+ /**
+  * Writes the given string as a value record holding its UTF-8 encoding.
+  *
+  * @param string string to write
+  * @return identifier of the written value record
+  */
+ public RecordId writeString(String string) {
+     byte[] utf8 = string.getBytes(Charsets.UTF_8);
+     return writeValue(utf8, 0, utf8.length);
+ }
+
+ /**
+ * Not implemented yet; every call fails.
+ *
+ * @param stream source stream (ignored by the current code)
+ * @return never returns normally
+ * @throws IOException declared in the signature, not thrown by the current code
+ * @throws UnsupportedOperationException always
+ */
+ public RecordId writeStream(InputStream stream) throws IOException {
+ throw new UnsupportedOperationException(); // TODO
+ }
+
+ /**
+  * Appends a 4-byte reference to the given record to the buffer. The
+  * reference combines the index of the record's segment in the list of
+  * referenced segments (shifted left by 24 bits) with the record offset.
+  * A segment that is not yet in the list is appended to it.
+  *
+  * @param id identifier of the referenced record
+  */
+ private void writeRecordId(RecordId id) {
+     UUID segmentId = id.getSegmentId();
+     int slot = uuids.indexOf(segmentId);
+     if (slot == -1) {
+         // First reference to this segment: register it at the end.
+         uuids.add(segmentId);
+         slot = uuids.size() - 1;
+     }
+     // NOTE(review): assumes the offset fits in the low 24 bits; confirm.
+     buffer.putInt((slot << 24) | id.getOffset());
+ }
}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java Wed Feb 6 17:06:18 2013
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static org.apache.jackrabbit.oak.plugins.segment.ListRecord.LEVEL_SIZE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+public class RecordTest {
+
+ private String hello = "Hello, World!";
+
+ private byte[] bytes = hello.getBytes(Charsets.UTF_8);
+
+ private SegmentStore store = new MemoryStore();
+
+ private SegmentWriter writer = new SegmentWriter(store);
+
+ private SegmentReader reader = new SegmentReader(store);
+
+ private final Random random = new Random(0xcafefaceL);
+
+ @Test
+ public void testBlockRecord() {
+     RecordId blockId = writer.writeBlock(bytes, 0, bytes.length);
+     writer.flush();
+     BlockRecord block = new BlockRecord(reader, blockId, bytes.length);
+
+     // Check reading with all valid positions and lengths, including a
+     // read of the whole block (n == bytes.length). Each round first
+     // overwrites the target range so that a successful read is what
+     // restores the original content.
+     for (int n = 1; n <= bytes.length; n++) {
+         for (int i = 0; i + n <= bytes.length; i++) {
+             Arrays.fill(bytes, i, i + n, (byte) '.');
+             assertEquals(n, block.read(i, bytes, i, n));
+             assertEquals(hello, new String(bytes, Charsets.UTF_8));
+         }
+     }
+
+     // Check reading with a too long length
+     byte[] large = new byte[bytes.length * 2];
+     assertEquals(bytes.length, block.read(0, large, 0, large.length));
+     assertEquals(hello, new String(large, 0, bytes.length, Charsets.UTF_8));
+ }
+
+ @Test
+ public void testListRecord() {
+     RecordId blockId = writer.writeBlock(bytes, 0, bytes.length);
+     final int square = LEVEL_SIZE * LEVEL_SIZE;
+
+     // Lists sized at and just past each level boundary.
+     ListRecord zero = writeList(0, blockId);
+     ListRecord one = writeList(1, blockId);
+     ListRecord level1 = writeList(LEVEL_SIZE, blockId);
+     ListRecord level1p = writeList(LEVEL_SIZE + 1, blockId);
+     ListRecord level2 = writeList(square, blockId);
+     ListRecord level2p = writeList(square + 1, blockId);
+     writer.flush();
+
+     // Empty and single-entry lists
+     assertEquals(0, zero.size());
+     assertEquals(1, one.size());
+     assertEquals(blockId, one.getEntry(0));
+
+     // One full level, and one entry more
+     assertEquals(LEVEL_SIZE, level1.size());
+     assertEquals(blockId, level1.getEntry(0));
+     assertEquals(blockId, level1.getEntry(LEVEL_SIZE - 1));
+     assertEquals(LEVEL_SIZE + 1, level1p.size());
+     assertEquals(blockId, level1p.getEntry(0));
+     assertEquals(blockId, level1p.getEntry(LEVEL_SIZE));
+
+     // Two full levels, and one entry more
+     assertEquals(square, level2.size());
+     assertEquals(blockId, level2.getEntry(0));
+     assertEquals(blockId, level2.getEntry(square - 1));
+     assertEquals(square + 1, level2p.size());
+     assertEquals(blockId, level2p.getEntry(0));
+     assertEquals(blockId, level2p.getEntry(square));
+ }
+
+ // Writes a list holding 'size' copies of the given identifier and
+ // returns a record for reading it back.
+ private ListRecord writeList(int size, RecordId id) {
+     List<RecordId> copies = Collections.nCopies(size, id);
+     RecordId listId = writer.writeList(copies);
+     return new ListRecord(reader, listId, size);
+ }
+
+ @Test
+ public void testStreamRecord() throws IOException {
+ checkRandomStreamRecord(0);
+ checkRandomStreamRecord(1);
+ checkRandomStreamRecord(SegmentWriter.BLOCK_SIZE);
+ checkRandomStreamRecord(SegmentWriter.BLOCK_SIZE + 1);
+ checkRandomStreamRecord(SegmentWriter.INLINE_LIMIT);
+ checkRandomStreamRecord(SegmentWriter.INLINE_LIMIT + 1);
+ checkRandomStreamRecord(store.getMaxSegmentSize());
+ checkRandomStreamRecord(store.getMaxSegmentSize() + 1);
+ checkRandomStreamRecord(store.getMaxSegmentSize() * 2);
+ checkRandomStreamRecord(store.getMaxSegmentSize() * 2 + 1);
+ }
+
+ // Writes 'size' random bytes as a value record and checks that
+ // streaming the record back returns exactly the same bytes.
+ private void checkRandomStreamRecord(int size) throws IOException {
+     byte[] source = new byte[size];
+     random.nextBytes(source);
+
+     RecordId valueId = writer.writeValue(source, 0, size);
+     writer.flush();
+
+     InputStream stream = reader.readStream(valueId);
+     try {
+         byte[] b = new byte[349]; // prime number
+         int offset = 0;
+         for (int n = stream.read(b); n != -1; n = stream.read(b)) {
+             for (int i = 0; i < n; i++) {
+                 assertEquals(source[offset + i], b[i]);
+             }
+             offset += n;
+         }
+         // Expected value first, so that a failure message reports the
+         // number of bytes read as the actual value.
+         assertEquals(size, offset);
+         assertEquals(-1, stream.read());
+     } finally {
+         stream.close();
+     }
+ }
+
+ @Test
+ public void testStringRecord() {
+     // A 1000 character string made of the repeating digits 0-9
+     StringBuilder builder = new StringBuilder();
+     for (int i = 0; i < 1000; i++) {
+         builder.append((char) ('0' + i % 10));
+     }
+     String digits = builder.toString();
+
+     RecordId emptyId = writer.writeString("");
+     RecordId spaceId = writer.writeString(" ");
+     RecordId helloId = writer.writeString("Hello, World!");
+     RecordId largeId = writer.writeString(digits);
+
+     writer.flush();
+
+     assertEquals("", reader.readString(emptyId));
+     assertEquals(" ", reader.readString(spaceId));
+     assertEquals("Hello, World!", reader.readString(helloId));
+     assertEquals(digits, reader.readString(largeId));
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java
------------------------------------------------------------------------------
svn:eol-style = native