You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/04/20 12:04:38 UTC
svn commit: r1740093 - in
/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins:
backup/ segment/
Author: mduerig
Date: Wed Apr 20 10:04:38 2016
New Revision: 1740093
URL: http://svn.apache.org/viewvc?rev=1740093&view=rev
Log:
OAK-3348: Cross gc sessions might introduce references to pre-compacted segments
* Deduplicate long binaries by rewriting the list of its block ids.
* Remove deduplication logic in the compactor and rely on the one in the segment writer
* Decouple SegmentPropertyState from SegmentTemplate
Modified:
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/backup/FileStoreBackup.java
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/backup/FileStoreRestore.java
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentPropertyState.java
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java
jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/backup/FileStoreBackup.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/backup/FileStoreBackup.java?rev=1740093&r1=1740092&r2=1740093&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/backup/FileStoreBackup.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/backup/FileStoreBackup.java Wed Apr 20 10:04:38 2016
@@ -56,8 +56,8 @@ public class FileStoreBackup {
try {
SegmentNodeState state = backup.getHead();
Compactor compactor = new Compactor(backup.getTracker());
- compactor.setDeepCheckLargeBinaries(true);
- compactor.setContentEqualityCheck(true);
+// compactor.setDeepCheckLargeBinaries(true);
+// compactor.setContentEqualityCheck(true);
SegmentNodeState after = compactor.compact(state, current, state);
backup.setHead(state, after);
} finally {
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/backup/FileStoreRestore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/backup/FileStoreRestore.java?rev=1740093&r1=1740092&r2=1740093&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/backup/FileStoreRestore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/backup/FileStoreRestore.java Wed Apr 20 10:04:38 2016
@@ -25,7 +25,6 @@ import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.plugins.segment.Compactor;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
-import org.apache.jackrabbit.oak.plugins.segment.file.FileStore.ReadOnlyStore;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +52,6 @@ public class FileStoreRestore {
SegmentNodeState current = store.getHead();
try {
Compactor compactor = new Compactor(store.getTracker());
- compactor.setDeepCheckLargeBinaries(true);
SegmentNodeState after = compactor.compact(current,
restore.getHead(), current);
store.setHead(current, after);
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java?rev=1740093&r1=1740092&r2=1740093&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Compactor.java Wed Apr 20 10:04:38 2016
@@ -16,33 +16,15 @@
*/
package org.apache.jackrabbit.oak.plugins.segment;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Maps.newHashMap;
-import static org.apache.jackrabbit.oak.api.Type.BINARIES;
-import static org.apache.jackrabbit.oak.api.Type.BINARY;
import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import javax.annotation.Nonnull;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
-import com.google.common.hash.Hashing;
-import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.PropertyState;
-import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.commons.IOUtils;
-import org.apache.jackrabbit.oak.plugins.memory.BinaryPropertyState;
-import org.apache.jackrabbit.oak.plugins.memory.MultiBinaryPropertyState;
-import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -58,55 +40,13 @@ public class Compactor {
/** Logger instance */
private static final Logger log = LoggerFactory.getLogger(Compactor.class);
- /**
- * Locks down the RecordId persistence structure
- */
- static long[] recordAsKey(RecordId r) {
- return new long[] { r.getSegmentId().getMostSignificantBits(),
- r.getSegmentId().getLeastSignificantBits(), r.getOffset() };
- }
-
private final SegmentTracker tracker;
private final SegmentWriter writer;
- /**
- * Filters nodes that will be included in the compaction map, allowing for
- * optimization in case of an offline compaction
- */
- private Predicate<NodeState> includeInMap = Predicates.alwaysTrue();
-
private final ProgressTracker progress = new ProgressTracker();
/**
- * Map from {@link #getBlobKey(Blob) blob keys} to matching compacted
- * blob record identifiers. Used to de-duplicate copies of the same
- * binary values.
- */
- private final Map<String, List<RecordId>> binaries = newHashMap();
-
- /**
- * If the compactor should copy large binaries as streams or just copy the
- * refs
- */
- private final boolean cloneBinaries;
-
- /**
- * In the case of large inlined binaries, compaction will verify if all
- * referenced segments exist in order to determine if a full clone is
- * necessary, or just a shallow copy of the RecordId list is enough
- * (Used in Backup scenario)
- */
- private boolean deepCheckLargeBinaries;
-
- /**
- * Flag to use content equality verification before actually compacting the
- * state, on the childNodeChanged diff branch
- * (Used in Backup scenario)
- */
- private boolean contentEqualityCheck;
-
- /**
* Allows the cancellation of the compaction process. If this {@code
* Supplier} returns {@code true}, this compactor will cancel compaction and
* return a partial {@code SegmentNodeState} containing the changes
@@ -118,21 +58,15 @@ public class Compactor {
this(tracker, Suppliers.ofInstance(false));
}
- public Compactor(SegmentTracker tracker, Supplier<Boolean> cancel) {
+ Compactor(SegmentTracker tracker, Supplier<Boolean> cancel) {
this.tracker = tracker;
this.writer = tracker.getWriter();
- this.cloneBinaries = false;
this.cancel = cancel;
}
- public Compactor(SegmentTracker tracker, CompactionStrategy compactionStrategy) {
- this(tracker, compactionStrategy, Suppliers.ofInstance(false));
- }
-
public Compactor(SegmentTracker tracker, CompactionStrategy compactionStrategy, Supplier<Boolean> cancel) {
this.tracker = tracker;
this.writer = createSegmentWriter(tracker);
- this.cloneBinaries = compactionStrategy.cloneBinaries();
this.cancel = cancel;
}
@@ -142,12 +76,6 @@ public class Compactor {
new SegmentBufferWriter(tracker.getStore(), tracker.getSegmentVersion(), "c", tracker.getGcGen() + 1));
}
- protected SegmentNodeBuilder process(NodeState before, NodeState after, NodeState onto) throws IOException {
- SegmentNodeBuilder builder = new SegmentNodeBuilder(writer.writeNode(onto), writer);
- new CompactDiff(builder).diff(before, after);
- return builder;
- }
-
/**
* Compact the differences between a {@code before} and a {@code after}
* on top of an {@code onto} state.
@@ -158,7 +86,9 @@ public class Compactor {
*/
public SegmentNodeState compact(NodeState before, NodeState after, NodeState onto) throws IOException {
progress.start();
- SegmentNodeState compacted = process(before, after, onto).getNodeState();
+ SegmentNodeBuilder builder = new SegmentNodeBuilder(writer.writeNode(onto), writer);
+ new CompactDiff(builder).diff(before, after);
+ SegmentNodeState compacted = builder.getNodeState();
writer.flush();
progress.stop();
return compacted;
@@ -202,36 +132,20 @@ public class Compactor {
@Override
public boolean propertyAdded(PropertyState after) {
- if (path != null) {
- log.trace("propertyAdded {}/{}", path, after.getName());
+ try {
+ progress.onProperty("propertyAdded", path, after);
+ return super.propertyAdded(writer.writeProperty(after));
+ } catch (IOException e) {
+ exception = e;
+ return false;
}
- progress.onProperty();
- return super.propertyAdded(compact(after));
}
@Override
public boolean propertyChanged(PropertyState before, PropertyState after) {
- if (path != null) {
- log.trace("propertyChanged {}/{}", path, after.getName());
- }
- progress.onProperty();
- return super.propertyChanged(before, compact(after));
- }
-
- @Override
- public boolean childNodeAdded(String name, NodeState after) {
- if (path != null) {
- log.trace("childNodeAdded {}/{}", path, name);
- }
-
- progress.onNode();
try {
- NodeBuilder child = EMPTY_NODE.builder();
- boolean success = new CompactDiff(child, path, name).diff(EMPTY_NODE, after);
- if (success) {
- builder.setChildNode(name, writer.writeNode(child.getNodeState()));
- }
- return success;
+ progress.onProperty("propertyChanged", path, after);
+ return super.propertyChanged(before, writer.writeProperty(after));
} catch (IOException e) {
exception = e;
return false;
@@ -239,121 +153,28 @@ public class Compactor {
}
@Override
- public boolean childNodeChanged(
- String name, NodeState before, NodeState after) {
- if (path != null) {
- log.trace("childNodeChanged {}/{}", path, name);
- }
-
- if (contentEqualityCheck && before.equals(after)) {
- return true;
- }
-
- progress.onNode();
+ public boolean childNodeAdded(String name, NodeState after) {
try {
- NodeBuilder child = builder.getChildNode(name);
- boolean success = new CompactDiff(child, path, name).diff(before, after);
- if (success) {
- writer.writeNode(child.getNodeState()).getRecordId();
- }
- return success;
+ progress.onNode("childNodeAdded", path, name);
+ super.childNodeAdded(name, writer.writeNode(after));
+ return true;
} catch (IOException e) {
exception = e;
return false;
}
}
- }
- private PropertyState compact(PropertyState property) {
- String name = property.getName();
- Type<?> type = property.getType();
- if (type == BINARY) {
- Blob blob = compact(property.getValue(Type.BINARY));
- return BinaryPropertyState.binaryProperty(name, blob);
- } else if (type == BINARIES) {
- List<Blob> blobs = new ArrayList<Blob>();
- for (Blob blob : property.getValue(BINARIES)) {
- blobs.add(compact(blob));
- }
- return MultiBinaryPropertyState.binaryPropertyFromBlob(name, blobs);
- } else {
- Object value = property.getValue(type);
- return PropertyStates.createProperty(name, value, type);
- }
- }
-
- /**
- * Compacts (and de-duplicates) the given blob.
- *
- * @param blob blob to be compacted
- * @return compacted blob
- */
- private Blob compact(Blob blob) {
- if (blob instanceof SegmentBlob) {
- SegmentBlob sb = (SegmentBlob) blob;
+ @Override
+ public boolean childNodeChanged(String name, NodeState before, NodeState after) {
try {
- // Check if we've already cloned this specific record
- progress.onBinary();
-
- // if the blob is inlined or external, just clone it
- if (sb.isExternal() || sb.length() < Segment.MEDIUM_LIMIT) {
- return sb.clone(writer, false);
- }
-
- // alternatively look if the exact same binary has been cloned
- String key = getBlobKey(blob);
- List<RecordId> ids = binaries.get(key);
- if (ids != null) {
- for (RecordId duplicateId : ids) {
- if (new SegmentBlob(duplicateId).equals(sb)) {
- return new SegmentBlob(duplicateId);
- }
- }
- }
-
- boolean clone = cloneBinaries;
- if (deepCheckLargeBinaries) {
- clone = clone
- || !tracker.getStore().containsSegment(
- sb.getRecordId().getSegmentId());
- if (!clone) {
- for (SegmentId bid : SegmentBlob.getBulkSegmentIds(sb)) {
- if (!tracker.getStore().containsSegment(bid)) {
- clone = true;
- break;
- }
- }
- }
- }
-
- // if not, clone the large blob and keep track of the result
- sb = sb.clone(writer, clone);
- if (ids == null) {
- ids = newArrayList();
- binaries.put(key, ids);
- }
- ids.add(sb.getRecordId());
-
- return sb;
+ progress.onNode("childNodeChanged", path, name);
+ return new CompactDiff(builder.getChildNode(name), path, name).diff(before, after);
} catch (IOException e) {
- log.warn("Failed to compact a blob", e);
- // fall through
+ exception = e;
+ return false;
}
}
- // no way to compact this blob, so we'll just keep it as-is
- return blob;
- }
-
- private static String getBlobKey(Blob blob) throws IOException {
- InputStream stream = blob.getNewStream();
- try {
- byte[] buffer = new byte[SegmentWriter.BLOCK_SIZE];
- int n = IOUtils.readFully(stream, buffer, 0, buffer.length);
- return blob.length() + ":" + Hashing.sha1().hashBytes(buffer, 0, n);
- } finally {
- stream.close();
- }
}
private static class ProgressTracker {
@@ -373,14 +194,20 @@ public class Compactor {
start = System.currentTimeMillis();
}
- void onNode() {
+ void onNode(String msg, String path, String nodeName) {
+ if (path != null) {
+ log.trace("{} {}/{}", msg, path, nodeName);
+ }
if (++nodes % logAt == 0) {
logProgress(start, false);
start = System.currentTimeMillis();
}
}
- void onProperty() {
+ void onProperty(String msg, String path, PropertyState propertyState) {
+ if (path != null) {
+ log.trace("{} {}/{}", msg, path, propertyState.getName());
+ }
properties++;
}
@@ -405,52 +232,4 @@ public class Compactor {
}
}
- private static class OfflineCompactionPredicate implements
- Predicate<NodeState> {
-
- /**
- * over 64K in size, node will be included in the compaction map
- */
- private static final long offlineThreshold = 65536;
-
- @Override
- public boolean apply(NodeState state) {
- if (state.getChildNodeCount(2) > 1) {
- return true;
- }
- long count = 0;
- for (PropertyState ps : state.getProperties()) {
- Type<?> type = ps.getType();
- for (int i = 0; i < ps.count(); i++) {
- long size = 0;
- if (type == BINARY || type == BINARIES) {
- Blob blob = ps.getValue(BINARY, i);
- if (blob instanceof SegmentBlob) {
- if (!((SegmentBlob) blob).isExternal()) {
- size += blob.length();
- }
- } else {
- size += blob.length();
- }
- } else {
- size = ps.size(i);
- }
- count += size;
- if (size >= offlineThreshold || count >= offlineThreshold) {
- return true;
- }
- }
- }
- return false;
- }
- }
-
- public void setDeepCheckLargeBinaries(boolean deepCheckLargeBinaries) {
- this.deepCheckLargeBinaries = deepCheckLargeBinaries;
- }
-
- public void setContentEqualityCheck(boolean contentEqualityCheck) {
- this.contentEqualityCheck = contentEqualityCheck;
- }
-
}
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java?rev=1740093&r1=1740092&r2=1740093&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentBlob.java Wed Apr 20 10:04:38 2016
@@ -23,8 +23,6 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.segment.Segment.SMALL_LIMIT;
import static org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE;
-import java.io.BufferedInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
@@ -167,42 +165,6 @@ public class SegmentBlob extends Record
}
}
- public SegmentBlob clone(SegmentWriter writer, boolean cloneLargeBinaries) throws IOException {
- Segment segment = getSegment();
- int offset = getOffset();
- byte head = segment.readByte(offset);
- if ((head & 0x80) == 0x00) {
- // 0xxx xxxx: small value
- return writer.writeStream(new BufferedInputStream(getNewStream()));
- } else if ((head & 0xc0) == 0x80) {
- // 10xx xxxx: medium value
- return writer.writeStream(new BufferedInputStream(getNewStream()));
- } else if ((head & 0xe0) == 0xc0) {
- // 110x xxxx: long value
- if (cloneLargeBinaries) {
- return writer.writeStream(new BufferedInputStream(
- getNewStream()));
- } else {
- // this was the previous (default) behavior
- long length = (segment.readLong(offset) & 0x1fffffffffffffffL)
- + MEDIUM_LIMIT;
- int listSize = (int) ((length + BLOCK_SIZE - 1) / BLOCK_SIZE);
- ListRecord list = new ListRecord(
- segment.readRecordId(offset + 8), listSize);
- return writer.writeLargeBlob(length, list.getEntries());
- }
- } else if ((head & 0xf0) == 0xe0) {
- // 1110 xxxx: external value, short blob ID
- return writer.writeExternalBlob(getBlobId());
- } else if ((head & 0xf8) == 0xf0) {
- // 1111 0xxx: external value, long blob ID
- return writer.writeExternalBlob(getBlobId());
- } else {
- throw new IllegalStateException(String.format(
- "Unexpected value record type: %02x", head & 0xff));
- }
- }
-
//------------------------------------------------------------< Object >--
@Override
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentPropertyState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentPropertyState.java?rev=1740093&r1=1740092&r2=1740093&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentPropertyState.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentPropertyState.java Wed Apr 20 10:04:38 2016
@@ -60,11 +60,17 @@ import org.apache.jackrabbit.oak.plugins
*/
public class SegmentPropertyState extends Record implements PropertyState {
- private final PropertyTemplate template;
+ private final String name;
+ private final Type<?> type;
- public SegmentPropertyState(RecordId id, PropertyTemplate template) {
+ SegmentPropertyState(RecordId id, String name, Type<?> type) {
super(id);
- this.template = checkNotNull(template);
+ this.name = checkNotNull(name);
+ this.type = checkNotNull(type);
+ }
+
+ SegmentPropertyState(RecordId id, PropertyTemplate template) {
+ this(id, template.getName(), template.getType());
}
private ListRecord getValueList(Segment segment) {
@@ -99,12 +105,12 @@ public class SegmentPropertyState extend
@Override @Nonnull
public String getName() {
- return template.getName();
+ return name;
}
@Override
public Type<?> getType() {
- return template.getType();
+ return type;
}
@Override
@@ -219,9 +225,10 @@ public class SegmentPropertyState extend
// optimize for common cases
if (this == object) { // don't use fastEquals here due to value sharing
return true;
- } else if (object instanceof SegmentPropertyState) {
+ }
+ if (object instanceof SegmentPropertyState) {
SegmentPropertyState that = (SegmentPropertyState) object;
- if (!template.equals(that.template)) {
+ if (!type.equals(that.type) || !name.equals(that.name)) {
return false;
} else if (getRecordId().equals(that.getRecordId())) {
return true;
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java?rev=1740093&r1=1740092&r2=1740093&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java Wed Apr 20 10:04:38 2016
@@ -76,6 +76,14 @@ public class SegmentStream extends Input
this.length = inline.length;
}
+ List<RecordId> getBlockIds() {
+ if (blocks == null) {
+ return null;
+ } else {
+ return blocks.getEntries();
+ }
+ }
+
public long getLength() {
return length;
}
Modified: jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1740093&r1=1740092&r2=1740093&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-next/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Apr 20 10:04:38 2016
@@ -112,6 +112,7 @@ public class SegmentWriter {
TPL_RECORDS_CACHE_SIZE <= 0 ? 0 : (int) (TPL_RECORDS_CACHE_SIZE * 1.2),
TPL_RECORDS_CACHE_SIZE, TPL_RECORDS_CACHE_SIZE <= 0);
+
private final SegmentStore store;
/**
@@ -190,26 +191,6 @@ public class SegmentWriter {
});
}
- SegmentBlob writeExternalBlob(final String blobId) throws IOException {
- return new SegmentBlob(
- writeOperationHandler.execute(new SegmentWriteOperation() {
- @Override
- public RecordId execute(SegmentBufferWriter writer) throws IOException {
- return with(writer).writeExternalBlob(blobId);
- }
- }));
- }
-
- SegmentBlob writeLargeBlob(final long length, final List<RecordId> list) throws IOException {
- return new SegmentBlob(
- writeOperationHandler.execute(new SegmentWriteOperation() {
- @Override
- public RecordId execute(SegmentBufferWriter writer) throws IOException {
- return with(writer).writeLargeBlob(length, list);
- }
- }));
- }
-
/**
* Writes a stream value record. The given stream is consumed <em>and closed</em> by
* this method.
@@ -228,6 +209,16 @@ public class SegmentWriter {
}));
}
+ SegmentPropertyState writeProperty(final PropertyState state) throws IOException {
+ RecordId id = writeOperationHandler.execute(new SegmentWriteOperation() {
+ @Override
+ public RecordId execute(SegmentBufferWriter writer) throws IOException {
+ return with(writer).writeProperty(state);
+ }
+ });
+ return new SegmentPropertyState(id, state.getName(), state.getType());
+ }
+
public SegmentNodeState writeNode(final NodeState state) throws IOException {
return new SegmentNodeState(
writeOperationHandler.execute(new SegmentWriteOperation() {
@@ -563,14 +554,6 @@ public class SegmentWriter {
return newBlockWriter(bytes, offset, length).write(writer);
}
- private RecordId writeExternalBlob(String blobId) throws IOException {
- return writeBlobId(blobId);
- }
-
- private RecordId writeLargeBlob(long length, List<RecordId> list) throws IOException {
- return writeValueRecord(length, writeList(list));
- }
-
private RecordId writeStream(InputStream stream) throws IOException {
boolean threw = true;
try {
@@ -586,15 +569,23 @@ public class SegmentWriter {
}
private RecordId internalWriteStream(InputStream stream) throws IOException {
- BlobStore blobStore = store.getBlobStore();
- byte[] data = new byte[Segment.MEDIUM_LIMIT];
- int n = read(stream, data, 0, data.length);
+ if (stream instanceof SegmentStream) {
+ SegmentStream segmentStream = (SegmentStream) stream;
+ List<RecordId> blockIds = segmentStream.getBlockIds();
+ if (blockIds != null) {
+ return writeValueRecord(segmentStream.getLength(), writeList(blockIds));
+ }
+ }
// Special case for short binaries (up to about 16kB):
// store them directly as small- or medium-sized value records
+ byte[] data = new byte[Segment.MEDIUM_LIMIT];
+ int n = read(stream, data, 0, data.length);
if (n < Segment.MEDIUM_LIMIT) {
return writeValueRecord(n, data);
}
+
+ BlobStore blobStore = store.getBlobStore();
if (blobStore != null) {
String blobId = blobStore.writeBlob(new SequenceInputStream(
new ByteArrayInputStream(data, 0, n), stream));