Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/04/20 12:03:32 UTC
svn commit: r1740085 - in /jackrabbit/oak/trunk/oak-segment-next/src:
main/java/org/apache/jackrabbit/oak/plugins/segment/
main/java/org/apache/jackrabbit/oak/plugins/segment/file/
test/java/org/apache/jackrabbit/oak/plugins/segment/
Author: mduerig
Date: Wed Apr 20 10:03:32 2016
New Revision: 1740085
URL: http://svn.apache.org/viewvc?rev=1740085&view=rev
Log:
OAK-3348: Cross gc sessions might introduce references to pre-compacted segments
* Move SegmentBufferWriterPool to top level
* Allocate one SegmentBufferWriter per call to SegmentWriter
* Bind the GC generation to SegmentBufferWriter, so writers no longer need to be flushed on compaction (see the sketch after this list)
* Enable the preCompactionReferences test, as it now passes
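
In essence, as a minimal illustrative sketch (the names below are hypothetical, this is not the committed code): each buffer writer captures the GC generation current at its creation, and the pool replaces rather than reuses any writer whose generation is stale, so writes performed after a compaction can no longer introduce references to pre-compacted segments through a left-over writer.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only; GenerationBoundPool/PooledWriter are hypothetical names,
    // not the classes added by this commit.
    class GenerationBoundPool {

        static final class PooledWriter {
            final int generation;                  // GC generation captured at creation time
            PooledWriter(int generation) { this.generation = generation; }
        }

        private final Map<Object, PooledWriter> writers = new HashMap<Object, PooledWriter>();
        private int currentGeneration;             // advanced by each compaction cycle

        synchronized void onCompaction() { currentGeneration++; }

        synchronized PooledWriter borrow(Object key) {
            PooledWriter w = writers.remove(key);
            if (w == null || w.generation != currentGeneration) {
                // Writers from an older generation are never reused, only replaced.
                w = new PooledWriter(currentGeneration);
            }
            return w;
        }

        synchronized void giveBack(Object key, PooledWriter w) {
            writers.put(key, w);
        }
    }
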
Added:
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriterPool.java
Modified:
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriter.java
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java
jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentParserTest.java
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Wed Apr 20 10:03:32 2016
@@ -23,7 +23,9 @@ import static com.google.common.base.Pre
import static com.google.common.collect.Lists.newArrayListWithCapacity;
import static com.google.common.collect.Maps.newConcurrentMap;
import static java.lang.Boolean.getBoolean;
+import static java.lang.Integer.parseInt;
import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentId.isDataSegmentId;
import static org.apache.jackrabbit.oak.plugins.segment.SegmentVersion.V_11;
import static org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE;
@@ -36,6 +38,7 @@ import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.CheckForNull;
@@ -47,6 +50,8 @@ import org.apache.commons.io.HexDump;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.json.JsonObject;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
@@ -254,9 +259,10 @@ public class Segment {
}
}
- Segment(SegmentTracker tracker, byte[] buffer) {
+ Segment(SegmentTracker tracker, byte[] buffer, String info) {
this.tracker = checkNotNull(tracker);
this.id = tracker.newDataSegmentId();
+ this.info = info;
if (tracker.getStringCache() == null) {
strings = newConcurrentMap();
stringCache = null;
@@ -322,6 +328,8 @@ public class Segment {
<< RECORD_ALIGN_BITS;
}
+ private volatile String info;
+
/**
* Returns the segment meta data of this segment or {@code null} if none is present.
* <p>
@@ -339,11 +347,10 @@ public class Segment {
*/
@CheckForNull
public String getSegmentInfo() {
- if (getRootCount() == 0) {
- return null;
- } else {
- return readString(getRootOffset(0));
+ if (info == null && getRefCount() != 0) {
+ info = readString(getRootOffset(0));
}
+ return info;
}
SegmentId getRefId(int index) {
@@ -735,4 +742,28 @@ public class Segment {
return string.toString();
}
+ private volatile int gcGen = -1;
+
+ // FIXME OAK-3348 improve generation handling
+ public int getGcGen() {
+ if (gcGen < 0) {
+ if (isDataSegmentId(id.getLeastSignificantBits())) {
+ String info = getSegmentInfo();
+ if (info != null) {
+ JsopTokenizer tokenizer = new JsopTokenizer(info);
+ tokenizer.read('{');
+ Map<String, String> properties = JsonObject.create(tokenizer).getProperties();
+ int gen = parseInt(properties.get("gc"));
+ if (properties.get("wid").contains("c-")) {
+ gen++;
+ }
+ gcGen = gen;
+ return gcGen;
+ }
+ }
+ gcGen = Integer.MAX_VALUE;
+ return gcGen;
+ }
+ return gcGen;
+ }
}
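
For orientation, an illustrative example (not part of the commit): the meta info that getGcGen() above parses is the JSON string written by SegmentBufferWriter.newSegment() later in this commit. A minimal sketch of the derivation, using the same JsonObject/JsopTokenizer calls as the diff, with hypothetical example values:

    // Hypothetical values; the shape matches the metaInfo string built in
    // SegmentBufferWriter.newSegment() (next file in this commit).
    //   {"wid":"w-123.0001","sno":42,"gc":3,"t":1461146612000}   -> generation 3
    //   {"wid":"c-123.0001","sno":43,"gc":3,"t":1461146613000}   -> generation 4 (a wid containing "c-" counts as one generation newer)
    static int generationOf(String info) {
        JsopTokenizer tokenizer = new JsopTokenizer(info);
        tokenizer.read('{');
        Map<String, String> properties = JsonObject.create(tokenizer).getProperties();
        int gen = parseInt(properties.get("gc"));
        return properties.get("wid").contains("c-") ? gen + 1 : gen;
    }
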
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriter.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriter.java Wed Apr 20 10:03:32 2016
@@ -34,6 +34,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.segment.Segment.RECORD_ID_BYTES;
import static org.apache.jackrabbit.oak.plugins.segment.Segment.SEGMENT_REFERENCE_LIMIT;
import static org.apache.jackrabbit.oak.plugins.segment.Segment.align;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentId.isDataSegmentId;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -89,6 +90,8 @@ class SegmentBufferWriter {
private final SegmentTracker tracker;
+ private final int generation;
+
/**
* The segment write buffer, filled from the end to the beginning
* (see OAK-629).
@@ -117,10 +120,15 @@ class SegmentBufferWriter {
: wid);
this.tracker = store.getTracker();
+ this.generation = tracker.getCompactionMap().getGeneration();
this.buffer = createNewBuffer(version);
newSegment(this.wid);
}
+ int getGeneration() {
+ return generation;
+ }
+
/**
* Allocate a new segment and write the segment meta data.
* The segment meta data is a string of the format {@code "{wid=W,sno=S,gc=G,t=T}"}
@@ -137,12 +145,11 @@ class SegmentBufferWriter {
* @param wid the writer id
*/
private void newSegment(String wid) throws IOException {
- this.segment = new Segment(tracker, buffer);
String metaInfo = "{\"wid\":\"" + wid + '"' +
",\"sno\":" + tracker.getNextSegmentNo() +
- ",\"gc\":" + tracker.getCompactionMap().getGeneration() +
+ ",\"gc\":" + generation +
",\"t\":" + currentTimeMillis() + "}";
-
+ this.segment = new Segment(tracker, buffer, metaInfo);
byte[] data = metaInfo.getBytes(UTF_8);
newValueWriter(data.length, data).write(this);
}
@@ -198,7 +205,37 @@ class SegmentBufferWriter {
buffer[position++] = (byte) (offset >> Segment.RECORD_ALIGN_BITS);
}
+ // FIXME OAK-3348 disable/remove this in production
+ private void checkGCGen(SegmentId id) {
+ try {
+ if (isDataSegmentId(id.getLeastSignificantBits())) {
+ if (id.getSegment().getGcGen() < generation && !isCompactionMap(id)) {
+ LOG.warn("Detected reference from {} to segment {} from a previous gc generation.",
+ info(this.segment), info(id.getSegment()), new Exception());
+ }
+ }
+ } catch (SegmentNotFoundException snfe) {
+ LOG.warn("Detected reference from {} to non existing segment {}",
+ info(this.segment), id, snfe);
+ }
+ }
+
+ private static boolean isCompactionMap(SegmentId id) {
+ String info = id.getSegment().getSegmentInfo();
+ return info != null && info.contains("cm-");
+ }
+
+ private static String info(Segment segment) {
+ String info = segment.getSegmentId().toString();
+ if (isDataSegmentId(segment.getSegmentId().getLeastSignificantBits())) {
+ info += (" " + segment.getSegmentInfo());
+ }
+ return info;
+ }
+
private int getSegmentRef(SegmentId segmentId) {
+ checkGCGen(segmentId);
+
int refCount = segment.getRefCount();
if (refCount > SEGMENT_REFERENCE_LIMIT) {
throw new SegmentOverflowException(
Added: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriterPool.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriterPool.java?rev=1740085&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriterPool.java (added)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBufferWriterPool.java Wed Apr 20 10:03:32 2016
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * FIXME OAK-3348 document
+ */
+class SegmentBufferWriterPool {
+ private final Map<Object, SegmentBufferWriter> writers = newHashMap();
+ private final Set<SegmentBufferWriter> borrowed = newHashSet();
+ private final Set<SegmentBufferWriter> disposed = newHashSet();
+ private final SegmentStore store;
+ private final SegmentVersion version;
+ private final String wid;
+
+ private short writerId = -1;
+
+ SegmentBufferWriterPool(SegmentStore store, SegmentVersion version, String wid) {
+ this.store = store;
+ this.version = version;
+ this.wid = wid;
+ }
+
+ void flush() throws IOException {
+ List<SegmentBufferWriter> toFlush = newArrayList();
+ synchronized (this) {
+ toFlush.addAll(writers.values());
+ toFlush.addAll(disposed);
+ writers.clear();
+ disposed.clear();
+ borrowed.clear();
+ }
+ // Call flush from outside a synchronized context to avoid
+ // deadlocks of that method calling SegmentStore.writeSegment
+ for (SegmentBufferWriter writer : toFlush) {
+ writer.flush();
+ }
+ }
+
+ synchronized SegmentBufferWriter borrowWriter(Object key) throws IOException {
+ SegmentBufferWriter writer = writers.remove(key);
+ if (writer == null) {
+ writer = new SegmentBufferWriter(store, version, getWriterId(wid));
+ } else if (writer.getGeneration() != store.getTracker().getCompactionMap().getGeneration()) { // FIXME OAK-3348 improve generation tracking
+ disposed.add(writer);
+ writer = new SegmentBufferWriter(store, version, getWriterId(wid));
+ }
+ borrowed.add(writer);
+ return writer;
+ }
+
+ synchronized void returnWriter(Object key, SegmentBufferWriter writer) throws IOException {
+ if (borrowed.remove(writer)) {
+ writers.put(key, writer);
+ } else {
+ // Defer flushing this writer, as it was borrowed while flush() was called.
+ disposed.add(writer);
+ }
+ }
+
+ private synchronized String getWriterId(String wid) {
+ if (++writerId > 9999) {
+ writerId = 0;
+ }
+ // Manual padding seems to be fastest here
+ if (writerId < 10) {
+ return wid + ".000" + writerId;
+ } else if (writerId < 100) {
+ return wid + ".00" + writerId;
+ } else if (writerId < 1000) {
+ return wid + ".0" + writerId;
+ } else {
+ return wid + "." + writerId;
+ }
+ }
+}
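
Usage pattern, condensed from the SegmentWriter changes below (not verbatim): each write operation borrows a writer keyed by the calling thread, routes all of its records through it, and returns it in a finally block; a concurrent flush() then defers flushing any writer that is still borrowed.

    // Condensed from the writeNode()/writeMap()/... wrappers in SegmentWriter below.
    Writer writer = new Writer();          // borrows a SegmentBufferWriter from the pool
    try {
        return writer.writeNode(state);    // all records of this call share one buffer writer
    } finally {
        writer.close();                    // returns the buffer writer to the pool
    }
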
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Apr 20 10:03:32 2016
@@ -29,10 +29,7 @@ import static com.google.common.collect.
import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
import static com.google.common.collect.Lists.partition;
import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Sets.newHashSet;
import static com.google.common.io.ByteStreams.read;
-import static com.google.common.io.Closeables.close;
-import static java.lang.String.valueOf;
import static java.lang.Thread.currentThread;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
@@ -66,15 +63,14 @@ import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import javax.jcr.PropertyType;
+import com.google.common.io.Closeables;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState;
-import org.apache.jackrabbit.oak.plugins.segment.RecordWriters.RecordWriter;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.DefaultNodeStateDiff;
@@ -84,14 +80,13 @@ import org.slf4j.LoggerFactory;
/**
* Converts nodes, properties, and values to records, which are written to segments.
+ * FIXME OAK-3348 doc thread safety properties
*/
public class SegmentWriter {
private static final Logger LOG = LoggerFactory.getLogger(SegmentWriter.class);
static final int BLOCK_SIZE = 1 << 12; // 4kB
- private final SegmentBufferWriterPool segmentBufferWriterPool = new SegmentBufferWriterPool();
-
private static final int STRING_RECORDS_CACHE_SIZE = Integer.getInteger(
"oak.segment.writer.stringsCacheSize", 15000);
@@ -99,7 +94,7 @@ public class SegmentWriter {
* Cache of recently stored string records, used to avoid storing duplicates
* of frequently occurring data.
*/
- final Map<String, RecordId> stringCache = newItemsCache(
+ private final Map<String, RecordId> stringCache = newItemsCache(
STRING_RECORDS_CACHE_SIZE);
private static final int TPL_RECORDS_CACHE_SIZE = Integer.getInteger(
@@ -109,10 +104,19 @@ public class SegmentWriter {
* Cache of recently stored template records, used to avoid storing
* duplicates of frequently occurring data.
*/
- final Map<Template, RecordId> templateCache = newItemsCache(TPL_RECORDS_CACHE_SIZE);
+ private final Map<Template, RecordId> templateCache = newItemsCache(TPL_RECORDS_CACHE_SIZE);
+
+ private final SegmentStore store;
+
+ /**
+ * Version of the segment storage format.
+ */
+ private final SegmentVersion version;
+
+ private final SegmentBufferWriterPool segmentBufferWriterPool;
private static final <T> Map<T, RecordId> newItemsCache(final int size) {
- final boolean disabled = size <= 0;
+ final boolean disabled = true; // FIXME OAK-3348 re-enable caches but make generation part of cache key to avoid backrefs
final int safeSize = size <= 0 ? 0 : (int) (size * 1.2);
return new LinkedHashMap<T, RecordId>(safeSize, 0.9f, true) {
@Override
@@ -140,15 +144,6 @@ public class SegmentWriter {
};
}
- private final SegmentStore store;
-
- /**
- * Version of the segment storage format.
- */
- private final SegmentVersion version;
-
- private final String wid;
-
/**
* @param store store to write to
* @param version segment version to write
@@ -157,744 +152,801 @@ public class SegmentWriter {
public SegmentWriter(SegmentStore store, SegmentVersion version, String wid) {
this.store = store;
this.version = version;
- this.wid = wid;
+ this.segmentBufferWriterPool = new SegmentBufferWriterPool(store, version, wid);
}
public void flush() throws IOException {
segmentBufferWriterPool.flush();
}
+ // FIXME OAK-3348 this is a hack and probably prone to races: replace with making the tracker allocate a new writer
public void dropCache() {
stringCache.clear();
templateCache.clear();
}
MapRecord writeMap(MapRecord base, Map<String, RecordId> changes) throws IOException {
- if (base != null && base.isDiff()) {
- Segment segment = base.getSegment();
- RecordId key = segment.readRecordId(base.getOffset(8));
- String name = readString(key);
- if (!changes.containsKey(name)) {
- changes.put(name, segment.readRecordId(base.getOffset(8, 1)));
- }
- base = new MapRecord(segment.readRecordId(base.getOffset(8, 2)));
- }
-
- if (base != null && changes.size() == 1) {
- Map.Entry<String, RecordId> change =
- changes.entrySet().iterator().next();
- RecordId value = change.getValue();
- if (value != null) {
- MapEntry entry = base.getEntry(change.getKey());
- if (entry != null) {
- if (value.equals(entry.getValue())) {
- return base;
- } else {
- return writeRecord(newMapBranchWriter(entry.getHash(),
- asList(entry.getKey(), value, base.getRecordId())));
- }
- }
- }
+ Writer writer = new Writer();
+ try {
+ return writer.writeMap(base, changes);
+ } finally {
+ writer.close();
}
+ }
- List<MapEntry> entries = newArrayList();
- for (Map.Entry<String, RecordId> entry : changes.entrySet()) {
- String key = entry.getKey();
+ public RecordId writeList(List<RecordId> list) throws IOException {
+ Writer writer = new Writer();
+ try {
+ return writer.writeList(list);
+ } finally {
+ writer.close();
+ }
+ }
- RecordId keyId = null;
- if (base != null) {
- MapEntry e = base.getEntry(key);
- if (e != null) {
- keyId = e.getKey();
- }
- }
- if (keyId == null && entry.getValue() != null) {
- keyId = writeString(key);
- }
+ public RecordId writeString(String string) throws IOException {
+ Writer writer = new Writer();
+ try {
+ return writer.writeString(string);
+ } finally {
+ writer.close();
+ }
+ }
- if (keyId != null) {
- entries.add(new MapEntry(key, keyId, entry.getValue()));
- }
+ SegmentBlob writeBlob(Blob blob) throws IOException {
+ Writer writer = new Writer();
+ try {
+ return writer.writeBlob(blob);
+ } finally {
+ writer.close();
}
- return writeMapBucket(base, entries, 0);
}
- private MapRecord writeMapLeaf(int level, Collection<MapEntry> entries) throws IOException {
- checkNotNull(entries);
- int size = entries.size();
- checkElementIndex(size, MapRecord.MAX_SIZE);
- checkPositionIndex(level, MapRecord.MAX_NUMBER_OF_LEVELS);
- checkArgument(size != 0 || level == MapRecord.MAX_NUMBER_OF_LEVELS);
- return writeRecord(newMapLeafWriter(level, entries));
+ /**
+ * Writes a block record containing the given block of bytes.
+ *
+ * @param bytes source buffer
+ * @param offset offset within the source buffer
+ * @param length number of bytes to write
+ * @return block record identifier
+ */
+ RecordId writeBlock(byte[] bytes, int offset, int length) throws IOException {
+ Writer writer = new Writer();
+ try {
+ return writer.writeBlock(bytes, offset, length);
+ } finally {
+ writer.close();
+ }
}
- private MapRecord writeMapBranch(int level, int size, MapRecord[] buckets) throws IOException {
- int bitmap = 0;
- List<RecordId> bucketIds = newArrayListWithCapacity(buckets.length);
- for (int i = 0; i < buckets.length; i++) {
- if (buckets[i] != null) {
- bitmap |= 1L << i;
- bucketIds.add(buckets[i].getRecordId());
- }
+ SegmentBlob writeExternalBlob(String blobId) throws IOException {
+ Writer writer = new Writer();
+ try {
+ return writer.writeExternalBlob(blobId);
+ } finally {
+ writer.close();
}
- return writeRecord(newMapBranchWriter(level, size, bitmap, bucketIds));
}
- private MapRecord writeMapBucket(MapRecord base, Collection<MapEntry> entries, int level) throws IOException {
- // when no changed entries, return the base map (if any) as-is
- if (entries == null || entries.isEmpty()) {
- if (base != null) {
- return base;
- } else if (level == 0) {
- return writeRecord(newMapLeafWriter());
- } else {
- return null;
- }
+ SegmentBlob writeLargeBlob(long length, List<RecordId> list) throws IOException {
+ Writer writer = new Writer();
+ try {
+ return writer.writeLargeBlob(length, list);
+ } finally {
+ writer.close();
}
+ }
- // when no base map was given, write a fresh new map
- if (base == null) {
- // use leaf records for small maps or the last map level
- if (entries.size() <= BUCKETS_PER_LEVEL
- || level == MapRecord.MAX_NUMBER_OF_LEVELS) {
- return writeMapLeaf(level, entries);
- }
+ /**
+ * Writes a stream value record. The given stream is consumed <em>and closed</em> by
+ * this method.
+ *
+ * @param stream stream to be written
+ * @return blob for the passed {@code stream}
+ * @throws IOException if the input stream could not be read or the output could not be written
+ */
+ public SegmentBlob writeStream(InputStream stream) throws IOException {
+ Writer writer = new Writer();
+ try {
+ return writer.writeStream(stream);
+ } finally {
+ writer.close();
+ }
+ }
- // write a large map by dividing the entries into buckets
- MapRecord[] buckets = new MapRecord[BUCKETS_PER_LEVEL];
- List<List<MapEntry>> changes = splitToBuckets(entries, level);
- for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
- buckets[i] = writeMapBucket(null, changes.get(i), level + 1);
- }
+ public SegmentNodeState writeNode(NodeState state) throws IOException {
+ Writer writer = new Writer();
+ try {
+ return writer.writeNode(state);
+ } finally {
+ writer.close();
+ }
+ }
+
+ // FIXME OAK-3348 document: not thread safe
+ private final class Writer {
+ private final SegmentBufferWriter writer;
+ private final int key = currentThread().hashCode();
- // combine the buckets into one big map
- return writeMapBranch(level, entries.size(), buckets);
+ private Writer() throws IOException {
+ writer = segmentBufferWriterPool.borrowWriter(key);
}
- // if the base map is small, update in memory and write as a new map
- if (base.isLeaf()) {
- Map<String, MapEntry> map = newHashMap();
- for (MapEntry entry : base.getEntries()) {
- map.put(entry.getName(), entry);
- }
- for (MapEntry entry : entries) {
- if (entry.getValue() != null) {
- map.put(entry.getName(), entry);
- } else {
- map.remove(entry.getName());
+ private void close() throws IOException {
+ // Not implementing Closeable because we are not idempotent
+ segmentBufferWriterPool.returnWriter(key, writer);
+ }
+
+ private MapRecord writeMap(MapRecord base, Map<String, RecordId> changes) throws IOException {
+ if (base != null && base.isDiff()) {
+ Segment segment = base.getSegment();
+ RecordId key = segment.readRecordId(base.getOffset(8));
+ String name = readString(key);
+ if (!changes.containsKey(name)) {
+ changes.put(name, segment.readRecordId(base.getOffset(8, 1)));
}
+ base = new MapRecord(segment.readRecordId(base.getOffset(8, 2)));
}
- return writeMapBucket(null, map.values(), level);
- }
- // finally, the if the base map is large, handle updates per bucket
- int newSize = 0;
- int newCount = 0;
- MapRecord[] buckets = base.getBuckets();
- List<List<MapEntry>> changes = splitToBuckets(entries, level);
- for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
- buckets[i] = writeMapBucket(buckets[i], changes.get(i), level + 1);
- if (buckets[i] != null) {
- newSize += buckets[i].size();
- newCount++;
+ if (base != null && changes.size() == 1) {
+ Map.Entry<String, RecordId> change =
+ changes.entrySet().iterator().next();
+ RecordId value = change.getValue();
+ if (value != null) {
+ MapEntry entry = base.getEntry(change.getKey());
+ if (entry != null) {
+ if (value.equals(entry.getValue())) {
+ return base;
+ } else {
+ return newMapBranchWriter(entry.getHash(), asList(entry.getKey(),
+ value, base.getRecordId())).write(writer);
+ }
+ }
+ }
}
- }
- // OAK-654: what if the updated map is smaller?
- if (newSize > BUCKETS_PER_LEVEL) {
- return writeMapBranch(level, newSize, buckets);
- } else if (newCount <= 1) {
- // up to one bucket contains entries, so return that as the new map
- for (MapRecord bucket : buckets) {
- if (bucket != null) {
- return bucket;
+ List<MapEntry> entries = newArrayList();
+ for (Map.Entry<String, RecordId> entry : changes.entrySet()) {
+ String key = entry.getKey();
+
+ RecordId keyId = null;
+ if (base != null) {
+ MapEntry e = base.getEntry(key);
+ if (e != null) {
+ keyId = e.getKey();
+ }
+ }
+ if (keyId == null && entry.getValue() != null) {
+ keyId = writeString(key);
+ }
+
+ if (keyId != null) {
+ entries.add(new MapEntry(key, keyId, entry.getValue()));
}
}
- // no buckets remaining, return empty map
- return writeMapBucket(null, null, level);
- } else {
- // combine all remaining entries into a leaf record
- List<MapEntry> list = newArrayList();
- for (MapRecord bucket : buckets) {
- if (bucket != null) {
- addAll(list, bucket.getEntries());
+ return writeMapBucket(base, entries, 0);
+ }
+
+ private MapRecord writeMapLeaf(int level, Collection<MapEntry> entries) throws IOException {
+ checkNotNull(entries);
+ int size = entries.size();
+ checkElementIndex(size, MapRecord.MAX_SIZE);
+ checkPositionIndex(level, MapRecord.MAX_NUMBER_OF_LEVELS);
+ checkArgument(size != 0 || level == MapRecord.MAX_NUMBER_OF_LEVELS);
+ return newMapLeafWriter(level, entries).write(writer);
+ }
+
+ private MapRecord writeMapBranch(int level, int size, MapRecord... buckets) throws IOException {
+ int bitmap = 0;
+ List<RecordId> bucketIds = newArrayListWithCapacity(buckets.length);
+ for (int i = 0; i < buckets.length; i++) {
+ if (buckets[i] != null) {
+ bitmap |= 1L << i;
+ bucketIds.add(buckets[i].getRecordId());
}
}
- return writeMapLeaf(level, list);
+ return newMapBranchWriter(level, size, bitmap, bucketIds).write(writer);
}
- }
- /**
- * Writes a list record containing the given list of record identifiers.
- *
- * @param list list of record identifiers
- * @return list record identifier
- */
- public RecordId writeList(List<RecordId> list) throws IOException {
- checkNotNull(list);
- checkArgument(!list.isEmpty());
- List<RecordId> thisLevel = list;
- while (thisLevel.size() > 1) {
- List<RecordId> nextLevel = newArrayList();
- for (List<RecordId> bucket :
- partition(thisLevel, ListRecord.LEVEL_SIZE)) {
- if (bucket.size() > 1) {
- nextLevel.add(writeListBucket(bucket));
+ private MapRecord writeMapBucket(MapRecord base, Collection<MapEntry> entries, int level)
+ throws IOException {
+ // when no changed entries, return the base map (if any) as-is
+ if (entries == null || entries.isEmpty()) {
+ if (base != null) {
+ return base;
+ } else if (level == 0) {
+ return newMapLeafWriter().write(writer);
} else {
- nextLevel.add(bucket.get(0));
+ return null;
}
}
- thisLevel = nextLevel;
- }
- return thisLevel.iterator().next();
- }
- private RecordId writeListBucket(List<RecordId> bucket) throws IOException {
- checkArgument(bucket.size() > 1);
- return writeRecord(newListBucketWriter(bucket));
- }
+ // when no base map was given, write a fresh new map
+ if (base == null) {
+ // use leaf records for small maps or the last map level
+ if (entries.size() <= BUCKETS_PER_LEVEL
+ || level == MapRecord.MAX_NUMBER_OF_LEVELS) {
+ return writeMapLeaf(level, entries);
+ }
- private static List<List<MapEntry>> splitToBuckets(Collection<MapEntry> entries, int level) {
- List<MapEntry> empty = null;
- int mask = (1 << MapRecord.BITS_PER_LEVEL) - 1;
- int shift = 32 - (level + 1) * MapRecord.BITS_PER_LEVEL;
+ // write a large map by dividing the entries into buckets
+ MapRecord[] buckets = new MapRecord[BUCKETS_PER_LEVEL];
+ List<List<MapEntry>> changes = splitToBuckets(entries, level);
+ for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
+ buckets[i] = writeMapBucket(null, changes.get(i), level + 1);
+ }
- List<List<MapEntry>> buckets =
- newArrayList(nCopies(MapRecord.BUCKETS_PER_LEVEL, empty));
- for (MapEntry entry : entries) {
- int index = (entry.getHash() >> shift) & mask;
- List<MapEntry> bucket = buckets.get(index);
- if (bucket == null) {
- bucket = newArrayList();
- buckets.set(index, bucket);
+ // combine the buckets into one big map
+ return writeMapBranch(level, entries.size(), buckets);
}
- bucket.add(entry);
- }
- return buckets;
- }
- private RecordId writeValueRecord(long length, RecordId blocks) throws IOException {
- long len = (length - Segment.MEDIUM_LIMIT) | (0x3L << 62);
- return writeRecord(newValueWriter(blocks, len));
- }
+ // if the base map is small, update in memory and write as a new map
+ if (base.isLeaf()) {
+ Map<String, MapEntry> map = newHashMap();
+ for (MapEntry entry : base.getEntries()) {
+ map.put(entry.getName(), entry);
+ }
+ for (MapEntry entry : entries) {
+ if (entry.getValue() != null) {
+ map.put(entry.getName(), entry);
+ } else {
+ map.remove(entry.getName());
+ }
+ }
+ return writeMapBucket(null, map.values(), level);
+ }
- private RecordId writeValueRecord(int length, byte[] data) throws IOException {
- checkArgument(length < Segment.MEDIUM_LIMIT);
- return writeRecord(newValueWriter(length, data));
- }
+ // finally, if the base map is large, handle updates per bucket
+ int newSize = 0;
+ int newCount = 0;
+ MapRecord[] buckets = base.getBuckets();
+ List<List<MapEntry>> changes = splitToBuckets(entries, level);
+ for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
+ buckets[i] = writeMapBucket(buckets[i], changes.get(i), level + 1);
+ if (buckets[i] != null) {
+ newSize += buckets[i].size();
+ newCount++;
+ }
+ }
- /**
- * Writes a string value record.
- *
- * @param string string to be written
- * @return value record identifier
- */
- public RecordId writeString(String string) throws IOException {
- RecordId id = stringCache.get(string);
- if (id != null) {
- return id; // shortcut if the same string was recently stored
+ // OAK-654: what if the updated map is smaller?
+ if (newSize > BUCKETS_PER_LEVEL) {
+ return writeMapBranch(level, newSize, buckets);
+ } else if (newCount <= 1) {
+ // up to one bucket contains entries, so return that as the new map
+ for (MapRecord bucket : buckets) {
+ if (bucket != null) {
+ return bucket;
+ }
+ }
+ // no buckets remaining, return empty map
+ return writeMapBucket(null, null, level);
+ } else {
+ // combine all remaining entries into a leaf record
+ List<MapEntry> list = newArrayList();
+ for (MapRecord bucket : buckets) {
+ if (bucket != null) {
+ addAll(list, bucket.getEntries());
+ }
+ }
+ return writeMapLeaf(level, list);
+ }
}
- byte[] data = string.getBytes(UTF_8);
+ /**
+ * Writes a list record containing the given list of record identifiers.
+ *
+ * @param list list of record identifiers
+ * @return list record identifier
+ */
+ private RecordId writeList(List<RecordId> list) throws IOException {
+ checkNotNull(list);
+ checkArgument(!list.isEmpty());
+ List<RecordId> thisLevel = list;
+ while (thisLevel.size() > 1) {
+ List<RecordId> nextLevel = newArrayList();
+ for (List<RecordId> bucket :
+ partition(thisLevel, ListRecord.LEVEL_SIZE)) {
+ if (bucket.size() > 1) {
+ nextLevel.add(writeListBucket(bucket));
+ } else {
+ nextLevel.add(bucket.get(0));
+ }
+ }
+ thisLevel = nextLevel;
+ }
+ return thisLevel.iterator().next();
+ }
- if (data.length < Segment.MEDIUM_LIMIT) {
- // only cache short strings to avoid excessive memory use
- id = writeValueRecord(data.length, data);
- stringCache.put(string, id);
- return id;
+ private RecordId writeListBucket(List<RecordId> bucket) throws IOException {
+ checkArgument(bucket.size() > 1);
+ return newListBucketWriter(bucket).write(writer);
}
- int pos = 0;
- List<RecordId> blockIds = newArrayListWithExpectedSize(
- data.length / BLOCK_SIZE + 1);
+ private List<List<MapEntry>> splitToBuckets(Collection<MapEntry> entries, int level) {
+ int mask = (1 << MapRecord.BITS_PER_LEVEL) - 1;
+ int shift = 32 - (level + 1) * MapRecord.BITS_PER_LEVEL;
- // write as many full bulk segments as possible
- while (pos + MAX_SEGMENT_SIZE <= data.length) {
- SegmentId bulkId = store.getTracker().newBulkSegmentId();
- store.writeSegment(bulkId, data, pos, MAX_SEGMENT_SIZE);
- for (int i = 0; i < MAX_SEGMENT_SIZE; i += BLOCK_SIZE) {
- blockIds.add(new RecordId(bulkId, i));
+ List<List<MapEntry>> buckets =
+ newArrayList(nCopies(MapRecord.BUCKETS_PER_LEVEL, (List<MapEntry>) null));
+ for (MapEntry entry : entries) {
+ int index = (entry.getHash() >> shift) & mask;
+ List<MapEntry> bucket = buckets.get(index);
+ if (bucket == null) {
+ bucket = newArrayList();
+ buckets.set(index, bucket);
+ }
+ bucket.add(entry);
}
- pos += MAX_SEGMENT_SIZE;
+ return buckets;
}
- // inline the remaining data as block records
- while (pos < data.length) {
- int len = Math.min(BLOCK_SIZE, data.length - pos);
- blockIds.add(writeBlock(data, pos, len));
- pos += len;
+ private RecordId writeValueRecord(long length, RecordId blocks) throws IOException {
+ long len = (length - Segment.MEDIUM_LIMIT) | (0x3L << 62);
+ return newValueWriter(blocks, len).write(writer);
}
- return writeValueRecord(data.length, writeList(blockIds));
- }
-
- public SegmentBlob writeBlob(Blob blob) throws IOException {
- if (blob instanceof SegmentBlob
- && store.containsSegment(((SegmentBlob) blob).getRecordId().getSegmentId())) {
- return (SegmentBlob) blob;
+ private RecordId writeValueRecord(int length, byte... data) throws IOException {
+ checkArgument(length < Segment.MEDIUM_LIMIT);
+ return newValueWriter(length, data).write(writer);
}
- String reference = blob.getReference();
- if (reference != null && store.getBlobStore() != null) {
- String blobId = store.getBlobStore().getBlobId(reference);
- if (blobId != null) {
- RecordId id = writeBlobId(blobId);
- return new SegmentBlob(id);
- } else {
- LOG.debug("No blob found for reference {}, inlining...", reference);
+ /**
+ * Writes a string value record.
+ *
+ * @param string string to be written
+ * @return value record identifier
+ */
+ private RecordId writeString(String string) throws IOException {
+ RecordId id = stringCache.get(string);
+ if (id != null) {
+ return id; // shortcut if the same string was recently stored
}
- }
- return writeStream(blob.getNewStream());
- }
+ byte[] data = string.getBytes(UTF_8);
- /**
- * Write a reference to an external blob. This method handles blob IDs of
- * every length, but behaves differently for small and large blob IDs.
- *
- * @param blobId Blob ID.
- * @return Record ID pointing to the written blob ID.
- * @see Segment#BLOB_ID_SMALL_LIMIT
- */
- private RecordId writeBlobId(String blobId) throws IOException {
- byte[] data = blobId.getBytes(UTF_8);
- if (data.length < Segment.BLOB_ID_SMALL_LIMIT) {
- return writeRecord(newBlobIdWriter(data));
- } else {
- return writeRecord(newBlobIdWriter(writeString(blobId)));
- }
- }
-
- /**
- * Writes a block record containing the given block of bytes.
- *
- * @param bytes source buffer
- * @param offset offset within the source buffer
- * @param length number of bytes to write
- * @return block record identifier
- */
- RecordId writeBlock(byte[] bytes, int offset, int length) throws IOException {
- checkNotNull(bytes);
- checkPositionIndexes(offset, offset + length, bytes.length);
- return writeRecord(newBlockWriter(bytes, offset, length));
- }
+ if (data.length < Segment.MEDIUM_LIMIT) {
+ // only cache short strings to avoid excessive memory use
+ id = writeValueRecord(data.length, data);
+ stringCache.put(string, id);
+ return id;
+ }
- SegmentBlob writeExternalBlob(String blobId) throws IOException {
- RecordId id = writeBlobId(blobId);
- return new SegmentBlob(id);
- }
+ int pos = 0;
+ List<RecordId> blockIds = newArrayListWithExpectedSize(
+ data.length / BLOCK_SIZE + 1);
- SegmentBlob writeLargeBlob(long length, List<RecordId> list) throws IOException {
- RecordId id = writeValueRecord(length, writeList(list));
- return new SegmentBlob(id);
- }
+ // write as many full bulk segments as possible
+ while (pos + MAX_SEGMENT_SIZE <= data.length) {
+ SegmentId bulkId = store.getTracker().newBulkSegmentId();
+ store.writeSegment(bulkId, data, pos, MAX_SEGMENT_SIZE);
+ for (int i = 0; i < MAX_SEGMENT_SIZE; i += BLOCK_SIZE) {
+ blockIds.add(new RecordId(bulkId, i));
+ }
+ pos += MAX_SEGMENT_SIZE;
+ }
- /**
- * Writes a stream value record. The given stream is consumed
- * <em>and closed</em> by this method.
- *
- * @param stream stream to be written
- * @return value record identifier
- * @throws IOException if the stream could not be read
- */
- public SegmentBlob writeStream(InputStream stream) throws IOException {
- boolean threw = true;
- try {
- RecordId id = SegmentStream.getRecordIdIfAvailable(stream, store);
- if (id == null) {
- id = internalWriteStream(stream);
+ // inline the remaining data as block records
+ while (pos < data.length) {
+ int len = Math.min(BLOCK_SIZE, data.length - pos);
+ blockIds.add(writeBlock(data, pos, len));
+ pos += len;
}
- threw = false;
- return new SegmentBlob(id);
- } finally {
- close(stream, threw);
- }
- }
- private RecordId internalWriteStream(InputStream stream)
- throws IOException {
- BlobStore blobStore = store.getBlobStore();
- byte[] data = new byte[Segment.MEDIUM_LIMIT];
- int n = read(stream, data, 0, data.length);
-
- // Special case for short binaries (up to about 16kB):
- // store them directly as small- or medium-sized value records
- if (n < Segment.MEDIUM_LIMIT) {
- return writeValueRecord(n, data);
- } else if (blobStore != null) {
- String blobId = blobStore.writeBlob(new SequenceInputStream(
- new ByteArrayInputStream(data, 0, n), stream));
- return writeBlobId(blobId);
+ return writeValueRecord(data.length, writeList(blockIds));
}
- data = Arrays.copyOf(data, MAX_SEGMENT_SIZE);
- n += read(stream, data, n, MAX_SEGMENT_SIZE - n);
- long length = n;
- List<RecordId> blockIds =
- newArrayListWithExpectedSize(2 * n / BLOCK_SIZE);
+ private boolean hasSegment(Blob blob) {
+ return (blob instanceof SegmentBlob)
+ && store.containsSegment(((Record) blob).getRecordId().getSegmentId());
+ }
- // Write the data to bulk segments and collect the list of block ids
- while (n != 0) {
- SegmentId bulkId = store.getTracker().newBulkSegmentId();
- int len = align(n, 1 << Segment.RECORD_ALIGN_BITS);
- LOG.debug("Writing bulk segment {} ({} bytes)", bulkId, n);
- store.writeSegment(bulkId, data, 0, len);
+ private SegmentBlob writeBlob(Blob blob) throws IOException {
+ if (hasSegment(blob)) {
+ SegmentBlob segmentBlob = (SegmentBlob) blob;
+ if (!isOldGen(segmentBlob.getRecordId())) {
+ return segmentBlob;
+ }
+ }
- for (int i = 0; i < n; i += BLOCK_SIZE) {
- blockIds.add(new RecordId(bulkId, data.length - len + i));
+ String reference = blob.getReference();
+ if (reference != null && store.getBlobStore() != null) {
+ String blobId = store.getBlobStore().getBlobId(reference);
+ if (blobId != null) {
+ RecordId id = writeBlobId(blobId);
+ return new SegmentBlob(id);
+ } else {
+ LOG.debug("No blob found for reference {}, inlining...", reference);
+ }
}
- n = read(stream, data, 0, data.length);
- length += n;
+ return writeStream(blob.getNewStream());
}
- return writeValueRecord(length, writeList(blockIds));
- }
+ /**
+ * Write a reference to an external blob. This method handles blob IDs of
+ * every length, but behaves differently for small and large blob IDs.
+ *
+ * @param blobId Blob ID.
+ * @return Record ID pointing to the written blob ID.
+ * @see Segment#BLOB_ID_SMALL_LIMIT
+ */
+ private RecordId writeBlobId(String blobId) throws IOException {
+ byte[] data = blobId.getBytes(UTF_8);
+ if (data.length < Segment.BLOB_ID_SMALL_LIMIT) {
+ return newBlobIdWriter(data).write(writer);
+ } else {
+ return newBlobIdWriter(writeString(blobId)).write(writer);
+ }
+ }
- public RecordId writeProperty(PropertyState state) throws IOException {
- Map<String, RecordId> previousValues = emptyMap();
- return writeProperty(state, previousValues);
- }
+ private RecordId writeBlock(byte[] bytes, int offset, int length) throws IOException {
+ checkNotNull(bytes);
+ checkPositionIndexes(offset, offset + length, bytes.length);
+ return newBlockWriter(bytes, offset, length).write(writer);
+ }
- private RecordId writeProperty(PropertyState state, Map<String, RecordId> previousValues) throws IOException {
- Type<?> type = state.getType();
- int count = state.count();
+ private SegmentBlob writeExternalBlob(String blobId) throws IOException {
+ RecordId id = writeBlobId(blobId);
+ return new SegmentBlob(id);
+ }
- List<RecordId> valueIds = newArrayList();
- for (int i = 0; i < count; i++) {
- if (type.tag() == PropertyType.BINARY) {
- try {
- SegmentBlob blob =
- writeBlob(state.getValue(BINARY, i));
- valueIds.add(blob.getRecordId());
- } catch (IOException e) {
- throw new IllegalStateException("Unexpected IOException", e);
- }
- } else {
- String value = state.getValue(STRING, i);
- RecordId valueId = previousValues.get(value);
- if (valueId == null) {
- valueId = writeString(value);
+ private SegmentBlob writeLargeBlob(long length, List<RecordId> list) throws IOException {
+ RecordId id = writeValueRecord(length, writeList(list));
+ return new SegmentBlob(id);
+ }
+
+ private SegmentBlob writeStream(InputStream stream) throws IOException {
+ boolean threw = true;
+ try {
+ RecordId id = SegmentStream.getRecordIdIfAvailable(stream, store);
+ if (id == null || isOldGen(id)) {
+ id = internalWriteStream(stream);
}
- valueIds.add(valueId);
+ threw = false;
+ return new SegmentBlob(id);
+ } finally {
+ Closeables.close(stream, threw);
}
}
- if (!type.isArray()) {
- return valueIds.iterator().next();
- } else if (count == 0) {
- return writeRecord(newListWriter());
- } else {
- return writeRecord(newListWriter(count, writeList(valueIds)));
- }
- }
+ private RecordId internalWriteStream(InputStream stream) throws IOException {
+ BlobStore blobStore = store.getBlobStore();
+ byte[] data = new byte[Segment.MEDIUM_LIMIT];
+ int n = read(stream, data, 0, data.length);
- public RecordId writeTemplate(Template template) throws IOException {
- checkNotNull(template);
+ // Special case for short binaries (up to about 16kB):
+ // store them directly as small- or medium-sized value records
+ if (n < Segment.MEDIUM_LIMIT) {
+ return writeValueRecord(n, data);
+ }
+ if (blobStore != null) {
+ String blobId = blobStore.writeBlob(new SequenceInputStream(
+ new ByteArrayInputStream(data, 0, n), stream));
+ return writeBlobId(blobId);
+ }
- RecordId id = templateCache.get(template);
- if (id != null) {
- return id; // shortcut if the same template was recently stored
- }
+ data = Arrays.copyOf(data, MAX_SEGMENT_SIZE);
+ n += read(stream, data, n, MAX_SEGMENT_SIZE - n);
+ long length = n;
+ List<RecordId> blockIds =
+ newArrayListWithExpectedSize(2 * n / BLOCK_SIZE);
- Collection<RecordId> ids = newArrayList();
- int head = 0;
+ // Write the data to bulk segments and collect the list of block ids
+ while (n != 0) {
+ SegmentId bulkId = store.getTracker().newBulkSegmentId();
+ int len = align(n, 1 << Segment.RECORD_ALIGN_BITS);
+ LOG.debug("Writing bulk segment {} ({} bytes)", bulkId, n);
+ store.writeSegment(bulkId, data, 0, len);
- RecordId primaryId = null;
- PropertyState primaryType = template.getPrimaryType();
- if (primaryType != null) {
- head |= 1 << 31;
- primaryId = writeString(primaryType.getValue(NAME));
- ids.add(primaryId);
- }
+ for (int i = 0; i < n; i += BLOCK_SIZE) {
+ blockIds.add(new RecordId(bulkId, data.length - len + i));
+ }
- List<RecordId> mixinIds = null;
- PropertyState mixinTypes = template.getMixinTypes();
- if (mixinTypes != null) {
- head |= 1 << 30;
- mixinIds = newArrayList();
- for (String mixin : mixinTypes.getValue(NAMES)) {
- mixinIds.add(writeString(mixin));
+ n = read(stream, data, 0, data.length);
+ length += n;
}
- ids.addAll(mixinIds);
- checkState(mixinIds.size() < (1 << 10));
- head |= mixinIds.size() << 18;
+
+ return writeValueRecord(length, writeList(blockIds));
}
- RecordId childNameId = null;
- String childName = template.getChildName();
- if (childName == Template.ZERO_CHILD_NODES) {
- head |= 1 << 29;
- } else if (childName == Template.MANY_CHILD_NODES) {
- head |= 1 << 28;
- } else {
- childNameId = writeString(childName);
- ids.add(childNameId);
+ private RecordId writeProperty(PropertyState state) throws IOException {
+ Map<String, RecordId> previousValues = emptyMap();
+ return writeProperty(state, previousValues);
}
- PropertyTemplate[] properties = template.getPropertyTemplates();
- RecordId[] propertyNames = new RecordId[properties.length];
- byte[] propertyTypes = new byte[properties.length];
- for (int i = 0; i < properties.length; i++) {
- // Note: if the property names are stored in more than 255 separate
- // segments, this will not work.
- propertyNames[i] = writeString(properties[i].getName());
- Type<?> type = properties[i].getType();
- if (type.isArray()) {
- propertyTypes[i] = (byte) -type.tag();
- } else {
- propertyTypes[i] = (byte) type.tag();
+ private RecordId writeProperty(PropertyState state, Map<String, RecordId> previousValues)
+ throws IOException {
+ Type<?> type = state.getType();
+ int count = state.count();
+
+ List<RecordId> valueIds = newArrayList();
+ for (int i = 0; i < count; i++) {
+ if (type.tag() == PropertyType.BINARY) {
+ try {
+ SegmentBlob blob =
+ writeBlob(state.getValue(BINARY, i));
+ valueIds.add(blob.getRecordId());
+ } catch (IOException e) {
+ throw new IllegalStateException("Unexpected IOException", e);
+ }
+ } else {
+ String value = state.getValue(STRING, i);
+ RecordId valueId = previousValues.get(value);
+ if (valueId == null) {
+ valueId = writeString(value);
+ }
+ valueIds.add(valueId);
+ }
}
- }
- RecordId propNamesId = null;
- if (version.onOrAfter(V_11)) {
- if (propertyNames.length > 0) {
- propNamesId = writeList(asList(propertyNames));
- ids.add(propNamesId);
+ if (!type.isArray()) {
+ return valueIds.iterator().next();
+ } else if (count == 0) {
+ return newListWriter().write(writer);
+ } else {
+ return newListWriter(count, writeList(valueIds)).write(writer);
}
- } else {
- ids.addAll(asList(propertyNames));
}
- checkState(propertyNames.length < (1 << 18));
- head |= propertyNames.length;
+ private RecordId writeTemplate(Template template) throws IOException {
+ checkNotNull(template);
- RecordId tid = writeRecord(newTemplateWriter(ids, propertyNames,
- propertyTypes, head, primaryId, mixinIds, childNameId,
- propNamesId, version));
- templateCache.put(template, tid);
- return tid;
- }
+ RecordId id = templateCache.get(template);
+ if (id != null) {
+ return id; // shortcut if the same template was recently stored
+ }
- public SegmentNodeState writeNode(NodeState state) throws IOException {
- if (state instanceof SegmentNodeState) {
- SegmentNodeState sns = uncompact((SegmentNodeState) state);
- if (sns != state || store.containsSegment(
- sns.getRecordId().getSegmentId())) {
- return sns;
+ Collection<RecordId> ids = newArrayList();
+ int head = 0;
+
+ RecordId primaryId = null;
+ PropertyState primaryType = template.getPrimaryType();
+ if (primaryType != null) {
+ head |= 1 << 31;
+ primaryId = writeString(primaryType.getValue(NAME));
+ ids.add(primaryId);
}
- }
- SegmentNodeState before = null;
- Template beforeTemplate = null;
- ModifiedNodeState after = null;
- if (state instanceof ModifiedNodeState) {
- after = (ModifiedNodeState) state;
- NodeState base = after.getBaseState();
- if (base instanceof SegmentNodeState) {
- SegmentNodeState sns = uncompact((SegmentNodeState) base);
- if (sns != base || store.containsSegment(
- sns.getRecordId().getSegmentId())) {
- before = sns;
- beforeTemplate = before.getTemplate();
+ List<RecordId> mixinIds = null;
+ PropertyState mixinTypes = template.getMixinTypes();
+ if (mixinTypes != null) {
+ head |= 1 << 30;
+ mixinIds = newArrayList();
+ for (String mixin : mixinTypes.getValue(NAMES)) {
+ mixinIds.add(writeString(mixin));
}
+ ids.addAll(mixinIds);
+ checkState(mixinIds.size() < (1 << 10));
+ head |= mixinIds.size() << 18;
}
- }
- Template template = new Template(state);
- RecordId templateId;
- if (before != null && template.equals(beforeTemplate)) {
- templateId = before.getTemplateId();
- } else {
- templateId = writeTemplate(template);
- }
-
- List<RecordId> ids = newArrayList();
- ids.add(templateId);
-
- String childName = template.getChildName();
- if (childName == Template.MANY_CHILD_NODES) {
- MapRecord base;
- Map<String, RecordId> childNodes;
- if (before != null
- && before.getChildNodeCount(2) > 1
- && after.getChildNodeCount(2) > 1) {
- base = before.getChildNodeMap();
- childNodes = new ChildNodeCollectorDiff().diff(before, after);
- } else {
- base = null;
- childNodes = newHashMap();
- for (ChildNodeEntry entry : state.getChildNodeEntries()) {
- childNodes.put(
- entry.getName(),
- writeNode(entry.getNodeState()).getRecordId());
- }
- }
- ids.add(writeMap(base, childNodes).getRecordId());
- } else if (childName != Template.ZERO_CHILD_NODES) {
- ids.add(writeNode(state.getChildNode(template.getChildName())).getRecordId());
- }
-
- List<RecordId> pIds = newArrayList();
- for (PropertyTemplate pt : template.getPropertyTemplates()) {
- String name = pt.getName();
- PropertyState property = state.getProperty(name);
-
- if (property instanceof SegmentPropertyState
- && store.containsSegment(((SegmentPropertyState) property).getRecordId().getSegmentId())) {
- pIds.add(((SegmentPropertyState) property).getRecordId());
- } else if (before == null
- || !store.containsSegment(before.getRecordId().getSegmentId())) {
- pIds.add(writeProperty(property));
+ RecordId childNameId = null;
+ String childName = template.getChildName();
+ if (childName == Template.ZERO_CHILD_NODES) {
+ head |= 1 << 29;
+ } else if (childName == Template.MANY_CHILD_NODES) {
+ head |= 1 << 28;
} else {
- // reuse previously stored property, if possible
- PropertyTemplate bt = beforeTemplate.getPropertyTemplate(name);
- if (bt == null) {
- pIds.add(writeProperty(property)); // new property
+ childNameId = writeString(childName);
+ ids.add(childNameId);
+ }
+
+ PropertyTemplate[] properties = template.getPropertyTemplates();
+ RecordId[] propertyNames = new RecordId[properties.length];
+ byte[] propertyTypes = new byte[properties.length];
+ for (int i = 0; i < properties.length; i++) {
+ // Note: if the property names are stored in more than 255 separate
+ // segments, this will not work.
+ propertyNames[i] = writeString(properties[i].getName());
+ Type<?> type = properties[i].getType();
+ if (type.isArray()) {
+ propertyTypes[i] = (byte) -type.tag();
} else {
- SegmentPropertyState bp = beforeTemplate.getProperty(
- before.getRecordId(), bt.getIndex());
- if (property.equals(bp)) {
- pIds.add(bp.getRecordId()); // no changes
- } else if (bp.isArray() && bp.getType() != BINARIES) {
- // reuse entries from the previous list
- pIds.add(writeProperty(property, bp.getValueRecords()));
- } else {
- pIds.add(writeProperty(property));
- }
+ propertyTypes[i] = (byte) type.tag();
}
}
- }
- if (!pIds.isEmpty()) {
+ RecordId propNamesId = null;
if (version.onOrAfter(V_11)) {
- ids.add(writeList(pIds));
+ if (propertyNames.length > 0) {
+ propNamesId = writeList(asList(propertyNames));
+ ids.add(propNamesId);
+ }
} else {
- ids.addAll(pIds);
+ ids.addAll(asList(propertyNames));
}
- }
- return writeRecord(newNodeStateWriter(ids));
- }
- /**
- * If the given node was compacted, return the compacted node, otherwise
- * return the passed node. This is to avoid pointing to old nodes, if they
- * have been compacted.
- *
- * @param state the node
- * @return the compacted node (if it was compacted)
- */
- private SegmentNodeState uncompact(SegmentNodeState state) {
- RecordId id = store.getTracker().getCompactionMap().get(state.getRecordId());
- if (id != null) {
- return new SegmentNodeState(id);
- } else {
- return state;
- }
- }
+ checkState(propertyNames.length < (1 << 18));
+ head |= propertyNames.length;
- private <T> T writeRecord(RecordWriter<T> recordWriter) throws IOException {
- SegmentBufferWriter writer = segmentBufferWriterPool.borrowWriter(currentThread());
- try {
- return recordWriter.write(writer);
- } finally {
- segmentBufferWriterPool.returnWriter(currentThread(), writer);
+ RecordId tid = newTemplateWriter(ids, propertyNames,
+ propertyTypes, head, primaryId, mixinIds, childNameId,
+ propNamesId, version).write(writer);
+ templateCache.put(template, tid);
+ return tid;
}
- }
- private class SegmentBufferWriterPool {
- private final Set<SegmentBufferWriter> borrowed = newHashSet();
- private final Map<Object, SegmentBufferWriter> writers = newHashMap();
+ // FIXME OAK-3348 defer compacted items are not in the compaction map -> performance regression
+ // split compaction map into 1) id based equality and 2) cache (like string and template) for nodes
+ private SegmentNodeState writeNode(NodeState state) throws IOException {
+ if (state instanceof SegmentNodeState) {
+ SegmentNodeState sns = uncompact((SegmentNodeState) state);
+ if (sns != state || hasSegment(sns)) {
+ if (!isOldGen(sns.getRecordId())) {
+ return sns;
+ }
+ }
+ }
- private short writerId = -1;
+ SegmentNodeState before = null;
+ Template beforeTemplate = null;
+ ModifiedNodeState after = null;
+ if (state instanceof ModifiedNodeState) {
+ after = (ModifiedNodeState) state;
+ NodeState base = after.getBaseState();
+ if (base instanceof SegmentNodeState) {
+ SegmentNodeState sns = uncompact((SegmentNodeState) base);
+ if (sns != base || hasSegment(sns)) {
+ if (!isOldGen(sns.getRecordId())) {
+ before = sns;
+ beforeTemplate = before.getTemplate();
+ }
+ }
+ }
+ }
- public void flush() throws IOException {
- List<SegmentBufferWriter> toFlush = newArrayList();
- synchronized (this) {
- toFlush.addAll(writers.values());
- writers.clear();
- borrowed.clear();
+ Template template = new Template(state);
+ RecordId templateId;
+ if (template.equals(beforeTemplate)) {
+ templateId = before.getTemplateId();
+ } else {
+ templateId = writeTemplate(template);
}
- // Call flush from outside a synchronized context to avoid
- // deadlocks of that method calling SegmentStore.writeSegment
- for (SegmentBufferWriter writer : toFlush) {
- writer.flush();
+
+ List<RecordId> ids = newArrayList();
+ ids.add(templateId);
+
+ String childName = template.getChildName();
+ if (childName == Template.MANY_CHILD_NODES) {
+ MapRecord base;
+ Map<String, RecordId> childNodes;
+ if (before != null
+ && before.getChildNodeCount(2) > 1
+ && after.getChildNodeCount(2) > 1) {
+ base = before.getChildNodeMap();
+ childNodes = new ChildNodeCollectorDiff().diff(before, after);
+ } else {
+ base = null;
+ childNodes = newHashMap();
+ for (ChildNodeEntry entry : state.getChildNodeEntries()) {
+ childNodes.put(
+ entry.getName(),
+ writeNode(entry.getNodeState()).getRecordId());
+ }
+ }
+ ids.add(writeMap(base, childNodes).getRecordId());
+ } else if (childName != Template.ZERO_CHILD_NODES) {
+ ids.add(writeNode(state.getChildNode(template.getChildName())).getRecordId());
+ }
+
+ List<RecordId> pIds = newArrayList();
+ for (PropertyTemplate pt : template.getPropertyTemplates()) {
+ String name = pt.getName();
+ PropertyState property = state.getProperty(name);
+
+ if (hasSegment(property)) {
+ RecordId pid = ((Record) property).getRecordId();
+ if (isOldGen(pid)) {
+ pIds.add(writeProperty(property));
+ } else {
+ pIds.add(pid);
+ }
+ } else if (before == null || !hasSegment(before)) {
+ pIds.add(writeProperty(property));
+ } else {
+ // reuse previously stored property, if possible
+ PropertyTemplate bt = beforeTemplate.getPropertyTemplate(name);
+ if (bt == null) {
+ pIds.add(writeProperty(property)); // new property
+ } else {
+ SegmentPropertyState bp = beforeTemplate.getProperty(before.getRecordId(), bt.getIndex());
+ if (property.equals(bp)) {
+ pIds.add(bp.getRecordId()); // no changes
+ } else if (bp.isArray() && bp.getType() != BINARIES) {
+ // reuse entries from the previous list
+ pIds.add(writeProperty(property, bp.getValueRecords()));
+ } else {
+ pIds.add(writeProperty(property));
+ }
+ }
+ }
}
- }
- public synchronized SegmentBufferWriter borrowWriter(Object key) throws IOException {
- SegmentBufferWriter writer = writers.remove(key);
- if (writer == null) {
- writer = new SegmentBufferWriter(store, version, wid + "." + getWriterId());
+ if (!pIds.isEmpty()) {
+ if (version.onOrAfter(V_11)) {
+ ids.add(writeList(pIds));
+ } else {
+ ids.addAll(pIds);
+ }
}
- borrowed.add(writer);
- return writer;
+ return newNodeStateWriter(ids).write(writer);
}
- public void returnWriter(Object key, SegmentBufferWriter writer) throws IOException {
- if (!tryReturn(key, writer)) {
- // Delayed flush this writer as it was borrowed while flush() was called.
- writer.flush();
- }
+ private boolean hasSegment(SegmentNodeState node) {
+ return store.containsSegment(node.getRecordId().getSegmentId());
}
- private synchronized boolean tryReturn(Object key, SegmentBufferWriter writer) {
- if (borrowed.remove(writer)) {
- writers.put(key, writer);
- return true;
- } else {
- return false;
- }
+ private boolean hasSegment(PropertyState property) {
+ return (property instanceof SegmentPropertyState)
+ && store.containsSegment(((Record) property).getRecordId().getSegmentId());
}
- private synchronized String getWriterId() {
- if (++writerId > 9999) {
- writerId = 0;
- }
- // Manually padding seems to be fastest here
- if (writerId < 10) {
- return "000" + writerId;
- } else if (writerId < 100) {
- return "00" + writerId;
- } else if (writerId < 1000) {
- return "0" + writerId;
+ /**
+ * If the given node was compacted, return the compacted node; otherwise
+ * return the node passed in. This avoids pointing to old nodes after they
+ * have been compacted.
+ *
+ * @param state the node
+ * @return the compacted node (if it was compacted)
+ */
+ private SegmentNodeState uncompact(SegmentNodeState state) {
+ RecordId id = store.getTracker().getCompactionMap().get(state.getRecordId());
+ if (id != null) {
+ return new SegmentNodeState(id);
} else {
- return valueOf(writerId);
+ return state;
}
}
- }
- private class ChildNodeCollectorDiff extends DefaultNodeStateDiff {
- private final Map<String, RecordId> childNodes = newHashMap();
- private IOException exception;
+ private boolean isOldGen(RecordId id) {
+ int thatGen = id.getSegment().getGcGen();
+ int thisGen = writer.getGeneration();
+ return thatGen < thisGen;
+ }
- @Override
- public boolean childNodeAdded(String name, NodeState after) {
- try {
- childNodes.put(name, writeNode(after).getRecordId());
- } catch (IOException e) {
- exception = e;
- return false;
+ private class ChildNodeCollectorDiff extends DefaultNodeStateDiff {
+ private final Map<String, RecordId> childNodes = newHashMap();
+ private IOException exception;
+
+ public Map<String, RecordId> diff(SegmentNodeState before, ModifiedNodeState after) throws IOException {
+ after.compareAgainstBaseState(before, this);
+ if (exception != null) {
+ throw new IOException(exception);
+ } else {
+ return childNodes;
+ }
}
- return true;
- }
- @Override
- public boolean childNodeChanged(
- String name, NodeState before, NodeState after) {
- try {
- childNodes.put(name, writeNode(after).getRecordId());
- } catch (IOException e) {
- exception = e;
- return false;
+ @Override
+ public boolean childNodeAdded(String name, NodeState after) {
+ try {
+ childNodes.put(name, writeNode(after).getRecordId());
+ } catch (IOException e) {
+ exception = e;
+ return false;
+ }
+ return true;
}
- return true;
- }
- @Override
- public boolean childNodeDeleted(String name, NodeState before) {
- childNodes.put(name, null);
- return true;
- }
+ @Override
+ public boolean childNodeChanged(
+ String name, NodeState before, NodeState after) {
+ try {
+ childNodes.put(name, writeNode(after).getRecordId());
+ } catch (IOException e) {
+ exception = e;
+ return false;
+ }
+ return true;
+ }
- public Map<String, RecordId> diff(SegmentNodeState before, ModifiedNodeState after) throws IOException {
- after.compareAgainstBaseState(before, this);
- if (exception != null) {
- throw new IOException(exception);
- } else {
- return childNodes;
+ @Override
+ public boolean childNodeDeleted(String name, NodeState before) {
+ childNodes.put(name, null);
+ return true;
}
}
+
}
+
}
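
The core of the SegmentWriter change above is that a node or property record whose segment belongs to an older GC generation is rewritten rather than referenced (see isOldGen and writeNode). The following standalone sketch models that rule; Ref and Writer are hypothetical stand-ins, not Oak's RecordId, Segment or SegmentBufferWriter, and only illustrate how binding a writer to a single GC generation keeps newly written records from pointing at pre-compacted segments:

    // Illustrative only: hypothetical types, not Oak API.
    import java.util.HashMap;
    import java.util.Map;

    final class GenerationAwareWriterSketch {

        /** A record reference that knows the GC generation of the segment it lives in. */
        static final class Ref {
            final String id;
            final int gcGeneration;

            Ref(String id, int gcGeneration) {
                this.id = id;
                this.gcGeneration = gcGeneration;
            }
        }

        /** A writer bound to a single GC generation for its whole lifetime. */
        static final class Writer {
            private final int generation;
            private final Map<String, Ref> rewritten = new HashMap<>();

            Writer(int generation) {
                this.generation = generation;
            }

            /** Reuse a reference from the current generation, rewrite anything older. */
            Ref writeOrReuse(Ref ref) {
                if (ref.gcGeneration < generation) {
                    // Old generation: copy the content forward so no reference to a
                    // pre-compacted segment escapes into newly written records.
                    return rewritten.computeIfAbsent(ref.id, id -> new Ref(id, generation));
                }
                return ref; // same generation: safe to reference directly
            }
        }

        public static void main(String[] args) {
            Writer writer = new Writer(2);          // the store is at GC generation 2
            Ref stale = new Ref("node-a", 1);       // written before compaction
            Ref fresh = writer.writeOrReuse(stale);
            System.out.println(fresh.gcGeneration); // 2: rewritten, not referenced
        }
    }

Because the decision is made per record at write time, a writer created after compaction can never leak a reference to an old generation, which is what makes the flush-on-compaction below removable.
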
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Apr 20 10:03:32 2016
@@ -1484,7 +1484,6 @@ public class FileStore implements Segmen
// in an attempt to prevent new references to old pre-compacted
// content. TODO: There should be a cleaner way to do this. (implement GCMonitor!?)
tracker.getWriter().dropCache();
- tracker.getWriter().flush();
CompactionMap cm = tracker.getCompactionMap();
gcMonitor.compacted(cm.getSegmentCounts(), cm.getRecordCounts(), cm.getEstimatedWeights());
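
With each SegmentBufferWriter now bound to a GC generation, FileStore no longer needs to flush the writers when compaction finishes: writers created before compaction keep their old generation, and their output is simply not referenced from records written afterwards. A minimal, hypothetical pool sketch (the names and methods do not match SegmentBufferWriterPool's actual API) showing how handing out writers per generation makes that flush unnecessary:

    // Hypothetical sketch only; not the Oak pool implementation.
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    final class PerGenerationPoolSketch {

        static final class PooledWriter {
            final int generation;

            PooledWriter(int generation) {
                this.generation = generation;
            }
        }

        /** Hands out writers keyed by the generation requested by the caller. */
        static final class Pool {
            private final Map<Integer, Deque<PooledWriter>> free = new HashMap<>();

            synchronized PooledWriter borrow(int currentGeneration) {
                Deque<PooledWriter> queue =
                        free.computeIfAbsent(currentGeneration, g -> new ArrayDeque<>());
                PooledWriter writer = queue.poll();
                // Writers from older generations stay in their own queue and are never
                // handed out again, so nothing has to be flushed when the generation moves on.
                return writer != null ? writer : new PooledWriter(currentGeneration);
            }

            synchronized void giveBack(PooledWriter writer) {
                free.computeIfAbsent(writer.generation, g -> new ArrayDeque<>()).add(writer);
            }
        }

        public static void main(String[] args) {
            Pool pool = new Pool();
            PooledWriter before = pool.borrow(1);
            pool.giveBack(before);
            PooledWriter after = pool.borrow(2);  // compaction bumped the generation to 2
            System.out.println(after.generation); // 2: the generation-1 writer is left behind
        }
    }
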
Modified: jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CompactionAndCleanupIT.java Wed Apr 20 10:03:32 2016
@@ -67,6 +67,8 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -89,7 +91,7 @@ public class CompactionAndCleanupIT {
public static void assumptions() {
assumeTrue(getFixtures().contains(SEGMENT_MK));
}
-
+
@Test
public void compactionNoBinaryClone() throws Exception {
// 2MB data, 5MB blob
@@ -427,7 +429,6 @@ public class CompactionAndCleanupIT {
* Test asserting OAK-3348: Cross gc sessions might introduce references to pre-compacted segments
*/
@Test
- @Ignore("OAK-3348") // FIXME OAK-3348
public void preCompactionReferences() throws IOException, CommitFailedException, InterruptedException {
for (String ref : new String[] {"merge-before-compact", "merge-after-compact"}) {
File repoDir = new File(getFileStoreFolder(), ref);
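
The re-enabled preCompactionReferences test covers exactly the merge-after-compact case: a session that obtained record ids before compaction must not, by merging later, re-introduce references to pre-compacted segments. A self-contained conceptual model of that flow follows; Store, compact, merge and cleanup are illustrative stand-ins, not Oak API:

    // Conceptual model of the merge-after-compact scenario; hypothetical names only.
    import java.util.HashMap;
    import java.util.Map;

    final class MergeAfterCompactSketch {

        static final class Store {
            final Map<String, Integer> generationById = new HashMap<>();
            int currentGeneration = 1;

            String write(String content) {
                String id = content + "@" + currentGeneration;
                generationById.put(id, currentGeneration);
                return id;
            }

            void compact() {
                currentGeneration++; // reachable content is rewritten into the new generation
            }

            /** Merge a record id held by a session that was opened before compaction. */
            String merge(String staleId) {
                if (generationById.get(staleId) < currentGeneration) {
                    return write("rewritten:" + staleId); // copy forward, do not reference
                }
                return staleId;
            }

            void cleanup() {
                generationById.values().removeIf(gen -> gen < currentGeneration);
            }
        }

        public static void main(String[] args) {
            Store store = new Store();
            String preCompactionId = store.write("node");    // written by the old session
            store.compact();                                  // generation 1 -> 2
            String mergedId = store.merge(preCompactionId);   // the old session merges afterwards
            store.cleanup();                                  // old generation reclaimed
            // The merged record survives cleanup because it was rewritten, not referenced.
            System.out.println(store.generationById.containsKey(mergedId));        // true
            System.out.println(store.generationById.containsKey(preCompactionId)); // false
        }
    }
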
Modified: jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentParserTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentParserTest.java?rev=1740085&r1=1740084&r2=1740085&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentParserTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentParserTest.java Wed Apr 20 10:03:32 2016
@@ -22,6 +22,7 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Strings.repeat;
import static com.google.common.collect.Lists.newArrayListWithCapacity;
import static com.google.common.collect.Maps.newHashMap;
+import static java.io.File.createTempFile;
import static junitx.framework.ComparableAssert.assertEquals;
import static org.apache.jackrabbit.oak.api.Type.BINARY;
import static org.apache.jackrabbit.oak.api.Type.LONGS;
@@ -38,10 +39,8 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.segment.TestUtils.newRecordId;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
+import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -57,7 +56,9 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.SegmentParser.MapInfo;
import org.apache.jackrabbit.oak.plugins.segment.SegmentParser.NodeInfo;
import org.apache.jackrabbit.oak.plugins.segment.SegmentParser.ValueInfo;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -67,6 +68,7 @@ import org.junit.runners.Parameterized;
public class SegmentParserTest {
private final SegmentVersion segmentVersion;
+ private File directory;
private SegmentStore store;
private SegmentWriter writer;
@@ -152,13 +154,21 @@ public class SegmentParserTest {
}
@Before
- public void setup() {
- store = mock(SegmentStore.class, withSettings().stubOnly());
- SegmentTracker tracker = new SegmentTracker(store);
- when(store.getTracker()).thenReturn(tracker);
+ public void setup() throws IOException {
+ directory = createTempFile(getClass().getSimpleName(), "dir", new File("target"));
+ directory.delete();
+ directory.mkdir();
+
+ store = FileStore.builder(directory).build();
writer = new SegmentWriter(store, segmentVersion, "");
}
+ @After
+ public void tearDown() {
+ store.close();
+ directory.delete();
+ }
+
@Test
public void emptyNode() throws IOException {
SegmentNodeState node = writer.writeNode(EMPTY_NODE);
@@ -293,10 +303,10 @@ public class SegmentParserTest {
assertEquals(456, size.get());
}
- private Map<String, RecordId> createMap(int size, Random rnd) {
+ private Map<String, RecordId> createMap(int size, Random rnd) throws IOException {
Map<String, RecordId> map = newHashMap();
for (int k = 0; k < size; k++) {
- map.put("k" + k, newRecordId(store.getTracker(), rnd));
+ map.put("k" + k, writer.writeString("string" + rnd.nextLong()));
}
return map;
}
@@ -443,7 +453,7 @@ public class SegmentParserTest {
Random rnd = new Random();
List<RecordId> list = newArrayListWithCapacity(count);
for (int k = 0; k < count; k++) {
- list.add(newRecordId(store.getTracker(), rnd));
+ list.add(writer.writeString("string " + rnd.nextLong()));
}
RecordId listId = writer.writeList(list);
ListInfo listInfo = new TestParser("nonEmptyList"){