You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2015/09/17 12:24:11 UTC
svn commit: r1703555 - in /jackrabbit/oak/trunk: oak-blob/
oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/
oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/
oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/ oak-b...
Author: thomasm
Date: Thu Sep 17 10:24:07 2015
New Revision: 1703555
URL: http://svn.apache.org/r1703555
Log:
OAK-3148 Online migration process for the binaries
Added:
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/BlobStoreWrapper.java
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/SplitBlobStoreService.java
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/BlobIdSet.java
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/DefaultSplitBlobStore.java
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStore.java
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/WrappingSplitBlobStore.java
jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/
jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStoreTest.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigration.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrationMBean.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrator.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIterator.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/AbstractMigratorTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIteratorTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DocumentToExternalMigrationTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ExternalToExternalMigrationTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/SegmentToExternalMigrationTest.java
Modified:
jackrabbit/oak/trunk/oak-blob/pom.xml
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/FileBlobStoreService.java
jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/package-info.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
Modified: jackrabbit/oak/trunk/oak-blob/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/pom.xml?rev=1703555&r1=1703554&r2=1703555&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-blob/pom.xml Thu Sep 17 10:24:07 2015
@@ -45,7 +45,8 @@
*
</Import-Package>
<Export-Package>
- org.apache.jackrabbit.oak.spi.blob
+ org.apache.jackrabbit.oak.spi.blob,
+ org.apache.jackrabbit.oak.spi.blob.split
</Export-Package>
</instructions>
</configuration>
Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/BlobStoreWrapper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/BlobStoreWrapper.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/BlobStoreWrapper.java (added)
+++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/BlobStoreWrapper.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.spi.blob;
+
+public interface BlobStoreWrapper extends BlobStore {
+
+ void setBlobStore(BlobStore blobStore);
+
+}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/FileBlobStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/FileBlobStoreService.java?rev=1703555&r1=1703554&r2=1703555&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/FileBlobStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/FileBlobStoreService.java Thu Sep 17 10:24:07 2015
@@ -19,7 +19,10 @@
package org.apache.jackrabbit.oak.spi.blob.osgi;
+import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.PROP_SPLIT_BLOBSTORE;
+import java.util.Dictionary;
+import java.util.Hashtable;
import java.util.Map;
import org.apache.commons.io.FilenameUtils;
@@ -54,10 +57,14 @@ public class FileBlobStoreService {
}
BlobStore blobStore = new FileBlobStore(FilenameUtils.concat(homeDir,"datastore"));
PropertiesUtil.populate(blobStore, config, false);
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ if (context.getProperties().get(PROP_SPLIT_BLOBSTORE) != null) {
+ props.put(PROP_SPLIT_BLOBSTORE, context.getProperties().get(PROP_SPLIT_BLOBSTORE));
+ }
reg = context.getBundleContext().registerService(new String[]{
BlobStore.class.getName(),
GarbageCollectableBlobStore.class.getName()
- }, blobStore, null);
+ }, blobStore, props);
}
@Deactivate
Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/SplitBlobStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/SplitBlobStoreService.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/SplitBlobStoreService.java (added)
+++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/osgi/SplitBlobStoreService.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.spi.blob.osgi;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.split.DefaultSplitBlobStore;
+import org.apache.jackrabbit.oak.spi.blob.split.WrappingSplitBlobStore;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.BlobStoreType.*;
+
+@Component(policy = ConfigurationPolicy.REQUIRE)
+public class SplitBlobStoreService {
+ private static final Logger log = LoggerFactory.getLogger(SplitBlobStoreService.class);
+
+ @Property
+ private static final String PROP_HOME = "repository.home";
+
+ @Property(options = { @PropertyOption(name = "External", value = "EXTERNAL"),
+ @PropertyOption(name = "Internal - Segment", value = "SEGMENT"),
+ @PropertyOption(name = "Internal - Document", value = "DOCUMENT") })
+ private static final String PROP_OLD_BLOB_STORE_TYPE = "split.old.blobstore.type";
+
+ public static final String PROP_SPLIT_BLOBSTORE = "split.blobstore";
+
+ public static final String ONLY_STANDALONE_TARGET = "(&(!(split.blobstore=old))(!(split.blobstore=new)))";
+
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC, target = "(split.blobstore=old)")
+ private BlobStore oldBlobStore;
+
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC, target = "(split.blobstore=new)")
+ private BlobStore newBlobStore;
+
+ private BundleContext ctx;
+
+ private ServiceRegistration reg;
+
+ private String homeDir;
+
+ private BlobStoreType oldBlobStoreType;
+
+ @Activate
+ protected void activate(ComponentContext context, Map<String, Object> config) throws InvalidSyntaxException {
+ String oldTypeName = lookup(context, PROP_OLD_BLOB_STORE_TYPE);
+ if (oldTypeName == null) {
+ oldBlobStoreType = BlobStoreType.EXTERNAL;
+ } else {
+ oldBlobStoreType = BlobStoreType.valueOf(oldTypeName);
+ }
+ homeDir = lookup(context, PROP_HOME);
+ if (homeDir != null) {
+ log.info("Initializing the SplitBlobStore with home [{}]", homeDir);
+ } else {
+ log.warn("Can't initialize SplitBlobStore - empty {}", PROP_HOME);
+ return;
+ }
+ ctx = context.getBundleContext();
+ registerSplitBlobStore();
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ unregisterSplitBlobStore();
+ ctx = null;
+ }
+
+ private void registerSplitBlobStore() {
+ if (oldBlobStore == null && oldBlobStoreType == BlobStoreType.EXTERNAL) {
+ log.info("No BlobStore with ({}=old)", PROP_SPLIT_BLOBSTORE);
+ return;
+ }
+ if (newBlobStore == null) {
+ log.info("No BlobStore with ({}=new)", PROP_SPLIT_BLOBSTORE);
+ return;
+ }
+ if (reg != null) {
+ log.info("SplitBlobStore already registered");
+ return;
+ }
+ if (ctx == null) {
+ log.info("Component not activated yet");
+ return;
+ }
+ log.info("Registering SplitBlobStore with old={} ({}) and new={}", oldBlobStore, oldBlobStoreType,
+ newBlobStore);
+ BlobStore blobStore;
+ if (oldBlobStoreType == EXTERNAL || oldBlobStoreType == SEGMENT) {
+ blobStore = new DefaultSplitBlobStore(homeDir, oldBlobStore, newBlobStore);
+ } else if (oldBlobStoreType == DOCUMENT) {
+ blobStore = new WrappingSplitBlobStore(homeDir, newBlobStore);
+ } else {
+ throw new IllegalStateException("Illegal blob store type value: " + oldBlobStoreType);
+ }
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put("service.pid", "org.apache.jackrabbit.oak.spi.blob.split.SplitBlobStore");
+ reg = ctx.registerService(new String[] { BlobStore.class.getName() }, blobStore, props);
+ }
+
+ private void unregisterSplitBlobStore() {
+ if (reg != null) {
+ reg.unregister();
+ }
+ reg = null;
+ }
+
+ private static String lookup(ComponentContext context, String property) {
+ // Prefer property from BundleContext first
+ if (context.getBundleContext().getProperty(property) != null) {
+ return context.getBundleContext().getProperty(property);
+ }
+
+ if (context.getProperties().get(property) != null) {
+ return context.getProperties().get(property).toString();
+ }
+ return null;
+ }
+
+ protected void bindOldBlobStore(BlobStore blobStore) {
+ this.oldBlobStore = blobStore;
+ registerSplitBlobStore();
+ }
+
+ protected void unbindOldBlobStore(BlobStore blobStore) {
+ this.oldBlobStore = null;
+ unregisterSplitBlobStore();
+ }
+
+ protected void bindNewBlobStore(BlobStore blobStore) {
+ this.newBlobStore = blobStore;
+ registerSplitBlobStore();
+ }
+
+ protected void unbindNewBlobStore(BlobStore blobStore) {
+ this.newBlobStore = null;
+ unregisterSplitBlobStore();
+ }
+
+ enum BlobStoreType {
+ EXTERNAL, DOCUMENT, SEGMENT
+ }
+}
Modified: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/package-info.java?rev=1703555&r1=1703554&r2=1703555&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/package-info.java Thu Sep 17 10:24:07 2015
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-@Version("1.1")
+@Version("1.2")
@Export(optional = "provide:=true")
package org.apache.jackrabbit.oak.spi.blob;
Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/BlobIdSet.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/BlobIdSet.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/BlobIdSet.java (added)
+++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/BlobIdSet.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.spi.blob.split;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+
+class BlobIdSet {
+
+ private static final Logger log = LoggerFactory.getLogger(BlobIdSet.class);
+
+ private final File store;
+
+ private final BloomFilter<CharSequence> bloomFilter;
+
+ private final Cache<String, Boolean> cache;
+
+ BlobIdSet(String repositoryDir, String filename) {
+ store = new File(new File(repositoryDir), filename);
+ bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 9000000); // about 8MB
+ cache = CacheBuilder.newBuilder().maximumSize(1000).build();
+ fillBloomFilter();
+ }
+
+ synchronized boolean contains(String blobId) throws IOException {
+ if (!bloomFilter.apply(blobId)) {
+ return false;
+ }
+ Boolean cached = cache.getIfPresent(blobId);
+ if (cached != null) {
+ return cached;
+ }
+
+ if (isPresentInStore(blobId)) {
+ cache.put(blobId, Boolean.TRUE);
+ bloomFilter.put(blobId);
+ return true;
+ } else {
+ cache.put(blobId, Boolean.FALSE);
+ return false;
+ }
+ }
+
+ synchronized void add(String blobId) throws IOException {
+ addToStore(blobId);
+ bloomFilter.put(blobId);
+ cache.put(blobId, Boolean.TRUE);
+ }
+
+ private boolean isPresentInStore(String blobId) throws FileNotFoundException, IOException {
+ if (!store.exists()) {
+ return false;
+ }
+ BufferedReader reader = new BufferedReader(new FileReader(store));
+ try {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (line.equals(blobId)) {
+ return true;
+ }
+ }
+ } finally {
+ reader.close();
+ }
+ return false;
+ }
+
+ private void addToStore(String blobId) throws IOException {
+ FileWriter writer = new FileWriter(store.getPath(), true);
+ try {
+ writer.append(blobId).append('\n');
+ } finally {
+ writer.close();
+ }
+ }
+
+ private void fillBloomFilter() {
+ if (!store.exists()) {
+ return;
+ }
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new FileReader(store));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ bloomFilter.put(line);
+ }
+ } catch (IOException e) {
+ log.error("Can't fill bloom filter", e);
+ } finally {
+ IOUtils.closeQuietly(reader);
+ }
+ }
+}
Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/DefaultSplitBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/DefaultSplitBlobStore.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/DefaultSplitBlobStore.java (added)
+++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/DefaultSplitBlobStore.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.spi.blob.split;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultSplitBlobStore implements SplitBlobStore {
+
+ private static final Logger log = LoggerFactory.getLogger(BlobIdSet.class);
+
+ private static final String OLD_BLOBSTORE_PREFIX = "o_";
+
+ private static final String NEW_BLOBSTORE_PREFIX = "n_";
+
+ private final BlobStore oldBlobStore;
+
+ private final BlobStore newBlobStore;
+
+ private final BlobIdSet migratedBlobs;
+
+ public DefaultSplitBlobStore(String repositoryDir, BlobStore oldBlobStore, BlobStore newBlobStore) {
+ this.oldBlobStore = oldBlobStore;
+ this.newBlobStore = newBlobStore;
+ this.migratedBlobs = new BlobIdSet(repositoryDir, "migrated_blobs.txt");
+ }
+
+ public boolean isMigrated(String blobId) throws IOException {
+ return migratedBlobs.contains(blobId);
+ }
+
+ @Override
+ public String writeBlob(InputStream in) throws IOException {
+ String blobId = newBlobStore.writeBlob(in);
+ migratedBlobs.add(blobId);
+ return blobId;
+ }
+
+ @Override
+ public int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws IOException {
+ return chooseBlobStoreByBlobId(blobId).readBlob(blobId, pos, buff, off, length);
+ }
+
+ @Override
+ public long getBlobLength(String blobId) throws IOException {
+ return chooseBlobStoreByBlobId(blobId).getBlobLength(blobId);
+ }
+
+ @Override
+ public InputStream getInputStream(String blobId) throws IOException {
+ return chooseBlobStoreByBlobId(blobId).getInputStream(blobId);
+ }
+
+ @Override
+ public String getBlobId(String reference) {
+ if (reference.startsWith(NEW_BLOBSTORE_PREFIX)) {
+ return newBlobStore.getBlobId(reference.substring(NEW_BLOBSTORE_PREFIX.length()));
+ } else if (reference.startsWith(OLD_BLOBSTORE_PREFIX)) {
+ return oldBlobStore.getBlobId(reference.substring(OLD_BLOBSTORE_PREFIX.length()));
+ } else {
+ log.error("Invalid reference: {}", reference);
+ return null;
+ }
+ }
+
+ @Override
+ public String getReference(String blobId) {
+ try {
+ if (isMigrated(blobId)) {
+ return NEW_BLOBSTORE_PREFIX + newBlobStore.getReference(blobId);
+ } else {
+ return OLD_BLOBSTORE_PREFIX + oldBlobStore.getReference(blobId);
+ }
+ } catch (IOException e) {
+ log.error("Can't get reference", e);
+ return null;
+ }
+ }
+
+ private BlobStore chooseBlobStoreByBlobId(String blobId) throws IOException {
+ if (isMigrated(blobId) || oldBlobStore == null) {
+ return newBlobStore;
+ } else {
+ return oldBlobStore;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("SplitBlobStore[old={}, new={}]", oldBlobStore, newBlobStore);
+ }
+}
Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStore.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStore.java (added)
+++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStore.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.spi.blob.split;
+
+import java.io.IOException;
+
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+public interface SplitBlobStore extends BlobStore {
+
+ boolean isMigrated(String blobId) throws IOException;
+
+}
Added: jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/WrappingSplitBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/WrappingSplitBlobStore.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/WrappingSplitBlobStore.java (added)
+++ jackrabbit/oak/trunk/oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/split/WrappingSplitBlobStore.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.spi.blob.split;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStoreWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WrappingSplitBlobStore implements BlobStoreWrapper, SplitBlobStore {
+
+ private static final Logger log = LoggerFactory.getLogger(WrappingSplitBlobStore.class);
+
+ private DefaultSplitBlobStore splitBlobStore;
+
+ private final String repositoryDir;
+
+ private final BlobStore newBlobStore;
+
+ public WrappingSplitBlobStore(String repositoryDir, BlobStore newBlobStore) {
+ this.repositoryDir = repositoryDir;
+ this.newBlobStore = newBlobStore;
+ }
+
+ @Override
+ public void setBlobStore(BlobStore blobStore) {
+ log.info("Internal blob store set: {}", blobStore);
+ splitBlobStore = new DefaultSplitBlobStore(repositoryDir, blobStore, newBlobStore);
+ }
+
+ private SplitBlobStore getSplitBlobStore() {
+ if (splitBlobStore == null) {
+ throw new IllegalStateException("The old blob store hasn't been set yet.");
+ }
+ return splitBlobStore;
+ }
+
+ @Override
+ public String writeBlob(InputStream in) throws IOException {
+ return getSplitBlobStore().writeBlob(in);
+ }
+
+ @Override
+ public int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws IOException {
+ return getSplitBlobStore().readBlob(blobId, pos, buff, off, length);
+ }
+
+ @Override
+ public long getBlobLength(String blobId) throws IOException {
+ return getSplitBlobStore().getBlobLength(blobId);
+ }
+
+ @Override
+ public InputStream getInputStream(String blobId) throws IOException {
+ return getSplitBlobStore().getInputStream(blobId);
+ }
+
+ @Override
+ public String getBlobId(String reference) {
+ return getSplitBlobStore().getBlobId(reference);
+ }
+
+ @Override
+ public String getReference(String blobId) {
+ return getSplitBlobStore().getReference(blobId);
+ }
+
+ @Override
+ public boolean isMigrated(String blobId) throws IOException {
+ return getSplitBlobStore().isMigrated(blobId);
+ }
+
+}
Added: jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStoreTest.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStoreTest.java (added)
+++ jackrabbit/oak/trunk/oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/split/SplitBlobStoreTest.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.spi.blob.split;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.FileBlobStore;
+import org.apache.jackrabbit.oak.spi.blob.split.DefaultSplitBlobStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+
+public class SplitBlobStoreTest {
+
+ private static final int LENGTH = 1024;
+
+ private final Random random = new Random();
+
+ private File repository;
+
+ private BlobStore oldBlobStore;
+
+ private BlobStore newBlobStore;
+
+ private DefaultSplitBlobStore splitBlobStore;
+
+ private List<String> oldBlobIds;
+
+ private List<String> newBlobIds;
+
+ @Before
+ public void setup() throws IOException {
+ repository = Files.createTempDir();
+ oldBlobStore = new FileBlobStore(repository.getPath() + "/old");
+ newBlobStore = new FileBlobStore(repository.getPath() + "/new");
+ splitBlobStore = new DefaultSplitBlobStore(repository.getPath(), oldBlobStore, newBlobStore);
+ oldBlobIds = addBlobs(oldBlobStore);
+ newBlobIds = addBlobs(splitBlobStore);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ FileUtils.deleteDirectory(repository);
+ }
+
+ @Test
+ public void testLength() throws IOException {
+ for (String id : oldBlobIds) {
+ assertEquals(LENGTH, splitBlobStore.getBlobLength(id));
+ }
+ for (String id : newBlobIds) {
+ assertEquals(LENGTH, newBlobStore.getBlobLength(id));
+ assertEquals(LENGTH, splitBlobStore.getBlobLength(id));
+ }
+ }
+
+ @Test
+ public void testIsMigrated() throws IOException {
+ for (String id : oldBlobIds) {
+ assertFalse(splitBlobStore.isMigrated(id));
+ }
+ for (String id : newBlobIds) {
+ assertTrue(splitBlobStore.isMigrated(id));
+ }
+ }
+
+ @Test
+ public void testGetInputStream() throws IOException {
+ for (String id : oldBlobIds) {
+ assertStreamEquals(oldBlobStore.getInputStream(id), splitBlobStore.getInputStream(id));
+ }
+ for (String id : newBlobIds) {
+ assertStreamEquals(newBlobStore.getInputStream(id), splitBlobStore.getInputStream(id));
+ }
+ }
+
+ @Test
+ public void testReadByte() throws IOException {
+ byte[] expected = new byte[LENGTH];
+ byte[] actual = new byte[LENGTH];
+ for (String id : oldBlobIds) {
+ oldBlobStore.readBlob(id, 0, expected, 0, LENGTH);
+ splitBlobStore.readBlob(id, 0, actual, 0, LENGTH);
+ assertArrayEquals(expected, actual);
+ }
+ for (String id : newBlobIds) {
+ newBlobStore.readBlob(id, 0, expected, 0, LENGTH);
+ splitBlobStore.readBlob(id, 0, actual, 0, LENGTH);
+ assertArrayEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void testReferences() throws IOException {
+ for (String id : oldBlobIds) {
+ String reference = splitBlobStore.getReference(id);
+ assertEquals(id, splitBlobStore.getBlobId(reference));
+ }
+ for (String id : newBlobIds) {
+ String reference = splitBlobStore.getReference(id);
+ assertEquals(id, splitBlobStore.getBlobId(reference));
+ }
+ }
+
+ private List<String> addBlobs(BlobStore blobStore) throws IOException {
+ List<String> ids = new ArrayList<String>();
+ for (int i = 0; i < 5; i++) {
+ byte[] buffer = new byte[LENGTH];
+ random.nextBytes(buffer);
+ InputStream bis = new ByteArrayInputStream(buffer);
+ ids.add(blobStore.writeBlob(bis));
+ }
+ return ids;
+ }
+
+ private static void assertStreamEquals(InputStream expected, InputStream actual) throws IOException {
+ while (true) {
+ int expectedByte = expected.read();
+ int actualByte = actual.read();
+ assertEquals(expectedByte, actualByte);
+ if (expectedByte == -1) {
+ break;
+ }
+ }
+ }
+
+ private static void assertArrayEquals(byte[] expected, byte[] actual) throws IOException {
+ assertEquals(expected.length, actual.length);
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(expected[i], actual[i]);
+ }
+ }
+
+}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java?rev=1703555&r1=1703554&r2=1703555&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/AbstractDataStoreService.java Thu Sep 17 10:24:07 2015
@@ -36,6 +36,8 @@ import org.osgi.service.component.Compon
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.PROP_SPLIT_BLOBSTORE;
+
public abstract class AbstractDataStoreService {
private static final String PROP_HOME = "repository.home";
@@ -65,6 +67,9 @@ public abstract class AbstractDataStoreS
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(Constants.SERVICE_PID, ds.getClass().getName());
props.put(DESCRIPTION, getDescription());
+ if (context.getProperties().get(PROP_SPLIT_BLOBSTORE) != null) {
+ props.put(PROP_SPLIT_BLOBSTORE, context.getProperties().get(PROP_SPLIT_BLOBSTORE));
+ }
reg = context.getBundleContext().registerService(new String[]{
BlobStore.class.getName(),
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigration.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigration.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigration.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigration.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.migration;
+
+import static java.lang.System.nanoTime;
+import static org.apache.jackrabbit.oak.management.ManagementOperation.done;
+import static org.apache.jackrabbit.oak.management.ManagementOperation.newManagementOperation;
+import static org.apache.jackrabbit.oak.management.ManagementOperation.Status.formatTime;
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Nonnull;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean.StatusCode;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+import org.apache.jackrabbit.oak.management.ManagementOperation;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.split.SplitBlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component
+public class BlobMigration extends AnnotatedStandardMBean implements BlobMigrationMBean {
+
+ public static final String OP_NAME = "Blob migration";
+
+ private static final Logger log = LoggerFactory.getLogger(BlobMigrator.class);
+
+ private ManagementOperation<String> migrationOp = done(OP_NAME, "");
+
+ private static final CompositeType TYPE;
+
+ static {
+ CompositeType type;
+ try {
+ type = new CompositeType("BlobMigrationStatus", "Status of the blob migraiton",
+ new String[] { "isRunning", "migratedNodes", "lastProcessedPath", "operationStatus" },
+ new String[] { "Migration in progress", "Total number of migrated nodes", "Last processed path", "Status of the operation" },
+ new OpenType[] { SimpleType.BOOLEAN, SimpleType.INTEGER, SimpleType.STRING, ManagementOperation.Status.ITEM_TYPES });
+ } catch (OpenDataException e) {
+ type = null;
+ log.error("Can't create a CompositeType", e);
+ }
+ TYPE = type;
+ }
+
+ @Reference(target = "(service.pid=org.apache.jackrabbit.oak.spi.blob.split.SplitBlobStore)")
+ private BlobStore splitBlobStore;
+
+ @Reference
+ private NodeStore nodeStore;
+
+ private Executor executor = Executors.newSingleThreadExecutor();
+
+ private BlobMigrator migrator;
+
+ private Registration mbeanReg;
+
+ public BlobMigration() {
+ super(BlobMigrationMBean.class);
+ }
+
+ @Activate
+ private void activate(BundleContext ctx) {
+ Whiteboard wb = new OsgiWhiteboard(ctx);
+ migrator = new BlobMigrator((SplitBlobStore) splitBlobStore, nodeStore);
+ mbeanReg = registerMBean(wb, BlobMigrationMBean.class, this, BlobMigrationMBean.TYPE, OP_NAME);
+ }
+
+ @Deactivate
+ private void deactivate() throws InterruptedException {
+ if (migrator != null) {
+ migrator.stop();
+ migrator = null;
+ }
+ if (mbeanReg != null) {
+ mbeanReg.unregister();
+ mbeanReg = null;
+ }
+ }
+
+ @Nonnull
+ @Override
+ public String startBlobMigration(final boolean resume) {
+ if (migrationOp.isDone()) {
+ migrationOp = newManagementOperation(OP_NAME, new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ long t0 = nanoTime();
+ boolean finished;
+ if (resume) {
+ finished = migrator.migrate();
+ } else {
+ finished = migrator.start();
+ }
+ String duration = formatTime(nanoTime() - t0);
+ if (finished) {
+ return "All blobs migrated in " + duration;
+ } else {
+ return "Migration stopped manually after " + duration;
+ }
+ }
+ });
+ executor.execute(migrationOp);
+ return "Migration started";
+ } else {
+ return "Migration is already in progress";
+ }
+ }
+
+ @Nonnull
+ @Override
+ public String stopBlobMigration() {
+ migrator.stop();
+ return "Migration will be stopped";
+ }
+
+ @Nonnull
+ @Override
+ public CompositeData getBlobMigrationStatus() throws OpenDataException {
+ Map<String, Object> status = new HashMap<String, Object>();
+ status.put("isRunning", migrationOp.getStatus().getCode() == StatusCode.RUNNING);
+ status.put("migratedNodes", migrator.getTotalMigratedNodes());
+ status.put("lastProcessedPath", migrator.getLastProcessedPath());
+ status.put("operationStatus", migrationOp.getStatus().toCompositeData());
+ return new CompositeDataSupport(TYPE, status);
+ }
+}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrationMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrationMBean.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrationMBean.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrationMBean.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.migration;
+
+import javax.annotation.Nonnull;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+
+import org.apache.jackrabbit.oak.commons.jmx.Description;
+import org.apache.jackrabbit.oak.commons.jmx.Name;
+
+public interface BlobMigrationMBean {
+
+ String TYPE = "BlobMigration";
+
+ @Nonnull
+ @Description("Start or resume the blob migration")
+ String startBlobMigration(
+ @Name("resume") @Description("true to resume stopped migration or false to start it from scratch") boolean resume);
+
+ @Nonnull
+ @Description("Stop the blob migration")
+ String stopBlobMigration();
+
+ @Nonnull
+ CompositeData getBlobMigrationStatus() throws OpenDataException;
+
+}
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrator.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrator.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/BlobMigrator.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.migration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.plugins.memory.PropertyBuilder;
+import org.apache.jackrabbit.oak.spi.blob.split.SplitBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BlobMigrator {
+
+ private static final Logger log = LoggerFactory.getLogger(BlobMigrator.class);
+
+ private static final int MERGE_LIMIT = 100;
+
+ private static final int MERGE_TIMEOUT = 30;
+
+ private final SplitBlobStore blobStore;
+
+ private final NodeStore nodeStore;
+
+ private final AtomicBoolean stopMigration = new AtomicBoolean(false);
+
+ private DepthFirstNodeIterator nodeIterator;
+
+ private NodeBuilder rootBuilder;
+
+ private long lastCommit;
+
+ private int migratedNodes;
+
+ private volatile String lastPath;
+
+ private volatile int totalMigratedNodes;
+
+ public BlobMigrator(SplitBlobStore blobStore, NodeStore nodeStore) {
+ this.blobStore = blobStore;
+ this.nodeStore = nodeStore;
+ refreshAndReset();
+ }
+
+ public boolean start() throws IOException {
+ totalMigratedNodes = 0;
+ refreshAndReset();
+ return migrate();
+ }
+
+ public boolean migrate() throws IOException {
+ do {
+ while (nodeIterator.hasNext()) {
+ lastPath = nodeIterator.getPath();
+ if (stopMigration.getAndSet(false)) {
+ if (migratedNodes > 0) {
+ tryCommit();
+ }
+ return false;
+ }
+ migrateNode(rootBuilder, nodeIterator);
+ if (timeToCommit()) {
+ tryCommit();
+ }
+ }
+ // at this point we iterated over the whole repository
+ // the last thing to do is to check if we don't have
+ // any nodes waiting to be migrated. if the operation
+ // fails we have to start from the beginning
+ } while (migratedNodes > 0 && !tryCommit());
+ return true;
+ }
+
+ private boolean tryCommit() {
+ try {
+ nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ totalMigratedNodes += migratedNodes;
+ log.info("{} nodes merged succesfully. Nodes migrated in this session: {}", migratedNodes, totalMigratedNodes);
+ lastCommit = System.currentTimeMillis();
+ migratedNodes = 0;
+ return true;
+ } catch (CommitFailedException e) {
+ log.error("Can't commit. Resetting the migrator", e);
+ refreshAndReset();
+ return false;
+ }
+ }
+
+ private boolean timeToCommit() {
+ long changesMerged = (System.currentTimeMillis() - lastCommit) / 1000;
+ if (migratedNodes >= MERGE_LIMIT) {
+ log.info("Migrated nodes count: {}. Merging changes.", migratedNodes);
+ return true;
+ } else if (migratedNodes > 0 && changesMerged >= MERGE_TIMEOUT) {
+ log.info("Changes have been merged {}s ago. Merging {} nodes.", changesMerged, migratedNodes);
+ return true;
+ }
+ return false;
+ }
+
+ public void stop() {
+ stopMigration.set(true);
+ }
+
+ public String getLastProcessedPath() {
+ return lastPath;
+ }
+
+ public int getTotalMigratedNodes() {
+ return totalMigratedNodes;
+ }
+
+ private void refreshAndReset() {
+ NodeState rootState = nodeStore.getRoot();
+ rootBuilder = rootState.builder();
+ nodeIterator = new DepthFirstNodeIterator(rootState);
+ lastPath = null;
+ lastCommit = System.currentTimeMillis();
+ migratedNodes = 0;
+ }
+
+ private void migrateNode(NodeBuilder rootBuilder, DepthFirstNodeIterator iterator) throws IOException {
+ ChildNodeEntry node = iterator.next();
+ NodeState state = node.getNodeState();
+ for (PropertyState property : state.getProperties()) {
+ PropertyState newProperty;
+ if (property.getType() == Type.BINARY) {
+ newProperty = migrateProperty(property);
+ } else if (property.getType() == Type.BINARIES) {
+ newProperty = migrateMultiProperty(property);
+ } else {
+ newProperty = null;
+ }
+ if (newProperty != null) {
+ NodeBuilder builder = iterator.getBuilder(rootBuilder);
+ if (builder.exists()) {
+ builder.setProperty(newProperty);
+ migratedNodes++;
+ log.debug("Migrated property {}/{}", lastPath, property.getName());
+ } else {
+ log.warn("Can't migrate blobs for a non-existing node: {}", lastPath);
+ }
+ }
+ }
+ }
+
+ private PropertyState migrateProperty(PropertyState propertyState) throws IOException {
+ Blob oldBlob = propertyState.getValue(Type.BINARY);
+ String blobId = oldBlob.getContentIdentity();
+ if (blobStore.isMigrated(blobId)) {
+ return null;
+ }
+
+ String newBlobId = blobStore.writeBlob(oldBlob.getNewStream());
+ Blob newBlob = new BlobStoreBlob(blobStore, newBlobId);
+ PropertyBuilder<Blob> builder = new PropertyBuilder<Blob>(Type.BINARY);
+ builder.assignFrom(propertyState);
+ builder.setValue(newBlob);
+ return builder.getPropertyState();
+ }
+
+ private PropertyState migrateMultiProperty(PropertyState propertyState) throws IOException {
+ Iterable<Blob> oldBlobs = propertyState.getValue(Type.BINARIES);
+ List<Blob> newBlobs = new ArrayList<Blob>();
+ PropertyBuilder<Blob> builder = new PropertyBuilder<Blob>(Type.BINARY);
+ builder.assignFrom(propertyState);
+ boolean blobUpdated = false;
+ for (Blob oldBlob : oldBlobs) {
+ String blobId = oldBlob.getContentIdentity();
+ if (blobStore.isMigrated(blobId)) {
+ newBlobs.add(new BlobStoreBlob(blobStore, blobId));
+ } else {
+ String newBlobId = blobStore.writeBlob(oldBlob.getNewStream());
+ Blob newBlob = new BlobStoreBlob(blobStore, newBlobId);
+ newBlobs.add(newBlob);
+ blobUpdated = true;
+ }
+ }
+ if (blobUpdated) {
+ builder.setValues(newBlobs);
+ return builder.getPropertyState();
+ } else {
+ return null;
+ }
+ }
+}
\ No newline at end of file
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIterator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIterator.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIterator.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIterator.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.migration;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.AbstractIterator;
+
+public class DepthFirstNodeIterator extends AbstractIterator<ChildNodeEntry> {
+
+ private final Deque<Iterator<? extends ChildNodeEntry>> itQueue = new ArrayDeque<Iterator<? extends ChildNodeEntry>>();
+
+ private final Deque<String> nameQueue = new ArrayDeque<String>();
+
+ private final NodeState root;
+
+ public DepthFirstNodeIterator(NodeState root) {
+ this.root = root;
+ reset();
+ }
+
+ public void reset() {
+ itQueue.clear();
+ nameQueue.clear();
+ itQueue.add(root.getChildNodeEntries().iterator());
+ }
+
+ @Override
+ protected ChildNodeEntry computeNext() {
+ if (itQueue.isEmpty()) {
+ return endOfData();
+ }
+ if (itQueue.peekLast().hasNext()) {
+ ChildNodeEntry next = itQueue.peekLast().next();
+ itQueue.add(next.getNodeState().getChildNodeEntries().iterator());
+ nameQueue.add(next.getName());
+ return next;
+ } else {
+ itQueue.pollLast();
+ if (!nameQueue.isEmpty()) {
+ nameQueue.pollLast();
+ }
+ return computeNext();
+ }
+ }
+
+ public NodeBuilder getBuilder(NodeBuilder rootBuilder) {
+ NodeBuilder builder = rootBuilder;
+ for (String name : nameQueue) {
+ builder = builder.getChildNode(name);
+ }
+ return builder;
+ }
+
+ public String getPath() {
+ StringBuilder path = new StringBuilder("/");
+ return Joiner.on('/').appendTo(path, nameQueue).toString();
+ }
+
+}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1703555&r1=1703554&r2=1703555&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Thu Sep 17 10:24:07 2015
@@ -27,6 +27,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.Builder.DEFAULT_DIFF_CACHE_PERCENTAGE;
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.Builder.DEFAULT_DOC_CHILDREN_CACHE_PERCENTAGE;
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.Builder.DEFAULT_NODE_CACHE_PERCENTAGE;
+import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import java.io.ByteArrayInputStream;
@@ -70,6 +71,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStoreWrapper;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.state.RevisionGC;
@@ -240,7 +242,7 @@ public class DocumentNodeStoreService {
private WhiteboardExecutor executor;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
- policy = ReferencePolicy.DYNAMIC)
+ policy = ReferencePolicy.DYNAMIC, target = ONLY_STANDALONE_TARGET)
private volatile BlobStore blobStore;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
@@ -389,8 +391,10 @@ public class DocumentNodeStoreService {
mkBuilder.setPersistentCache(persistentCache);
}
+ boolean wrappingCustomBlobStore = customBlobStore && blobStore instanceof BlobStoreWrapper;
+
//Set blobstore before setting the DB
- if (customBlobStore) {
+ if (customBlobStore && !wrappingCustomBlobStore) {
checkNotNull(blobStore, "Use of custom BlobStore enabled via [%s] but blobStore reference not " +
"initialized", CUSTOM_BLOB_STORE);
mkBuilder.setBlobStore(blobStore);
@@ -431,6 +435,12 @@ public class DocumentNodeStoreService {
log.info("Connected to database {}", mongoDB);
}
+ //Set wrapping blob store after setting the DB
+ if (wrappingCustomBlobStore) {
+ ((BlobStoreWrapper) blobStore).setBlobStore(mkBuilder.getBlobStore());
+ mkBuilder.setBlobStore(blobStore);
+ }
+
mkBuilder.setExecutor(executor);
mk = mkBuilder.open();
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1703555&r1=1703554&r2=1703555&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Thu Sep 17 10:24:07 2015
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkState;
import static java.util.Collections.emptyMap;
+import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toInteger;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong;
@@ -250,7 +251,7 @@ public class SegmentNodeStoreService ext
private ComponentContext context;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
- policy = ReferencePolicy.DYNAMIC)
+ policy = ReferencePolicy.DYNAMIC, target = ONLY_STANDALONE_TARGET)
private volatile BlobStore blobStore;
private ServiceRegistration storeRegistration;
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/AbstractMigratorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/AbstractMigratorTest.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/AbstractMigratorTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/AbstractMigratorTest.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.migration;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
+import org.apache.jackrabbit.oak.plugins.memory.PropertyBuilder;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.split.DefaultSplitBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+public abstract class AbstractMigratorTest {
+
+ private static final int LENGTH = 1024 * 16;
+
+ private static final Random RANDOM = new Random();
+
+ private File repository;
+
+ private NodeStore nodeStore;
+
+ private BlobStore newBlobStore;
+
+ private BlobMigrator migrator;
+
+ @Before
+ public void setup() throws CommitFailedException, IllegalArgumentException, IOException {
+ repository = Files.createTempDir();
+ BlobStore oldBlobStore = createOldBlobStore(repository);
+ NodeStore originalNodeStore = createNodeStore(oldBlobStore, repository);
+ createContent(originalNodeStore);
+ closeNodeStore();
+
+ newBlobStore = createNewBlobStore(repository);
+ DefaultSplitBlobStore splitBlobStore = new DefaultSplitBlobStore(repository.getPath(), oldBlobStore, newBlobStore);
+ nodeStore = createNodeStore(splitBlobStore, repository);
+ migrator = new BlobMigrator(splitBlobStore, nodeStore);
+ }
+
+ protected abstract NodeStore createNodeStore(BlobStore blobStore, File repository) throws IOException;
+
+ protected abstract void closeNodeStore();
+
+ protected abstract BlobStore createOldBlobStore(File repository);
+
+ protected abstract BlobStore createNewBlobStore(File repository);
+
+ @After
+ public void teardown() throws IOException {
+ closeNodeStore();
+ FileUtils.deleteDirectory(repository);
+ }
+
+ @Test
+ public void blobsExistsOnTheNewBlobStore() throws IOException, CommitFailedException {
+ migrator.migrate();
+ NodeState root = nodeStore.getRoot();
+ for (int i = 1; i <= 3; i++) {
+ assertPropertyOnTheNewStore(root.getChildNode("node" + i).getProperty("prop"));
+ }
+ }
+
+ @Test
+ public void blobsCanBeReadAfterSwitchingBlobStore() throws IOException, CommitFailedException {
+ migrator.migrate();
+ closeNodeStore();
+
+ nodeStore = createNodeStore(newBlobStore, repository);
+ NodeState root = nodeStore.getRoot();
+ for (int i = 1; i <= 3; i++) {
+ assertPropertyExists(root.getChildNode("node" + i).getProperty("prop"));
+ }
+ }
+
+ private void assertPropertyExists(PropertyState property) {
+ if (property.isArray()) {
+ for (Blob blob : property.getValue(Type.BINARIES)) {
+ assertEquals(LENGTH, blob.length());
+ }
+ } else {
+ assertEquals(LENGTH, property.getValue(Type.BINARY).length());
+ }
+ }
+
+ private void assertPropertyOnTheNewStore(PropertyState property) throws IOException {
+ if (property.isArray()) {
+ for (Blob blob : property.getValue(Type.BINARIES)) {
+ assertPropertyOnTheNewStore(blob);
+ }
+ } else {
+ assertPropertyOnTheNewStore(property.getValue(Type.BINARY));
+ }
+ }
+
+ private void assertPropertyOnTheNewStore(Blob blob) throws IOException {
+ String blobId = blob.getContentIdentity();
+ assertStreamEquals(blob.getNewStream(), newBlobStore.getInputStream(blobId));
+ }
+
+ private static void createContent(NodeStore nodeStore) throws IOException, CommitFailedException {
+ NodeBuilder rootBuilder = nodeStore.getRoot().builder();
+ rootBuilder.child("node1").setProperty("prop", createBlob(nodeStore));
+ rootBuilder.child("node2").setProperty("prop", createBlob(nodeStore));
+ PropertyBuilder<Blob> builder = PropertyBuilder.array(Type.BINARY, "prop");
+ builder.addValue(createBlob(nodeStore));
+ builder.addValue(createBlob(nodeStore));
+ builder.addValue(createBlob(nodeStore));
+ rootBuilder.child("node3").setProperty(builder.getPropertyState());
+ nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
+ private static Blob createBlob(NodeStore nodeStore) throws IOException {
+ byte[] buffer = new byte[LENGTH];
+ RANDOM.nextBytes(buffer);
+ return new ArrayBasedBlob(buffer);
+ }
+
+ private static void assertStreamEquals(InputStream expected, InputStream actual) throws IOException {
+ while (true) {
+ int expectedByte = expected.read();
+ int actualByte = actual.read();
+ assertEquals(expectedByte, actualByte);
+ if (expectedByte == -1) {
+ break;
+ }
+ }
+ }
+}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIteratorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIteratorTest.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIteratorTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DepthFirstNodeIteratorTest.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.migration;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DepthFirstNodeIteratorTest {
+
+ private NodeStore store;
+
+ @Before
+ public void setup() throws CommitFailedException {
+ store = SegmentNodeStore.newSegmentNodeStore(new MemoryStore()).create();
+ NodeBuilder rootBuilder = store.getRoot().builder();
+ NodeBuilder countries = rootBuilder.child("countries");
+ countries.child("uk").child("cities").child("london").child("districts").child("frognal");
+ countries.child("germany");
+ countries.child("france").child("cities").child("paris");
+ store.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
+ // The order of the returned nodes is not defined, that's why we have to
+ // create 3 subtrees.
+ @Test
+ public void testIterate() {
+ Map<String, String[]> subtrees = new HashMap<String, String[]>();
+ subtrees.put("uk", new String[] { "cities", "london", "districts", "frognal" });
+ subtrees.put("germany", new String[] {});
+ subtrees.put("france", new String[] { "cities", "paris" });
+
+ DepthFirstNodeIterator iterator = new DepthFirstNodeIterator(store.getRoot());
+ assertTrue(iterator.hasNext());
+ assertEquals("countries", iterator.next().getName());
+
+ for (int i = 0; i < 3; i++) {
+ assertTrue(iterator.hasNext());
+ String country = iterator.next().getName();
+ for (String node : subtrees.remove(country)) {
+ assertTrue(iterator.hasNext());
+ assertEquals(node, iterator.next().getName());
+ }
+ }
+ assertFalse(iterator.hasNext());
+ assertTrue(subtrees.isEmpty());
+ }
+
+ @Test
+ public void testGetPath() {
+ Map<String, String> nameToPath = new HashMap<String, String>();
+ nameToPath.put("countries", "/countries");
+ nameToPath.put("uk", "/countries/uk");
+ nameToPath.put("frognal", "/countries/uk/cities/london/districts/frognal");
+ nameToPath.put("paris", "/countries/france/cities/paris");
+
+ DepthFirstNodeIterator iterator = new DepthFirstNodeIterator(store.getRoot());
+ while (iterator.hasNext()) {
+ String expectedPath = nameToPath.remove(iterator.next().getName());
+ if (expectedPath == null) {
+ continue;
+ }
+ assertEquals(expectedPath, iterator.getPath());
+ }
+ assertTrue(nameToPath.isEmpty());
+ }
+}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DocumentToExternalMigrationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DocumentToExternalMigrationTest.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DocumentToExternalMigrationTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/DocumentToExternalMigrationTest.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.migration;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobStore;
+import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.FileBlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+public class DocumentToExternalMigrationTest extends AbstractMigratorTest {
+
+ private DocumentNodeStore nodeStore;;
+
+ @Override
+ protected NodeStore createNodeStore(BlobStore blobStore, File repository) throws IOException {
+ MongoConnection connection = MongoUtils.getConnection();
+ Assume.assumeNotNull(connection);
+ DocumentMK.Builder builder = new DocumentMK.Builder();
+ if (blobStore != null) {
+ builder.setBlobStore(blobStore);
+ }
+ builder.setMongoDB(connection.getDB());
+ return nodeStore = builder.getNodeStore();
+ }
+
+ @Override
+ protected void closeNodeStore() {
+ if (nodeStore != null) {
+ nodeStore.dispose();
+ nodeStore = null;
+ }
+ }
+
+ @BeforeClass
+ public static void checkMongoDbAvailable() {
+ Assume.assumeNotNull(MongoUtils.getConnection());
+ }
+
+ @Override
+ protected BlobStore createOldBlobStore(File repository) {
+ MongoConnection connection = MongoUtils.getConnection();
+ return new MongoBlobStore(connection.getDB());
+ }
+
+ @Override
+ protected BlobStore createNewBlobStore(File repository) {
+ return new FileBlobStore(repository.getPath() + "/new");
+ }
+
+}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ExternalToExternalMigrationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ExternalToExternalMigrationTest.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ExternalToExternalMigrationTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/ExternalToExternalMigrationTest.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.migration;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.FileBlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+public class ExternalToExternalMigrationTest extends AbstractMigratorTest {
+
+ private SegmentStore segmentStore;
+
+ @Override
+ protected NodeStore createNodeStore(BlobStore blobStore, File repository) throws IOException {
+ File segmentDir = new File(repository, "segmentstore");
+ segmentStore = FileStore.newFileStore(segmentDir).withBlobStore(blobStore).create();
+ return SegmentNodeStore.newSegmentNodeStore(segmentStore).create();
+ }
+
+ @Override
+ protected void closeNodeStore() {
+ segmentStore.close();
+ }
+
+ @Override
+ protected BlobStore createOldBlobStore(File repository) {
+ return new FileBlobStore(repository.getPath() + "/old");
+ }
+
+ @Override
+ protected BlobStore createNewBlobStore(File repository) {
+ return new FileBlobStore(repository.getPath() + "/new");
+ }
+
+}
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/SegmentToExternalMigrationTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/SegmentToExternalMigrationTest.java?rev=1703555&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/SegmentToExternalMigrationTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/blob/migration/SegmentToExternalMigrationTest.java Thu Sep 17 10:24:07 2015
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.blob.migration;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
+import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
+import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.FileBlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+public class SegmentToExternalMigrationTest extends AbstractMigratorTest {
+
+ private SegmentStore segmentStore;
+
+ @Override
+ protected NodeStore createNodeStore(BlobStore blobStore, File repository) throws IOException {
+ File segmentDir = new File(repository, "segmentstore");
+ FileStore.Builder builder = FileStore.newFileStore(segmentDir);
+ if (blobStore != null) {
+ builder.withBlobStore(blobStore);
+ }
+ segmentStore = builder.create();
+ return SegmentNodeStore.newSegmentNodeStore(segmentStore).create();
+ }
+
+ @Override
+ protected void closeNodeStore() {
+ segmentStore.close();
+ }
+
+ @Override
+ protected BlobStore createOldBlobStore(File repository) {
+ return null;
+ }
+
+ @Override
+ protected BlobStore createNewBlobStore(File repository) {
+ return new FileBlobStore(repository.getPath() + "/new");
+ }
+
+}
\ No newline at end of file