You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/06/01 07:48:52 UTC
svn commit: r1746410 [2/4] - in /jackrabbit/oak/trunk:
oak-run/src/main/java/org/apache/jackrabbit/oak/checkpoint/
oak-run/src/main/java/org/apache/jackrabbit/oak/console/
oak-run/src/main/java/org/apache/jackrabbit/oak/explorer/
oak-run/src/main/java/...
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java Wed Jun 1 07:48:51 2016
@@ -62,7 +62,7 @@ import org.apache.jackrabbit.oak.plugins
*/
public class SegmentPropertyState extends Record implements PropertyState {
@Nonnull
- private final SegmentStore store;
+ private final SegmentReader reader;
@Nonnull
private final String name;
@@ -70,17 +70,17 @@ public class SegmentPropertyState extend
@Nonnull
private final Type<?> type;
- SegmentPropertyState(@Nonnull SegmentStore store, @Nonnull RecordId id,
+ SegmentPropertyState(@Nonnull SegmentReader reader, @Nonnull RecordId id,
@Nonnull String name, @Nonnull Type<?> type) {
super(id);
- this.store = checkNotNull(store);
+ this.reader = checkNotNull(reader);
this.name = checkNotNull(name);
this.type = checkNotNull(type);
}
- SegmentPropertyState(@Nonnull SegmentStore store, @Nonnull RecordId id,
+ SegmentPropertyState(@Nonnull SegmentReader reader, @Nonnull RecordId id,
@Nonnull PropertyTemplate template) {
- this(store, id, template.getName(), template.getType());
+ this(reader, id, template.getName(), template.getType());
}
private ListRecord getValueList(Segment segment) {
@@ -106,7 +106,7 @@ public class SegmentPropertyState extend
ListRecord values = getValueList(segment);
for (int i = 0; i < values.size(); i++) {
RecordId valueId = values.getEntry(i);
- String value = store.getReader().readString(valueId);
+ String value = reader.readString(valueId);
map.put(value, valueId);
}
@@ -186,10 +186,10 @@ public class SegmentPropertyState extend
@SuppressWarnings("unchecked")
private <T> T getValue(RecordId id, Type<T> type) {
if (type == BINARY) {
- return (T) new SegmentBlob(store, id); // load binaries lazily
+ return (T) reader.readBlob(id); // load binaries lazily
}
- String value = store.getReader().readString(id);
+ String value = reader.readString(id);
if (type == STRING || type == URI || type == DATE
|| type == NAME || type == PATH
|| type == REFERENCE || type == WEAKREFERENCE) {
@@ -222,7 +222,7 @@ public class SegmentPropertyState extend
RecordId entry = values.getEntry(index);
if (getType().equals(BINARY) || getType().equals(BINARIES)) {
- return new SegmentBlob(store, entry).length();
+ return reader.readBlob(entry).length();
}
return getSegment().readLength(entry);
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReader.java Wed Jun 1 07:48:51 2016
@@ -21,8 +21,6 @@ package org.apache.jackrabbit.oak.segmen
import javax.annotation.Nonnull;
-import org.apache.jackrabbit.oak.cache.CacheStats;
-
public interface SegmentReader {
@Nonnull
String readString(@Nonnull RecordId id);
@@ -33,7 +31,18 @@ public interface SegmentReader {
@Nonnull
Template readTemplate(@Nonnull RecordId id);
- // FIXME OAK-4373 remove from this interface
@Nonnull
- CacheStats getStringCacheStats();
+ SegmentNodeState readNode(@Nonnull RecordId id);
+
+ @Nonnull
+ SegmentNodeState readHeadState();
+
+ @Nonnull
+ SegmentPropertyState readProperty(
+ @Nonnull RecordId id,
+ @Nonnull PropertyTemplate template);
+
+ @Nonnull
+ SegmentBlob readBlob(@Nonnull RecordId id);
+
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java?rev=1746410&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentReaders.java Wed Jun 1 07:48:51 2016
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Supplier;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.http.HttpStore;
+import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
+
+public final class SegmentReaders {
+ private SegmentReaders() {}
+
+ @Nonnull
+ public static SegmentReader segmentReader(@Nonnull FileStore store, long stringCacheMB) {
+ return new CachingSegmentReader(getWriter(store), store.getRevisions(), store.getBlobStore(), stringCacheMB);
+ }
+
+ private static Supplier<SegmentWriter> getWriter(final FileStore store) {
+ return new Supplier<SegmentWriter>() {
+ @Override
+ public SegmentWriter get() {
+ return store.getWriter();
+ }
+ };
+ }
+
+ @Nonnull
+ public static SegmentReader segmentReader(@Nonnull MemoryStore store, long stringCacheMB) {
+ return new CachingSegmentReader(getWriter(store), store.getRevisions(), store.getBlobStore(), stringCacheMB);
+ }
+
+ private static Supplier<SegmentWriter> getWriter(final MemoryStore store) {
+ return new Supplier<SegmentWriter>() {
+ @Override
+ public SegmentWriter get() {
+ return store.getWriter();
+ }
+ };
+ }
+
+ @Nonnull
+ public static SegmentReader segmentReader(@Nonnull HttpStore store, long stringCacheMB) {
+ return new CachingSegmentReader(getWriter(store), store.getRevisions(), store.getBlobStore(), stringCacheMB);
+ }
+
+ private static Supplier<SegmentWriter> getWriter(final HttpStore store) {
+ return new Supplier<SegmentWriter>() {
+ @Override
+ public SegmentWriter get() {
+ return store.getWriter();
+ }
+ };
+ }
+
+}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStore.java Wed Jun 1 07:48:51 2016
@@ -18,38 +18,14 @@
*/
package org.apache.jackrabbit.oak.segment;
-import java.io.Closeable;
import java.io.IOException;
-import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
-import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
-
/**
* The backend storage interface used by the segment node store.
*/
-public interface SegmentStore extends Closeable {
-
- @Nonnull
- SegmentTracker getTracker();
-
- @Nonnull
- SegmentWriter getWriter();
-
- @Nonnull
- SegmentReader getReader();
-
- /**
- * Returns the head state.
- *
- * @return head state
- */
- @Nonnull
- SegmentNodeState getHead();
-
- boolean setHead(SegmentNodeState base, SegmentNodeState head);
+public interface SegmentStore {
/**
* Checks whether the identified segment exists in this store.
@@ -78,25 +54,4 @@ public interface SegmentStore extends Cl
*/
void writeSegment(SegmentId id, byte[] bytes, int offset, int length) throws IOException;
- void close();
-
- /**
- * Read a blob from external storage.
- *
- * @param reference blob reference
- * @return external blob
- */
- Blob readBlob(String reference);
-
- /**
- * Returns the external BlobStore (if configured) with this store
- */
- @CheckForNull
- BlobStore getBlobStore();
-
- /**
- * Triggers removal of segments that are no longer referenceable.
- */
- void gc();
-
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Wed Jun 1 07:48:51 2016
@@ -80,9 +80,6 @@ import org.slf4j.LoggerFactory;
* {@code WriteOperationHandler} passed to the constructor is thread safe.
*/
public class SegmentWriter {
-// FIXME OAK-4102: Break cyclic dependency of FileStore and SegmentTracker
-// Improve the way how SegmentWriter instances are created. (OAK-4102)
-
private static final Logger LOG = LoggerFactory.getLogger(SegmentWriter.class);
static final int BLOCK_SIZE = 1 << 12; // 4kB
@@ -94,6 +91,15 @@ public class SegmentWriter {
private final SegmentStore store;
@Nonnull
+ private final SegmentReader reader;
+
+ @CheckForNull
+ private final BlobStore blobStore;
+
+ @Nonnull
+ private final SegmentTracker tracker;
+
+ @Nonnull
private final WriteOperationHandler writeOperationHandler;
/**
@@ -101,11 +107,20 @@ public class SegmentWriter {
* pointed out in the class comment.
*
* @param store store to write to
+ * @param reader segment reader for the {@code store}
+ * @param blobStore the blog store or {@code null} for inlined blobs
+ * @param tracker segment tracker for {@code store}
* @param writeOperationHandler handler for write operations.
*/
public SegmentWriter(@Nonnull SegmentStore store,
+ @Nonnull SegmentReader reader,
+ @Nullable BlobStore blobStore,
+ @Nonnull SegmentTracker tracker,
@Nonnull WriteOperationHandler writeOperationHandler) {
this.store = checkNotNull(store);
+ this.reader = checkNotNull(reader);
+ this.blobStore = blobStore;
+ this.tracker = checkNotNull(tracker);
this.writeOperationHandler = checkNotNull(writeOperationHandler);
this.cacheManager = new WriterCacheManager();
}
@@ -138,7 +153,7 @@ public class SegmentWriter {
return with(writer).writeMap(base, changes);
}
});
- return new MapRecord(store, mapId);
+ return new MapRecord(reader, mapId);
}
/**
@@ -187,7 +202,7 @@ public class SegmentWriter {
return with(writer).writeBlob(blob);
}
});
- return new SegmentBlob(store, blobId);
+ return new SegmentBlob(blobStore, blobId);
}
/**
@@ -225,11 +240,11 @@ public class SegmentWriter {
return with(writer).writeStream(stream);
}
});
- return new SegmentBlob(store, blobId);
+ return new SegmentBlob(blobStore, blobId);
}
/**
- * Write a propery.
+ * Write a property.
* @param state the property to write
* @return the property state written
* @throws IOException
@@ -243,7 +258,7 @@ public class SegmentWriter {
return with(writer).writeProperty(state);
}
});
- return new SegmentPropertyState(store, id, state.getName(), state.getType());
+ return new SegmentPropertyState(reader, id, state.getName(), state.getType());
}
/**
@@ -260,7 +275,7 @@ public class SegmentWriter {
return with(writer).writeNode(state, 0);
}
});
- return new SegmentNodeState(store, nodeId);
+ return new SegmentNodeState(reader, this, nodeId);
}
/**
@@ -287,7 +302,7 @@ public class SegmentWriter {
}
});
writeOperationHandler.flush();
- return new SegmentNodeState(store, nodeId);
+ return new SegmentNodeState(reader, this, nodeId);
} catch (SegmentWriteOperation.CancelledWriteException ignore) {
return null;
}
@@ -344,11 +359,11 @@ public class SegmentWriter {
if (base != null && base.isDiff()) {
Segment segment = base.getSegment();
RecordId key = segment.readRecordId(base.getOffset(8));
- String name = store.getReader().readString(key);
+ String name = reader.readString(key);
if (!changes.containsKey(name)) {
changes.put(name, segment.readRecordId(base.getOffset(8, 1)));
}
- base = new MapRecord(store, segment.readRecordId(base.getOffset(8, 2)));
+ base = new MapRecord(reader, segment.readRecordId(base.getOffset(8, 2)));
}
if (base != null && changes.size() == 1) {
@@ -384,7 +399,7 @@ public class SegmentWriter {
}
if (keyId != null) {
- entries.add(new MapEntry(store, key, keyId, entry.getValue()));
+ entries.add(new MapEntry(reader, key, keyId, entry.getValue()));
}
}
return writeMapBucket(base, entries, 0);
@@ -497,7 +512,7 @@ public class SegmentWriter {
}
private MapRecord mapRecordOrNull(RecordId id) {
- return id == null ? null : new MapRecord(store, id);
+ return id == null ? null : new MapRecord(reader, id);
}
/**
@@ -585,7 +600,7 @@ public class SegmentWriter {
// write as many full bulk segments as possible
while (pos + Segment.MAX_SEGMENT_SIZE <= data.length) {
- SegmentId bulkId = store.getTracker().newBulkSegmentId();
+ SegmentId bulkId = tracker.newBulkSegmentId();
store.writeSegment(bulkId, data, pos, Segment.MAX_SEGMENT_SIZE);
for (int i = 0; i < Segment.MAX_SEGMENT_SIZE; i += BLOCK_SIZE) {
blockIds.add(new RecordId(bulkId, i));
@@ -626,8 +641,8 @@ public class SegmentWriter {
}
String reference = blob.getReference();
- if (reference != null && store.getBlobStore() != null) {
- String blobId = store.getBlobStore().getBlobId(reference);
+ if (reference != null && blobStore != null) {
+ String blobId = blobStore.getBlobId(reference);
if (blobId != null) {
return writeBlobId(blobId);
} else {
@@ -697,7 +712,6 @@ public class SegmentWriter {
return writeValueRecord(n, data);
}
- BlobStore blobStore = store.getBlobStore();
if (blobStore != null) {
String blobId = blobStore.writeBlob(new SequenceInputStream(
new ByteArrayInputStream(data, 0, n), stream));
@@ -712,7 +726,7 @@ public class SegmentWriter {
// Write the data to bulk segments and collect the list of block ids
while (n != 0) {
- SegmentId bulkId = store.getTracker().newBulkSegmentId();
+ SegmentId bulkId = tracker.newBulkSegmentId();
int len = Segment.align(n, 1 << Segment.RECORD_ALIGN_BITS);
LOG.debug("Writing bulk segment {} ({} bytes)", bulkId, n);
store.writeSegment(bulkId, data, 0, len);
@@ -893,7 +907,7 @@ public class SegmentWriter {
}
List<RecordId> ids = newArrayList();
- Template template = new Template(store, state);
+ Template template = new Template(reader, state);
if (template.equals(beforeTemplate)) {
ids.add(before.getTemplateId());
} else {
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java?rev=1746410&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriters.java Wed Jun 1 07:48:51 2016
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Supplier;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.http.HttpStore;
+import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
+
+public final class SegmentWriters {
+ private SegmentWriters() {}
+
+ @Nonnull
+ public static SegmentWriter pooledSegmentWriter(@Nonnull FileStore store,
+ @Nonnull SegmentVersion version,
+ @Nonnull String name,
+ @Nonnull Supplier<Integer> generation) {
+ return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+ new SegmentBufferWriterPool(store, store.getTracker(), store.getReader(), version, name, generation));
+ }
+
+ @Nonnull
+ public static SegmentWriter pooledSegmentWriter(@Nonnull MemoryStore store,
+ @Nonnull SegmentVersion version,
+ @Nonnull String name,
+ @Nonnull Supplier<Integer> generation) {
+ return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+ new SegmentBufferWriterPool(store, store.getTracker(), store.getReader(), version, name, generation));
+ }
+
+ @Nonnull
+ public static SegmentWriter pooledSegmentWriter(@Nonnull HttpStore store,
+ @Nonnull SegmentVersion version,
+ @Nonnull String name,
+ @Nonnull Supplier<Integer> generation) {
+ return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+ new SegmentBufferWriterPool(store, store.getTracker(), store.getReader(), version, name, generation));
+ }
+
+ @Nonnull
+ public static SegmentWriter segmentWriter(@Nonnull FileStore store,
+ @Nonnull SegmentVersion version,
+ @Nonnull String name,
+ int generation) {
+ return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+ new SegmentBufferWriter(store, store.getTracker(), store.getReader(), version, name, generation));
+ }
+
+ @Nonnull
+ public static SegmentWriter segmentWriter(@Nonnull MemoryStore store,
+ @Nonnull SegmentVersion version,
+ @Nonnull String name,
+ int generation) {
+ return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+ new SegmentBufferWriter(store, store.getTracker(), store.getReader(), version, name, generation));
+ }
+
+ @Nonnull
+ public static SegmentWriter segmentWriter(@Nonnull HttpStore store,
+ @Nonnull SegmentVersion version,
+ @Nonnull String name,
+ int generation) {
+ return new SegmentWriter(store, store.getReader(), store.getBlobStore(), store.getTracker(),
+ new SegmentBufferWriter(store, store.getTracker(), store.getReader(), version, name, generation));
+ }
+
+}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java Wed Jun 1 07:48:51 2016
@@ -59,7 +59,7 @@ public class Template {
static final String MANY_CHILD_NODES = "";
@Nonnull
- private final SegmentStore store;
+ private final SegmentReader reader;
/**
* The {@code jcr:primaryType} property, if present as a single-valued
@@ -90,12 +90,12 @@ public class Template {
@CheckForNull
private final String childName;
- Template(@Nonnull SegmentStore store,
+ Template(@Nonnull SegmentReader reader,
@Nullable PropertyState primaryType,
@Nullable PropertyState mixinTypes,
@Nullable PropertyTemplate[] properties,
@Nullable String childName) {
- this.store = store;
+ this.reader = checkNotNull(reader);
this.primaryType = primaryType;
this.mixinTypes = mixinTypes;
if (properties != null) {
@@ -107,8 +107,9 @@ public class Template {
this.childName = childName;
}
- Template(@Nonnull SegmentStore store, @Nonnull NodeState state) {
- this.store = store;
+ Template(@Nonnull SegmentReader reader, @Nonnull NodeState state) {
+ this.reader = checkNotNull(reader);
+ checkNotNull(state);
PropertyState primary = null;
PropertyState mixins = null;
List<PropertyTemplate> templates = Lists.newArrayList();
@@ -198,7 +199,7 @@ public class Template {
RecordId lid = segment.readRecordId(offset);
ListRecord props = new ListRecord(lid, properties.length);
RecordId rid = props.getEntry(index);
- return new SegmentPropertyState(store, rid, properties[index]);
+ return reader.readProperty(rid, properties[index]);
}
MapRecord getChildNodeMap(RecordId recordId) {
@@ -206,7 +207,7 @@ public class Template {
Segment segment = recordId.getSegment();
int offset = recordId.getOffset() + 2 * RECORD_ID_BYTES;
RecordId childNodesId = segment.readRecordId(offset);
- return store.getReader().readMap(childNodesId);
+ return reader.readMap(childNodesId);
}
public NodeState getChildNode(String name, RecordId recordId) {
@@ -224,7 +225,7 @@ public class Template {
Segment segment = recordId.getSegment();
int offset = recordId.getOffset() + 2 * RECORD_ID_BYTES;
RecordId childNodeId = segment.readRecordId(offset);
- return new SegmentNodeState(store, childNodeId);
+ return reader.readNode(childNodeId);
} else {
return MISSING_NODE;
}
@@ -241,7 +242,7 @@ public class Template {
int offset = recordId.getOffset() + 2 * RECORD_ID_BYTES;
RecordId childNodeId = segment.readRecordId(offset);
return Collections.singletonList(new MemoryChildNodeEntry(
- childName, new SegmentNodeState(store, childNodeId)));
+ childName, reader.readNode(childNodeId)));
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Wed Jun 1 07:48:51 2016
@@ -33,10 +33,15 @@ import static java.util.Collections.empt
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_STRING_CACHE_MB;
import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
-import static org.apache.jackrabbit.oak.segment.SegmentReaderImpl.DEFAULT_STRING_CACHE_MB;
+import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.apache.jackrabbit.oak.segment.SegmentWriters.pooledSegmentWriter;
+import static org.apache.jackrabbit.oak.segment.SegmentWriters.segmentWriter;
+import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout;
import java.io.Closeable;
import java.io.File;
@@ -54,30 +59,28 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.cache.CacheStats;
-import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
import org.apache.jackrabbit.oak.plugins.blob.ReferenceCollector;
+import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.Segment;
import org.apache.jackrabbit.oak.segment.SegmentBufferWriter;
-import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool;
import org.apache.jackrabbit.oak.segment.SegmentCache;
import org.apache.jackrabbit.oak.segment.SegmentGraph.SegmentGraphVisitor;
import org.apache.jackrabbit.oak.segment.SegmentId;
@@ -85,7 +88,7 @@ import org.apache.jackrabbit.oak.segment
import org.apache.jackrabbit.oak.segment.SegmentNodeStore;
import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.segment.SegmentReader;
-import org.apache.jackrabbit.oak.segment.SegmentReaderImpl;
+import org.apache.jackrabbit.oak.segment.SegmentReaders;
import org.apache.jackrabbit.oak.segment.SegmentStore;
import org.apache.jackrabbit.oak.segment.SegmentTracker;
import org.apache.jackrabbit.oak.segment.SegmentVersion;
@@ -102,9 +105,7 @@ import org.slf4j.LoggerFactory;
/**
* The storage implementation for tar files.
*/
-public class FileStore implements SegmentStore {
-
- /** Logger instance */
+public class FileStore implements SegmentStore, Closeable {
private static final Logger log = LoggerFactory.getLogger(FileStore.class);
private static final int MB = 1024 * 1024;
@@ -114,8 +115,6 @@ public class FileStore implements Segmen
private static final String FILE_NAME_FORMAT = "data%05d%s.tar";
- private static final String JOURNAL_FILE_NAME = "journal.log";
-
private static final String LOCK_FILE_NAME = "repo.lock";
/**
@@ -149,24 +148,14 @@ public class FileStore implements Segmen
private volatile File writeFile;
- private volatile TarWriter writer;
-
- private final RandomAccessFile journalFile;
+ private volatile TarWriter tarWriter;
private final RandomAccessFile lockFile;
private final FileLock lock;
- /**
- * The latest head state.
- */
- private final AtomicReference<RecordId> head;
-
- /**
- * The persisted head of the root journal, used to determine whether the
- * latest {@link #head} value should be written to the disk.
- */
- private final AtomicReference<RecordId> persistedHead;
+ @Nonnull
+ private final TarRevisions revisions;
/**
* The background flush thread. Automatically flushes the TarMK state
@@ -254,8 +243,6 @@ public class FileStore implements Segmen
private BlobStore blobStore; // null -> store blobs inline
- private NodeState root = EMPTY_NODE;
-
private int maxFileSize = 256;
private int cacheSize; // 0 -> DEFAULT_MEMORY_CACHE_SIZE
@@ -274,6 +261,8 @@ public class FileStore implements Segmen
this.directory = directory;
}
+ private TarRevisions revisions;
+
/**
* Specify the {@link BlobStore}.
* @param blobStore
@@ -286,17 +275,6 @@ public class FileStore implements Segmen
}
/**
- * Specify the initial root node state for the file store
- * @param root
- * @return this instance
- */
- @Nonnull
- public Builder withRoot(@Nonnull NodeState root) {
- this.root = checkNotNull(root);
- return this;
- }
-
- /**
* Maximal size of the generated tar files in MB.
* @param maxFileSize
* @return this instance
@@ -384,7 +362,7 @@ public class FileStore implements Segmen
@Nonnull
public Builder withGCOptions(SegmentGCOptions gcOptions) {
- this.gcOptions = gcOptions;
+ this.gcOptions = checkNotNull(gcOptions);
return this;
}
@@ -407,34 +385,49 @@ public class FileStore implements Segmen
*/
@Nonnull
public FileStore build() throws IOException {
- return new FileStore(this, false);
+ directory.mkdir();
+ revisions = new TarRevisions(false, directory);
+ FileStore store = new FileStore(this, false);
+ revisions.bind(store, store.getTracker(), initialNode(store));
+ return store;
}
+ @Nonnull
public ReadOnlyStore buildReadOnly() throws IOException {
- return new ReadOnlyStore(this);
+ checkState(directory.exists() && directory.isDirectory());
+ revisions = new TarRevisions(true, directory);
+ ReadOnlyStore store = new ReadOnlyStore(this);
+ revisions.bind(store, store.getTracker(), initialNode(store));
+ return store;
}
+ @Nonnull
+ private static Supplier<RecordId> initialNode(final FileStore store) {
+ return new Supplier<RecordId>() {
+ @Override
+ public RecordId get() {
+ try {
+ SegmentWriter writer = segmentWriter(store, LATEST_VERSION, "init", 0);
+ NodeBuilder builder = EMPTY_NODE.builder();
+ builder.setChildNode("root", EMPTY_NODE);
+ SegmentNodeState node = writer.writeNode(builder.getNodeState());
+ writer.flush();
+ return node.getRecordId();
+ } catch (IOException e) {
+ String msg = "Failed to write initial node";
+ log.error(msg, e);
+ throw new IllegalStateException(msg, e);
+ }
+ }
+ };
+ }
}
private FileStore(Builder builder, boolean readOnly) throws IOException {
this.version = builder.version;
-
- if (readOnly) {
- checkNotNull(builder.directory);
- checkState(builder.directory.exists() && builder.directory.isDirectory());
- } else {
- checkNotNull(builder.directory).mkdirs();
- }
-
- // FIXME OAK-4102: Break cyclic dependency of FileStore and SegmentTracker
- // SegmentTracker and FileStore have a cyclic dependency, which we should
- // try to break. Here we pass along a not fully initialised instances of the
- // FileStore to the SegmentTracker, which in turn is in later invoked to write
- // the initial node state. Notably before this instance is fully initialised!
- // Once consequence of this is that we cannot reliably determine the current
- // GC generation while writing the initial head state. See further below.
-
this.tracker = new SegmentTracker(this);
+ this.revisions = builder.revisions;
+ this.blobStore = builder.blobStore;
// FIXME OAK-4373 refactor cache size configurations
if (builder.cacheSize < 0) {
@@ -445,34 +438,24 @@ public class FileStore implements Segmen
this.segmentCache = new SegmentCache(DEFAULT_STRING_CACHE_MB);
}
if (builder.cacheSize < 0) {
- this.segmentReader = new SegmentReaderImpl(this, 0);
+ this.segmentReader = SegmentReaders.segmentReader(this, 0);
} else if (builder.cacheSize > 0) {
- this.segmentReader = new SegmentReaderImpl(this, builder.cacheSize);
+ this.segmentReader = SegmentReaders.segmentReader(this, builder.cacheSize);
} else {
- this.segmentReader = new SegmentReaderImpl(this);
+ this.segmentReader = SegmentReaders.segmentReader(this, DEFAULT_STRING_CACHE_MB);
}
- this.segmentWriter = new SegmentWriter(this,
- new SegmentBufferWriterPool(this, version, "sys", new Supplier<Integer>() {
+ this.segmentWriter = pooledSegmentWriter(this, version, "sys", new Supplier<Integer>() {
@Override
public Integer get() {
return getGcGeneration();
}
- }));
- this.blobStore = builder.blobStore;
+ });
this.directory = builder.directory;
this.maxFileSize = builder.maxFileSize * MB;
this.memoryMapping = builder.memoryMapping;
this.gcMonitor = builder.gcMonitor;
this.gcOptions = builder.gcOptions;
- if (readOnly) {
- journalFile = new RandomAccessFile(new File(directory,
- JOURNAL_FILE_NAME), "r");
- } else {
- journalFile = new RandomAccessFile(new File(directory,
- JOURNAL_FILE_NAME), "rw");
- }
-
Map<Integer, Map<Character, File>> map = collectFiles(directory);
this.readers = newArrayListWithCapacity(map.size());
Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
@@ -501,52 +484,17 @@ public class FileStore implements Segmen
}
this.writeFile = new File(directory, String.format(
FILE_NAME_FORMAT, writeNumber, "a"));
- this.writer = new TarWriter(writeFile, stats);
- }
-
- RecordId id = null;
- try (JournalReader journalReader = new JournalReader(new File(directory, JOURNAL_FILE_NAME))) {
- Iterator<String> heads = journalReader.iterator();
- while (id == null && heads.hasNext()) {
- String head = heads.next();
- try {
- RecordId last = RecordId.fromString(tracker, head);
- SegmentId segmentId = last.getSegmentId();
- if (containsSegment(
- segmentId.getMostSignificantBits(),
- segmentId.getLeastSignificantBits())) {
- id = last;
- } else {
- log.warn("Unable to access revision {}, rewinding...", last);
- }
- } catch (IllegalArgumentException ignore) {
- log.warn("Skipping invalid record id {}", head);
- }
- }
+ this.tarWriter = new TarWriter(writeFile, stats);
}
- journalFile.seek(journalFile.length());
-
if (!readOnly) {
- lockFile = new RandomAccessFile(
- new File(directory, LOCK_FILE_NAME), "rw");
+ lockFile = new RandomAccessFile(new File(directory, LOCK_FILE_NAME), "rw");
lock = lockFile.getChannel().lock();
} else {
lockFile = null;
lock = null;
}
- if (id != null) {
- head = new AtomicReference<>(id);
- persistedHead = new AtomicReference<>(id);
- } else {
- NodeBuilder nodeBuilder = EMPTY_NODE.builder();
- nodeBuilder.setChildNode("root", builder.root);
- head = new AtomicReference<>(writeNode(
- builder.root, segmentWriter, new SegmentBufferWriter(this, version, "init", 0)));
- persistedHead = new AtomicReference<>(null);
- }
-
// FIXME OAK-3468 Replace BackgroundThread with Scheduler
// Externalise these background operations
if (!readOnly) {
@@ -601,19 +549,8 @@ public class FileStore implements Segmen
log.debug("TarMK readers {}", this.readers);
}
- @Nonnull
- private static RecordId writeNode(NodeState root, SegmentWriter writer,
- SegmentBufferWriter bufferWriter)
- throws IOException {
- NodeBuilder nodeBuilder = EMPTY_NODE.builder();
- nodeBuilder.setChildNode("root", root);
- SegmentNodeState node = writer.writeNode(nodeBuilder.getNodeState(), bufferWriter, Suppliers.ofInstance(false));
- assert node != null;
- return node.getRecordId();
- }
-
private int getGcGeneration() {
- return head.get().getSegment().getGcGeneration();
+ return revisions.getHead().getSegment().getGcGeneration();
}
@Nonnull
@@ -621,6 +558,12 @@ public class FileStore implements Segmen
return segmentCache.getCacheStats();
}
+ // FIXME OAK-4373 move access to the cache stats to the segment reader and avoid casting to implementation
+ @Nonnull
+ public CacheStats getStringCacheStats() {
+ return ((CachingSegmentReader)segmentReader).getStringCacheStats();
+ }
+
public void maybeCompact(boolean cleanup) throws IOException {
gcMonitor.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
@@ -768,7 +711,7 @@ public class FileStore implements Segmen
return dataFiles;
}
- public long size() {
+ public final long size() {
fileStoreLock.readLock().lock();
try {
long size = writeFile != null ? writeFile.length() : 0;
@@ -799,8 +742,8 @@ public class FileStore implements Segmen
fileStoreLock.readLock().lock();
try {
int count = 0;
- if (writer != null) {
- count += writer.count();
+ if (tarWriter != null) {
+ count += tarWriter.count();
}
for (TarReader reader : readers) {
count += reader.count();
@@ -818,7 +761,7 @@ public class FileStore implements Segmen
* @return compaction gain estimate
*/
CompactionGainEstimate estimateCompactionGain(Supplier<Boolean> stop) {
- CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(), count(), stop);
+ CompactionGainEstimate estimate = new CompactionGainEstimate(segmentReader.readHeadState(), count(), stop);
fileStoreLock.readLock().lock();
try {
for (TarReader reader : readers) {
@@ -838,52 +781,32 @@ public class FileStore implements Segmen
}
public void flush() throws IOException {
- flush(cleanupNeeded.getAndSet(false));
- }
-
- public void flush(boolean cleanup) throws IOException {
- synchronized (persistedHead) {
- RecordId before = persistedHead.get();
- RecordId after = head.get();
-
- if (cleanup || !after.equals(before)) {
- segmentWriter.flush();
-
+ revisions.flush(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
// FIXME OAK-4291: FileStore.flush prone to races leading to corruption
// There is a small windows that could lead to a corrupted store:
// if we crash right after setting the persisted head but before any delay-flushed
// SegmentBufferWriter instance flushes (see SegmentBufferWriterPool.returnWriter())
// then that data is lost although it might be referenced from the persisted head already.
-
// Need a test case. Possible fix: return a future from flush() and set the persisted head
// in the completion handler.
- writer.flush();
-
- fileStoreLock.writeLock().lock();
- try {
- log.debug("TarMK journal update {} -> {}", before, after);
- journalFile.writeBytes(after.toString10() + " root " + System.currentTimeMillis()+"\n");
- journalFile.getChannel().force(false);
- persistedHead.set(after);
- } finally {
- fileStoreLock.writeLock().unlock();
- }
-
- if (cleanup) {
- // Explicitly give up reference to the previous root state
- // otherwise they could block cleanup. See OAK-3347
- before = null;
- after = null;
- pendingRemove.addAll(cleanup());
- }
+ segmentWriter.flush();
+ tarWriter.flush();
+ return null;
}
+ });
- // remove all obsolete tar generations
+ if (cleanupNeeded.getAndSet(false)) {
+ pendingRemove.addAll(cleanup());
+ }
+
+ // remove all obsolete tar generations
+ synchronized (pendingRemove) {
Iterator<File> iterator = pendingRemove.iterator();
while (iterator.hasNext()) {
File file = iterator.next();
- log.debug("TarMK GC: Attempting to remove old file {}",
- file);
+ log.debug("TarMK GC: Attempting to remove old file {}", file);
if (!file.exists() || file.delete()) {
log.debug("TarMK GC: Removed old file {}", file);
iterator.remove();
@@ -1103,7 +1026,7 @@ public class FileStore implements Segmen
gcMonitor.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
Stopwatch watch = Stopwatch.createStarted();
- SegmentNodeState before = getHead();
+ SegmentNodeState before = segmentReader.readHeadState();
long existing = before.getChildNode(SegmentNodeStore.CHECKPOINTS)
.getChildNodeCount(Long.MAX_VALUE);
if (existing > 1) {
@@ -1116,7 +1039,7 @@ public class FileStore implements Segmen
final int newGeneration = getGcGeneration() + 1;
SegmentBufferWriter bufferWriter = new SegmentBufferWriter(
- this, version, "c", newGeneration);
+ this, tracker, segmentReader, version, "c", newGeneration);
Supplier<Boolean> cancel = newCancelCompactionCondition();
SegmentNodeState after = compact(bufferWriter, before, cancel);
if (after == null) {
@@ -1130,13 +1053,14 @@ public class FileStore implements Segmen
try {
int cycles = 0;
boolean success = false;
- while (cycles++ < gcOptions.getRetryCount() && !(success = setHead(before, after))) {
+ while (cycles++ < gcOptions.getRetryCount() &&
+ !(success = revisions.setHead(before.getRecordId(), after.getRecordId()))) {
// Some other concurrent changes have been made.
// Rebase (and compact) those changes on top of the
// compacted state before retrying to set the head.
gcMonitor.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
"Compacting these commits. Cycle {}", GC_COUNT, cycles);
- SegmentNodeState head = getHead();
+ SegmentNodeState head = segmentReader.readHeadState();
after = compact(bufferWriter, head, cancel);
if (after == null) {
gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
@@ -1215,32 +1139,38 @@ public class FileStore implements Segmen
return segmentWriter.writeNode(node, bufferWriter, cancel);
}
- private boolean forceCompact(SegmentBufferWriter bufferWriter, Supplier<Boolean> cancel)
- throws InterruptedException, IOException {
- if (rwLock.writeLock().tryLock(gcOptions.getLockWaitTime(), TimeUnit.SECONDS)) {
- try {
- SegmentNodeState head = getHead();
- SegmentNodeState after = compact(bufferWriter, head, cancel);
- if (after == null) {
- gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
- return false;
- } else {
- return setHead(head, after);
+ private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter,
+ @Nonnull final Supplier<Boolean> cancel)
+ throws InterruptedException {
+ return revisions.
+ setHead(new Function<RecordId, RecordId>() {
+ @Nullable
+ @Override
+ public RecordId apply(RecordId base) {
+ try {
+ SegmentNodeState after = compact(bufferWriter,
+ segmentReader.readNode(base), cancel);
+ if (after == null) {
+ gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+ return null;
+ } else {
+ return after.getRecordId();
+ }
+ } catch (IOException e) {
+ gcMonitor.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e);
+ return null;
+ }
}
- } finally {
- rwLock.writeLock().unlock();
- }
- } else {
- return false;
- }
+ },
+ timeout(gcOptions.getLockWaitTime(), SECONDS));
}
public Iterable<SegmentId> getSegmentIds() {
fileStoreLock.readLock().lock();
try {
List<SegmentId> ids = newArrayList();
- if (writer != null) {
- for (UUID uuid : writer.getUUIDs()) {
+ if (tarWriter != null) {
+ for (UUID uuid : tarWriter.getUUIDs()) {
ids.add(tracker.getSegmentId(
uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits()));
@@ -1259,43 +1189,24 @@ public class FileStore implements Segmen
}
}
- @Override
@Nonnull
public SegmentTracker getTracker() {
return tracker;
}
- @Override
@Nonnull
public SegmentWriter getWriter() {
return segmentWriter;
}
- @Override
@Nonnull
public SegmentReader getReader() {
return segmentReader;
}
- @Override
- public SegmentNodeState getHead() {
- return new SegmentNodeState(this, head.get());
- }
-
- // FIXME OAK-4015: Expedite commits from the compactor
- // use a lock that can expedite important commits like compaction and checkpoints.
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
- @Override
- public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
- rwLock.readLock().lock();
- try {
- RecordId id = this.head.get();
- return id.equals(base.getRecordId())
- && this.head.compareAndSet(id, head.getRecordId());
- } finally {
- rwLock.readLock().unlock();
- }
+ @Nonnull
+ public TarRevisions getRevisions() {
+ return revisions;
}
@Override
@@ -1310,12 +1221,13 @@ public class FileStore implements Segmen
closeAndLogOnFail(diskSpaceThread);
try {
flush();
+ revisions.close();
// FIXME OAK-4291: FileStore.flush prone to races leading to corruption
// Replace this with a way to "close" the underlying SegmentBufferWriter(s)
// tracker.getWriter().dropCache();
fileStoreLock.writeLock().lock();
try {
- closeAndLogOnFail(writer);
+ closeAndLogOnFail(tarWriter);
List<TarReader> list = readers;
readers = newArrayList();
@@ -1327,7 +1239,6 @@ public class FileStore implements Segmen
lock.release();
}
closeAndLogOnFail(lockFile);
- closeAndLogOnFail(journalFile);
} finally {
fileStoreLock.writeLock().unlock();
}
@@ -1355,10 +1266,10 @@ public class FileStore implements Segmen
}
}
- if (writer != null) {
+ if (tarWriter != null) {
fileStoreLock.readLock().lock();
try {
- if (writer.containsEntry(msb, lsb)) {
+ if (tarWriter.containsEntry(msb, lsb)) {
return true;
}
} finally {
@@ -1398,23 +1309,23 @@ public class FileStore implements Segmen
ByteBuffer buffer = reader.readEntry(msb, lsb);
if (buffer != null) {
- return new Segment(FileStore.this, id, buffer);
+ return new Segment(tracker, segmentReader, id, buffer);
}
} catch (IOException e) {
log.warn("Failed to read from tar file {}", reader, e);
}
}
- if (writer != null) {
+ if (tarWriter != null) {
fileStoreLock.readLock().lock();
try {
try {
- ByteBuffer buffer = writer.readEntry(msb, lsb);
+ ByteBuffer buffer = tarWriter.readEntry(msb, lsb);
if (buffer != null) {
- return new Segment(FileStore.this, id, buffer);
+ return new Segment(tracker, segmentReader, id, buffer);
}
} catch (IOException e) {
- log.warn("Failed to read from tar file {}", writer, e);
+ log.warn("Failed to read from tar file {}", tarWriter, e);
}
} finally {
fileStoreLock.readLock().unlock();
@@ -1434,7 +1345,7 @@ public class FileStore implements Segmen
ByteBuffer buffer = reader.readEntry(msb, lsb);
if (buffer != null) {
- return new Segment(FileStore.this, id, buffer);
+ return new Segment(tracker, segmentReader, id, buffer);
}
} catch (IOException e) {
log.warn("Failed to read from tar file {}", reader, e);
@@ -1456,7 +1367,7 @@ public class FileStore implements Segmen
fileStoreLock.writeLock().lock();
try {
int generation = Segment.getGcGeneration(wrap(buffer, offset, length), id.asUUID());
- long size = writer.writeEntry(
+ long size = tarWriter.writeEntry(
id.getMostSignificantBits(),
id.getLeastSignificantBits(),
buffer, offset, length, generation);
@@ -1478,7 +1389,7 @@ public class FileStore implements Segmen
} else {
data = ByteBuffer.wrap(buffer, offset, length);
}
- segmentCache.putSegment(new Segment(this, id, data));
+ segmentCache.putSegment(new Segment(tracker, segmentReader, id, data));
}
}
@@ -1488,8 +1399,8 @@ public class FileStore implements Segmen
* @throws IOException
*/
private void newWriter() throws IOException {
- if (writer.isDirty()) {
- writer.close();
+ if (tarWriter.isDirty()) {
+ tarWriter.close();
List<TarReader> list =
newArrayListWithCapacity(1 + readers.size());
@@ -1501,25 +1412,21 @@ public class FileStore implements Segmen
writeFile = new File(
directory,
String.format(FILE_NAME_FORMAT, writeNumber, "a"));
- writer = new TarWriter(writeFile, stats);
+ tarWriter = new TarWriter(writeFile, stats);
}
}
- @Override
- public Blob readBlob(String blobId) {
- if (blobStore != null) {
- return new BlobStoreBlob(blobStore, blobId);
- }
- throw new IllegalStateException("Attempt to read external blob with blobId [" + blobId + "] " +
- "without specifying BlobStore");
- }
-
- @Override
+ /**
+ * @return the external BlobStore (if configured) with this store, {@code null} otherwise.
+ */
+ @CheckForNull
public BlobStore getBlobStore() {
return blobStore;
}
- @Override
+ /**
+ * Trigger a garbage collection cycle
+ */
public void gc() {
compactionThread.trigger();
}
@@ -1552,9 +1459,7 @@ public class FileStore implements Segmen
private void setRevision(String rootRevision) {
fileStoreLock.writeLock().lock();
try {
- RecordId id = RecordId.fromString(tracker, rootRevision);
- head.set(id);
- persistedHead.set(id);
+ revisions.setHeadId(RecordId.fromString(tracker, rootRevision));
} finally {
fileStoreLock.writeLock().unlock();
}
@@ -1637,11 +1542,6 @@ public class FileStore implements Segmen
}
@Override
- public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
- throw new UnsupportedOperationException("Read Only Store");
- }
-
- @Override
public void writeSegment(SegmentId id, byte[] data,
int offset, int length) {
throw new UnsupportedOperationException("Read Only Store");
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java?rev=1746410&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/TarRevisions.java Wed Jun 1 07:48:51 2016
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Throwables.propagate;
+import static com.google.common.base.Throwables.propagateIfInstanceOf;
+import static java.lang.Long.MAX_VALUE;
+import static java.util.concurrent.TimeUnit.DAYS;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Revisions;
+import org.apache.jackrabbit.oak.segment.SegmentStore;
+import org.apache.jackrabbit.oak.segment.SegmentTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TarRevisions implements Revisions, Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(TarRevisions.class);
+
+ public static final String JOURNAL_FILE_NAME = "journal.log";
+
+ private final boolean readOnly;
+
+ /**
+ * The latest head state.
+ */
+ @Nonnull
+ private final AtomicReference<RecordId> head;
+
+ @Nonnull
+ private final File directory;
+
+ @Nonnull
+ private final RandomAccessFile journalFile;
+
+ /**
+ * The persisted head of the root journal, used to determine whether the
+ * latest {@link #head} value should be written to the disk.
+ */
+ @Nonnull
+ private final AtomicReference<RecordId> persistedHead;
+
+ // FIXME OAK-4015: Expedite commits from the compactor
+ // use a lock that can expedite important commits like compaction and checkpoints.
+ @Nonnull
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ private static class TimeOutOption implements Option {
+ private final long time;
+
+ @Nonnull
+ private final TimeUnit unit;
+
+ TimeOutOption(long time, @Nonnull TimeUnit unit) {
+ this.time = time;
+ this.unit = unit;
+ }
+
+ @Nonnull
+ public static TimeOutOption from(@CheckForNull Option option) {
+ if (option instanceof TimeOutOption) {
+ return (TimeOutOption) option;
+ } else {
+ throw new IllegalArgumentException("Invalid option " + option);
+ }
+ }
+ }
+
+ public static final Option INFINITY = new TimeOutOption(MAX_VALUE, DAYS);
+
+ public static Option timeout(long time, TimeUnit unit) {
+ return new TimeOutOption(time, unit);
+ }
+
+ public TarRevisions(boolean readOnly, @Nonnull File directory)
+ throws IOException {
+ this.readOnly = readOnly;
+ this.directory = checkNotNull(directory);
+ this.journalFile = new RandomAccessFile(new File(directory, JOURNAL_FILE_NAME),
+ readOnly ? "r" : "rw");
+ this.journalFile.seek(journalFile.length());
+ this.head = new AtomicReference<>(null);
+ this.persistedHead = new AtomicReference<>(null);
+ }
+
+ synchronized void bind(@Nonnull SegmentStore store,
+ @Nonnull SegmentTracker tracker,
+ @Nonnull Supplier<RecordId> writeInitialNode)
+ throws IOException {
+ if (head.get() == null) {
+ RecordId persistedId = null;
+ try (JournalReader journalReader = new JournalReader(new File(directory, JOURNAL_FILE_NAME))) {
+ Iterator<String> entries = journalReader.iterator();
+ while (persistedId == null && entries.hasNext()) {
+ String entry = entries.next();
+ try {
+ RecordId id = RecordId.fromString(tracker, entry);
+ if (store.containsSegment(id.getSegmentId())) {
+ persistedId = id;
+ } else {
+ LOG.warn("Unable to access revision {}, rewinding...", id);
+ }
+ } catch (IllegalArgumentException ignore) {
+ LOG.warn("Skipping invalid record id {}", entry);
+ }
+ }
+ }
+
+ if (persistedId == null) {
+ head.set(writeInitialNode.get());
+ } else {
+ persistedHead.set(persistedId);
+ head.set(persistedId);
+ }
+ }
+ }
+
+ private void checkBound() {
+ checkState(head.get() != null, "Revisions not bound to a store");
+ }
+
+ private final Lock flushLock = new ReentrantLock();
+
+ public void flush(@Nonnull Callable<Void> persisted) throws IOException {
+ checkBound();
+ if (flushLock.tryLock()) {
+ try {
+ RecordId before = persistedHead.get();
+ RecordId after = getHead();
+ if (!after.equals(before)) {
+ persisted.call();
+
+ LOG.debug("TarMK journal update {} -> {}", before, after);
+ journalFile.writeBytes(after.toString10() + " root " + System.currentTimeMillis() + "\n");
+ journalFile.getChannel().force(false);
+ persistedHead.set(after);
+ }
+ } catch (Exception e) {
+ propagateIfInstanceOf(e, IOException.class);
+ propagate(e);
+ } finally {
+ flushLock.unlock();
+ }
+ }
+ }
+
+ @Nonnull
+ @Override
+ public RecordId getHead() {
+ checkBound();
+ return head.get();
+ }
+
+ @Override
+ public boolean setHead(
+ @Nonnull RecordId base,
+ @Nonnull RecordId head,
+ @Nonnull Option... options) {
+ checkBound();
+ rwLock.readLock().lock();
+ try {
+ RecordId id = this.head.get();
+ return id.equals(base) && this.head.compareAndSet(id, head);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean setHead(
+ @Nonnull Function<RecordId, RecordId> newHead,
+ @Nonnull Option... options)
+ throws InterruptedException {
+ checkBound();
+ TimeOutOption timeout = getTimeout(options);
+ if (rwLock.writeLock().tryLock(timeout.time, timeout.unit)) {
+ try {
+ RecordId after = newHead.apply(getHead());
+ if (after != null) {
+ head.set(after);
+ return true;
+ } else {
+ return false;
+ }
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ } else {
+ return false;
+ }
+ }
+
+ @Nonnull
+ private static TimeOutOption getTimeout(@Nonnull Option[] options) {
+ if (options.length == 0) {
+ return TimeOutOption.from(INFINITY);
+ } else if (options.length == 1) {
+ return TimeOutOption.from(options[0]);
+ } else {
+ throw new IllegalArgumentException("Expected zero or one options, got " + options.length);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ journalFile.close();
+ }
+
+ void setHeadId(@Nonnull RecordId headId) {
+ checkState(readOnly, "Cannot set revision on a writable store");
+ head.set(headId);
+ persistedHead.set(headId);
+ }
+}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/ConsistencyChecker.java Wed Jun 1 07:48:51 2016
@@ -38,7 +38,7 @@ import org.apache.jackrabbit.oak.api.Blo
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.segment.SegmentBlob;
-import org.apache.jackrabbit.oak.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.apache.jackrabbit.oak.segment.file.FileStore.ReadOnlyStore;
import org.apache.jackrabbit.oak.segment.file.JournalReader;
@@ -145,7 +145,7 @@ public class ConsistencyChecker {
private String checkPath(String path, long binLen) {
try {
print("Checking {}", path);
- NodeState root = SegmentNodeStore.builder(store).build().getRoot();
+ NodeState root = SegmentNodeStoreBuilders.builder(store).build().getRoot();
String parentPath = getParentPath(path);
String name = getName(path);
NodeState parent = getNode(root, parentPath);
@@ -173,7 +173,8 @@ public class ConsistencyChecker {
store.setRevision(revision);
nodeCount = 0;
propertyCount = 0;
- String result = traverse(SegmentNodeStore.builder(store).build().getRoot(), "/", true, binLen);
+ String result = traverse(SegmentNodeStoreBuilders.builder(store).build()
+ .getRoot(), "/", true, binLen);
print("Traversed {} nodes and {} properties", nodeCount, propertyCount);
return result;
} catch (RuntimeException e) {
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tooling/RevisionHistory.java Wed Jun 1 07:48:51 2016
@@ -81,7 +81,7 @@ public class RevisionHistory {
@Nullable @Override
public HistoryElement apply(String revision) {
store.setRevision(revision);
- NodeState node = getNode(store.getHead(), path);
+ NodeState node = getNode(store.getReader().readHeadState(), path);
return new HistoryElement(revision, node);
}
});
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStore.java Wed Jun 1 07:48:51 2016
@@ -18,13 +18,12 @@
*/
package org.apache.jackrabbit.oak.segment.http;
-import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_STRING_CACHE_MB;
import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.apache.jackrabbit.oak.segment.SegmentWriters.pooledSegmentWriter;
-import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
@@ -34,16 +33,14 @@ import java.nio.ByteBuffer;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import com.google.common.base.Suppliers;
import com.google.common.io.ByteStreams;
-import org.apache.jackrabbit.oak.api.Blob;
-import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Revisions;
import org.apache.jackrabbit.oak.segment.Segment;
-import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool;
import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.segment.SegmentReader;
-import org.apache.jackrabbit.oak.segment.SegmentReaderImpl;
+import org.apache.jackrabbit.oak.segment.SegmentReaders;
import org.apache.jackrabbit.oak.segment.SegmentStore;
import org.apache.jackrabbit.oak.segment.SegmentTracker;
import org.apache.jackrabbit.oak.segment.SegmentWriter;
@@ -55,11 +52,14 @@ public class HttpStore implements Segmen
private final SegmentTracker tracker = new SegmentTracker(this);
@Nonnull
- private final SegmentWriter segmentWriter = new SegmentWriter(this,
- new SegmentBufferWriterPool(this, LATEST_VERSION, "sys"));
+ private final HttpStoreRevisions revisions = new HttpStoreRevisions(this);
@Nonnull
- private final SegmentReader segmentReader = new SegmentReaderImpl(this);
+ private final SegmentReader segmentReader = SegmentReaders.segmentReader(this, DEFAULT_STRING_CACHE_MB);
+
+ @Nonnull
+ private final SegmentWriter segmentWriter = pooledSegmentWriter(this,
+ LATEST_VERSION, "sys", Suppliers.ofInstance(0));
private final URL base;
@@ -72,30 +72,32 @@ public class HttpStore implements Segmen
this.base = base;
}
- @Override
@Nonnull
public SegmentTracker getTracker() {
return tracker;
}
- @Override
@Nonnull
public SegmentWriter getWriter() {
return segmentWriter;
}
- @Override
@Nonnull
public SegmentReader getReader() {
return segmentReader;
}
+ @Nonnull
+ public Revisions getRevisions() {
+ return revisions;
+ }
+
/**
* Builds a simple URLConnection. This method can be extended to add
* authorization headers if needed.
*
*/
- protected URLConnection get(String fragment) throws MalformedURLException,
+ URLConnection get(String fragment) throws MalformedURLException,
IOException {
final URL url;
if (fragment == null) {
@@ -107,33 +109,6 @@ public class HttpStore implements Segmen
}
@Override
- public SegmentNodeState getHead() {
- try {
- URLConnection connection = get(null);
- InputStream stream = connection.getInputStream();
- try {
- BufferedReader reader = new BufferedReader(
- new InputStreamReader(stream, UTF_8));
- return new SegmentNodeState(this, RecordId.fromString(tracker, reader.readLine()));
- } finally {
- stream.close();
- }
- } catch (IllegalArgumentException e) {
- throw new IllegalStateException(e);
- } catch (MalformedURLException e) {
- throw new IllegalStateException(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public boolean setHead(SegmentNodeState base, SegmentNodeState head) {
- // TODO throw new UnsupportedOperationException();
- return true;
- }
-
- @Override
// FIXME OAK-4396: HttpStore.containsSegment throws SNFE instead of returning false for non existing segments
public boolean containsSegment(SegmentId id) {
return id.sameStore(this) || readSegment(id) != null;
@@ -147,7 +122,7 @@ public class HttpStore implements Segmen
InputStream stream = connection.getInputStream();
try {
byte[] data = ByteStreams.toByteArray(stream);
- return new Segment(this, id, ByteBuffer.wrap(data));
+ return new Segment(tracker, segmentReader, id, ByteBuffer.wrap(data));
} finally {
stream.close();
}
@@ -176,23 +151,12 @@ public class HttpStore implements Segmen
}
}
- @Override
- public void close() {
- }
-
- @Override @CheckForNull
- public Blob readBlob(String reference) {
- return null;
- }
-
- @Override @CheckForNull
+ /**
+ * @return {@code null}
+ */
+ @CheckForNull
public BlobStore getBlobStore() {
return null;
}
- @Override
- public void gc() {
- // TODO: distributed gc
- }
-
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java?rev=1746410&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/http/HttpStoreRevisions.java Wed Jun 1 07:48:51 2016
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.http;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URLConnection;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Function;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Revisions;
+
+public class HttpStoreRevisions implements Revisions {
+
+ @Nonnull
+ private final HttpStore store;
+
+ public HttpStoreRevisions(@Nonnull HttpStore store) {
+ this.store = store;
+ }
+
+ @Nonnull
+ @Override
+ public RecordId getHead() {
+ try {
+ URLConnection connection = store.get(null);
+ try (
+ InputStream stream = connection.getInputStream();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(stream, UTF_8))
+ ) {
+ return RecordId.fromString(store.getTracker(), reader.readLine());
+ }
+ } catch (IllegalArgumentException | MalformedURLException e) {
+ throw new IllegalStateException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean setHead(
+ @Nonnull RecordId base, @Nonnull RecordId head,
+ @Nonnull Option... options) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean setHead(
+ @Nonnull Function<RecordId, RecordId> newHead,
+ @Nonnull Option... options) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java?rev=1746410&r1=1746409&r2=1746410&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java Wed Jun 1 07:48:51 2016
@@ -18,30 +18,28 @@
*/
package org.apache.jackrabbit.oak.segment.memory;
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
+import static org.apache.jackrabbit.oak.segment.SegmentWriters.pooledSegmentWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentMap;
+import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
-import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.segment.Revisions;
import org.apache.jackrabbit.oak.segment.Segment;
-import org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool;
import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.segment.SegmentReader;
-import org.apache.jackrabbit.oak.segment.SegmentReaderImpl;
+import org.apache.jackrabbit.oak.segment.SegmentReaders;
import org.apache.jackrabbit.oak.segment.SegmentStore;
import org.apache.jackrabbit.oak.segment.SegmentTracker;
import org.apache.jackrabbit.oak.segment.SegmentWriter;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
-import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
/**
* A store used for in-memory operations.
@@ -52,60 +50,44 @@ public class MemoryStore implements Segm
private final SegmentTracker tracker = new SegmentTracker(this);
@Nonnull
- private final SegmentWriter segmentWriter = new SegmentWriter(this,
- new SegmentBufferWriterPool(this, LATEST_VERSION, "sys"));
+ private final MemoryStoreRevisions revisions;
@Nonnull
- private final SegmentReader segmentReader = new SegmentReaderImpl(this, 16);
+ private final SegmentReader segmentReader;
- private SegmentNodeState head;
+ @Nonnull
+ private final SegmentWriter segmentWriter;
private final ConcurrentMap<SegmentId, Segment> segments =
Maps.newConcurrentMap();
- public MemoryStore(NodeState root) throws IOException {
- NodeBuilder builder = EMPTY_NODE.builder();
- builder.setChildNode("root", root);
-
- this.head = segmentWriter.writeNode(builder.getNodeState());
- segmentWriter.flush();
- }
-
public MemoryStore() throws IOException {
- this(EMPTY_NODE);
+ this.revisions = new MemoryStoreRevisions();
+ this.segmentReader = SegmentReaders.segmentReader(this, 16);
+ this.segmentWriter = pooledSegmentWriter(this,
+ LATEST_VERSION, "sys", Suppliers.ofInstance(0));
+ revisions.bind(this);
+ segmentWriter.flush();
}
- @Override
@Nonnull
public SegmentTracker getTracker() {
return tracker;
}
- @Override
@Nonnull
public SegmentWriter getWriter() {
return segmentWriter;
}
- @Override
@Nonnull
public SegmentReader getReader() {
return segmentReader;
}
- @Override
- public synchronized SegmentNodeState getHead() {
- return head;
- }
-
- @Override
- public synchronized boolean setHead(SegmentNodeState base, SegmentNodeState head) {
- if (this.head.getRecordId().equals(base.getRecordId())) {
- this.head = head;
- return true;
- } else {
- return false;
- }
+ @Nonnull
+ public Revisions getRevisions() {
+ return revisions;
}
@Override
@@ -128,27 +110,20 @@ public class MemoryStore implements Segm
ByteBuffer buffer = ByteBuffer.allocate(length);
buffer.put(data, offset, length);
buffer.rewind();
- Segment segment = new Segment(this, id, buffer);
+ Segment segment = new Segment(tracker, segmentReader, id, buffer);
if (segments.putIfAbsent(id, segment) != null) {
throw new IOException("Segment override: " + id);
}
}
- @Override
- public void close() {
- }
-
- @Override
- public Blob readBlob(String reference) {
- return null;
- }
-
- @Override
+ /**
+ * @return {@code null}
+ */
+ @CheckForNull
public BlobStore getBlobStore() {
return null;
}
- @Override
public void gc() {
System.gc();
segments.keySet().retainAll(tracker.getReferencedSegmentIds());
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java?rev=1746410&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStoreRevisions.java Wed Jun 1 07:48:51 2016
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.memory;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Function;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.Revisions;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+
+public class MemoryStoreRevisions implements Revisions {
+ private RecordId head;
+
+ public void bind(MemoryStore store) throws IOException {
+ if (head == null) {
+ NodeBuilder builder = EMPTY_NODE.builder();
+ builder.setChildNode("root", EMPTY_NODE);
+ head = store.getWriter().writeNode(builder.getNodeState()).getRecordId();
+ store.getWriter().flush();
+ }
+ }
+
+ private void checkBound() {
+ checkState(head != null, "Revisions not bound to a store");
+ }
+
+ @Nonnull
+ @Override
+ public synchronized RecordId getHead() {
+ checkBound();
+ return head;
+ }
+
+ @Override
+ public synchronized boolean setHead(
+ @Nonnull RecordId base, @Nonnull RecordId head,
+ @Nonnull Option... options) {
+ checkBound();
+ if (this.head.equals(base)) {
+ this.head = head;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean setHead(
+ @Nonnull Function<RecordId, RecordId> newHead,
+ @Nonnull Option... options) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+}