You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2016/12/14 08:48:03 UTC
svn commit: r1774141 - in /jackrabbit/oak/trunk/oak-lucene/src:
main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
Author: mreutegg
Date: Wed Dec 14 08:48:03 2016
New Revision: 1774141
URL: http://svn.apache.org/viewvc?rev=1774141&view=rev
Log:
OAK-5299: Introduce BlobFactory in OakDirectory
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java?rev=1774141&r1=1774140&r2=1774141&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java Wed Dec 14 08:48:03 2016
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.google.common.collect.ImmutableSet;
@@ -40,6 +41,7 @@ import org.apache.jackrabbit.oak.api.Typ
import org.apache.jackrabbit.oak.commons.StringUtils;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.util.PerfLogger;
@@ -92,20 +94,25 @@ public class OakDirectory extends Direct
private final Set<String> fileNamesAtStart;
private final boolean activeDeleteEnabled;
private final String indexName;
- @Nullable
- private final GarbageCollectableBlobStore blobStore;
+ private final BlobFactory blobFactory;
private volatile boolean dirty;
public OakDirectory(NodeBuilder builder, IndexDefinition definition, boolean readOnly) {
- this(builder, INDEX_DATA_CHILD_NAME, definition, readOnly, null);
+ this(builder, INDEX_DATA_CHILD_NAME, definition, readOnly);
}
public OakDirectory(NodeBuilder builder, String dataNodeName, IndexDefinition definition, boolean readOnly) {
- this(builder, dataNodeName, definition, readOnly, null);
+ this(builder, dataNodeName, definition, readOnly, new NodeBuilderBlobFactory(builder));
}
public OakDirectory(NodeBuilder builder, String dataNodeName, IndexDefinition definition,
- boolean readOnly, @Nullable GarbageCollectableBlobStore blobStore) {
+ boolean readOnly, @Nullable GarbageCollectableBlobStore blobStore) {
+ this(builder, dataNodeName, definition, readOnly,
+ blobStore != null ? new BlobStoreBlobFactory(blobStore) : new NodeBuilderBlobFactory(builder));
+ }
+
+ public OakDirectory(NodeBuilder builder, String dataNodeName, IndexDefinition definition,
+ boolean readOnly, BlobFactory blobFactory) {
this.lockFactory = NoLockFactory.getNoLockFactory();
this.builder = builder;
this.directoryBuilder = readOnly ? builder.getChildNode(dataNodeName) : builder.child(dataNodeName);
@@ -115,7 +122,7 @@ public class OakDirectory extends Direct
this.fileNamesAtStart = ImmutableSet.copyOf(this.fileNames);
this.activeDeleteEnabled = definition.getActiveDeleteEnabled();
this.indexName = definition.getIndexName();
- this.blobStore = blobStore;
+ this.blobFactory = blobFactory;
}
@Override
@@ -161,7 +168,7 @@ public class OakDirectory extends Direct
@Override
public long fileLength(String name) throws IOException {
NodeBuilder file = directoryBuilder.getChildNode(name);
- OakIndexInput input = new OakIndexInput(name, file, indexName, blobStore);
+ OakIndexInput input = new OakIndexInput(name, file, indexName, blobFactory);
try {
return input.length();
} finally {
@@ -186,7 +193,7 @@ public class OakDirectory extends Direct
}
fileNames.add(name);
markDirty();
- return new OakIndexOutput(name, file, indexName, blobStore);
+ return new OakIndexOutput(name, file, indexName, blobFactory);
}
@@ -195,7 +202,7 @@ public class OakDirectory extends Direct
throws IOException {
NodeBuilder file = directoryBuilder.getChildNode(name);
if (file.exists()) {
- return new OakIndexInput(name, file, indexName, blobStore);
+ return new OakIndexInput(name, file, indexName, blobFactory);
} else {
String msg = String.format("[%s] %s", indexName, name);
throw new FileNotFoundException(msg);
@@ -340,17 +347,17 @@ public class OakDirectory extends Direct
private final String dirDetails;
- private final GarbageCollectableBlobStore blobStore;
+ private final BlobFactory blobFactory;
public OakIndexFile(String name, NodeBuilder file, String dirDetails,
- @Nullable GarbageCollectableBlobStore blobStore) {
+ @Nonnull BlobFactory blobFactory) {
this.name = name;
this.file = file;
this.dirDetails = dirDetails;
this.blobSize = determineBlobSize(file);
this.uniqueKey = readUniqueKey(file);
this.blob = new byte[blobSize];
- this.blobStore = blobStore;
+ this.blobFactory = checkNotNull(blobFactory);
PropertyState property = file.getProperty(JCR_DATA);
if (property != null && property.getType() == BINARIES) {
@@ -381,7 +388,7 @@ public class OakDirectory extends Direct
this.length = that.length;
this.data = newArrayList(that.data);
this.dataModified = that.dataModified;
- this.blobStore = that.blobStore;
+ this.blobFactory = that.blobFactory;
}
private void loadBlob(int i) throws IOException {
@@ -410,7 +417,7 @@ public class OakDirectory extends Direct
new ByteArrayInputStream(uniqueKey));
}
- Blob b = writeBlob(in);
+ Blob b = blobFactory.createBlob(in);
if (index < data.size()) {
data.set(index, b);
} else {
@@ -422,25 +429,6 @@ public class OakDirectory extends Direct
}
}
- /**
- * Writes the blob to the blobstore directly if available.
- *
- * @param in input stream
- * @return
- * @throws IOException
- */
- private Blob writeBlob(InputStream in) throws IOException {
- if (blobStore != null) {
- if (!ENABLE_AYNC_DS) {
- return new BlobStoreBlob(blobStore,
- blobStore.writeBlob(in, new BlobOptions().setUpload(SYNCHRONOUS)));
- } else {
- return new BlobStoreBlob(blobStore, blobStore.writeBlob(in));
- }
- }
- return file.createBlob(in);
- }
-
public void seek(long pos) throws IOException {
// seek() may be called with pos == length
// see https://issues.apache.org/jira/browse/LUCENE-1196
@@ -558,10 +546,10 @@ public class OakDirectory extends Direct
private final String dirDetails;
public OakIndexInput(String name, NodeBuilder file, String dirDetails,
- @Nullable GarbageCollectableBlobStore blobStore) {
+ BlobFactory blobFactory) {
super(name);
this.dirDetails = dirDetails;
- this.file = new OakIndexFile(name, file, dirDetails, blobStore);
+ this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
clones = WeakIdentityMap.newConcurrentHashMap();
}
@@ -641,9 +629,10 @@ public class OakDirectory extends Direct
private final String dirDetails;
private final OakIndexFile file;
- public OakIndexOutput(String name, NodeBuilder file, String dirDetails, GarbageCollectableBlobStore blobStore) throws IOException {
+ public OakIndexOutput(String name, NodeBuilder file, String dirDetails,
+ BlobFactory blobFactory) throws IOException {
this.dirDetails = dirDetails;
- this.file = new OakIndexFile(name, file, dirDetails, blobStore);
+ this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
}
@Override
@@ -699,4 +688,42 @@ public class OakDirectory extends Direct
}
+ public interface BlobFactory {
+
+ Blob createBlob(InputStream in) throws IOException;
+ }
+
+ public static final class NodeBuilderBlobFactory implements BlobFactory {
+
+ private final NodeBuilder builder;
+
+ public NodeBuilderBlobFactory(NodeBuilder builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public Blob createBlob(InputStream in) throws IOException {
+ return builder.createBlob(in);
+ }
+ }
+
+ public static final class BlobStoreBlobFactory implements BlobFactory {
+
+ private final BlobStore store;
+
+ public BlobStoreBlobFactory(BlobStore store) {
+ this.store = store;
+ }
+
+ @Override
+ public Blob createBlob(InputStream in) throws IOException {
+ String blobId;
+ if (!ENABLE_AYNC_DS) {
+ blobId = store.writeBlob(in, new BlobOptions().setUpload(SYNCHRONOUS));
+ } else {
+ blobId = store.writeBlob(in);
+ }
+ return new BlobStoreBlob(store, blobId);
+ }
+ }
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java?rev=1774141&r1=1774140&r2=1774141&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java Wed Dec 14 08:48:03 2016
@@ -27,6 +27,7 @@ import static org.apache.jackrabbit.JcrC
import static org.apache.jackrabbit.oak.api.Type.BINARIES;
import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
import static org.apache.jackrabbit.oak.plugins.index.lucene.OakDirectory.PROP_BLOB_SIZE;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.OakDirectory.UNIQUE_KEY_SIZE;
import static org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
@@ -36,6 +37,7 @@ import static org.junit.Assert.assertTru
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -44,12 +46,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.NullInputStream;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
@@ -188,7 +190,7 @@ public class OakDirectoryTest {
assertEquals(blobSize, testNode.getProperty(PROP_BLOB_SIZE).getValue(Type.LONG).longValue());
List<Blob> blobs = newArrayList(testNode.getProperty(JCR_DATA).getValue(BINARIES));
- assertEquals(blobSize + OakDirectory.UNIQUE_KEY_SIZE, blobs.get(0).length());
+ assertEquals(blobSize + UNIQUE_KEY_SIZE, blobs.get(0).length());
return data;
}
@@ -456,6 +458,29 @@ public class OakDirectoryTest {
dir.close();
}
+ @Test
+ public void blobFactory() throws Exception {
+ final AtomicInteger numBlobs = new AtomicInteger();
+ final int fileSize = 1024;
+ IndexDefinition def = new IndexDefinition(root, builder.getNodeState(), "/foo");
+ OakDirectory.BlobFactory factory = new OakDirectory.BlobFactory() {
+ @Override
+ public Blob createBlob(InputStream in) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ IOUtils.copy(in, out);
+ byte[] data = out.toByteArray();
+ assertEquals(fileSize + UNIQUE_KEY_SIZE, data.length);
+ numBlobs.incrementAndGet();
+ return new ArrayBasedBlob(data);
+ }
+ };
+ OakDirectory dir = new OakDirectory(builder, INDEX_DATA_CHILD_NAME, def, false, factory);
+ numBlobs.set(0);
+ writeFile(dir, "file", fileSize);
+ assertEquals(1, numBlobs.get());
+ dir.close();
+ }
+
private static void readInputToEnd(long expectedSize, IndexInput input) throws IOException {
int COPY_BUFFER_SIZE = 16384;
byte[] copyBuffer = new byte[(int) ONE_MB];