You are viewing a plain-text version of this content. The hyperlink to its canonical version ("here") was not preserved in this copy.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2015/05/21 17:26:46 UTC
svn commit: r1680903 [1/2] - in /jackrabbit/oak/trunk/oak-lucene/src:
main/java/org/apache/jackrabbit/oak/plugins/index/lucene/
test/java/org/apache/jackrabbit/oak/plugins/index/lucene/
Author: chetanm
Date: Thu May 21 15:26:46 2015
New Revision: 1680903
URL: http://svn.apache.org/r1680903
Log:
OAK-2247 - CopyOnWriteDirectory implementation for Lucene for use in indexing
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java?rev=1680903&r1=1680902&r2=1680903&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java Thu May 21 15:26:46 2015
@@ -23,13 +23,17 @@ import javax.management.openmbean.Tabula
@SuppressWarnings("UnusedDeclaration")
public interface CopyOnReadStatsMBean {
- String TYPE = "CopyOnReadStats";
+ String TYPE = "IndexCopierStats";
TabularData getIndexPathMapping();
- int getLocalReadCount();
+ int getReaderLocalReadCount();
- int getRemoteReadCount();
+ int getReaderRemoteReadCount();
+
+ int getWriterLocalReadCount();
+
+ int getWriterRemoteReadCount();
int getScheduledForCopyCount();
@@ -47,6 +51,14 @@ public interface CopyOnReadStatsMBean {
long getDownloadTime();
+ int getDownloadCount();
+
+ String getUploadSize();
+
+ long getUploadTime();
+
+ int getUploadCount();
+
String getLocalIndexSize();
String[] getGarbageDetails();
@@ -56,4 +68,6 @@ public interface CopyOnReadStatsMBean {
int getDeletedFilesCount();
String getGarbageCollectedSize();
+
+ String getSkippedFromUploadSize();
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1680903&r1=1680902&r2=1680903&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java Thu May 21 15:26:46 2015
@@ -20,6 +20,7 @@
package org.apache.jackrabbit.oak.plugins.index.lucene;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -27,12 +28,17 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
@@ -46,13 +52,13 @@ import javax.management.openmbean.Tabula
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
-
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.commons.IOUtils;
-import org.apache.lucene.store.BaseDirectory;
+import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
+import org.apache.jackrabbit.oak.util.PerfLogger;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
@@ -66,52 +72,99 @@ import static com.google.common.base.Pre
import static com.google.common.collect.Iterables.toArray;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Maps.newConcurrentMap;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
-class IndexCopier implements CopyOnReadStatsMBean {
+public class IndexCopier implements CopyOnReadStatsMBean {
private static final Set<String> REMOTE_ONLY = ImmutableSet.of("segments.gen");
private static final int MAX_FAILURE_ENTRIES = 10000;
+ private static final AtomicInteger UNIQUE_COUNTER = new AtomicInteger();
+ private static final String WORK_DIR_NAME = "indexWriterDir";
private final Logger log = LoggerFactory.getLogger(getClass());
+ private final PerfLogger PERF_LOGGER = new PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf"));
private final Executor executor;
private final File indexRootDir;
+ private final File indexWorkDir;
- private final AtomicInteger localReadCount = new AtomicInteger();
- private final AtomicInteger remoteReadCount = new AtomicInteger();
+ private final AtomicInteger readerLocalReadCount = new AtomicInteger();
+ private final AtomicInteger writerLocalReadCount = new AtomicInteger();
+ private final AtomicInteger readerRemoteReadCount = new AtomicInteger();
+ private final AtomicInteger writerRemoteReadCount = new AtomicInteger();
private final AtomicInteger invalidFileCount = new AtomicInteger();
private final AtomicInteger deletedFileCount = new AtomicInteger();
private final AtomicInteger scheduledForCopyCount = new AtomicInteger();
private final AtomicInteger copyInProgressCount = new AtomicInteger();
private final AtomicInteger maxCopyInProgressCount = new AtomicInteger();
private final AtomicInteger maxScheduledForCopyCount = new AtomicInteger();
+ private final AtomicInteger uploadCount = new AtomicInteger();
+ private final AtomicInteger downloadCount = new AtomicInteger();
private final AtomicLong copyInProgressSize = new AtomicLong();
private final AtomicLong downloadSize = new AtomicLong();
+ private final AtomicLong uploadSize = new AtomicLong();
private final AtomicLong garbageCollectedSize = new AtomicLong();
+ private final AtomicLong skippedFromUploadSize = new AtomicLong();
private final AtomicLong downloadTime = new AtomicLong();
+ private final AtomicLong uploadTime = new AtomicLong();
- private final Map<String, String> indexPathMapping = Maps.newConcurrentMap();
- private final Map<String, String> indexPathVersionMapping = Maps.newConcurrentMap();
- private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = Maps.newConcurrentMap();
+ private final Map<String, String> indexPathMapping = newConcurrentMap();
+ private final Map<String, String> indexPathVersionMapping = newConcurrentMap();
+ private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap();
private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
- public IndexCopier(Executor executor, File indexRootDir) {
+ public IndexCopier(Executor executor, File indexRootDir) throws IOException {
this.executor = executor;
this.indexRootDir = indexRootDir;
+ this.indexWorkDir = initializerWorkDir(indexRootDir);
}
- public Directory wrap(String indexPath, IndexDefinition definition, Directory remote) throws IOException {
- Directory local = createLocalDir(indexPath, definition);
+ public Directory wrapForRead(String indexPath, IndexDefinition definition, Directory remote) throws IOException {
+ Directory local = createLocalDirForIndexReader(indexPath, definition);
return new CopyOnReadDirectory(remote, local);
}
- protected Directory createLocalDir(String indexPath, IndexDefinition definition) throws IOException {
+ public Directory wrapForWrite(IndexDefinition definition, Directory remote, boolean reindexMode) throws IOException {
+ Directory local = createLocalDirForIndexWriter(definition);
+ return new CopyOnWriteDirectory(remote, local, reindexMode);
+ }
+
+ File getIndexWorkDir() {
+ return indexWorkDir;
+ }
+
+ File getIndexRootDir() {
+ return indexRootDir;
+ }
+
+ protected Directory createLocalDirForIndexWriter(IndexDefinition definition) throws IOException {
+ String indexPath = definition.getIndexPathFromConfig();
+ File indexWriterDir;
+ if (indexPath == null){
+ //If indexPath is not known create a unique directory for work
+ indexWriterDir = new File(indexWorkDir, String.valueOf(UNIQUE_COUNTER.incrementAndGet()));
+ } else {
+ File indexDir = getIndexDir(indexPath);
+ String newVersion = String.valueOf(definition.getReindexCount());
+ indexWriterDir = getVersionedDir(indexPath, indexDir, newVersion);
+ }
+ Directory dir = FSDirectory.open(indexWriterDir);
+
+ log.debug("IndexWriter would use {}", indexWriterDir);
+
+ if (indexPath == null) {
+ dir = new DeleteOldDirOnClose(dir, indexWriterDir);
+ log.debug("IndexPath [{}] not configured in index definition {}. Writer would create index " +
+ "files in temporary dir {} which would be deleted upon close. For better performance do " +
+ "configure the 'indexPath' as part of your index definition", LuceneIndexConstants.INDEX_PATH,
+ definition, indexWriterDir);
+ }
+ return dir;
+ }
+
+ protected Directory createLocalDirForIndexReader(String indexPath, IndexDefinition definition) throws IOException {
File indexDir = getIndexDir(indexPath);
String newVersion = String.valueOf(definition.getReindexCount());
- File versionedIndexDir = new File(indexDir, newVersion);
- if (!versionedIndexDir.exists()) {
- checkState(versionedIndexDir.mkdirs(), "Cannot create directory %s", versionedIndexDir);
- }
- indexPathMapping.put(indexPath, indexDir.getAbsolutePath());
+ File versionedIndexDir = getVersionedDir(indexPath, indexDir, newVersion);
Directory result = FSDirectory.open(versionedIndexDir);
String oldVersion = indexPathVersionMapping.put(indexPath, newVersion);
@@ -121,6 +174,15 @@ class IndexCopier implements CopyOnReadS
return result;
}
+ private File getVersionedDir(String indexPath, File indexDir, String newVersion) {
+ File versionedIndexDir = new File(indexDir, newVersion);
+ if (!versionedIndexDir.exists()) {
+ checkState(versionedIndexDir.mkdirs(), "Cannot create directory %s", versionedIndexDir);
+ }
+ indexPathMapping.put(indexPath, indexDir.getAbsolutePath());
+ return versionedIndexDir;
+ }
+
public File getIndexDir(String indexPath) {
String subDir = Hashing.sha256().hashString(indexPath, Charsets.UTF_8).toString();
return new File(indexRootDir, subDir);
@@ -157,14 +219,27 @@ class IndexCopier implements CopyOnReadS
}
/**
+ * Creates the workDir. If it exists then it is cleaned
+ *
+ * @param indexRootDir root directory under which all indexing related files are managed
+ * @return work directory. Always empty
+ */
+ private static File initializerWorkDir(File indexRootDir) throws IOException {
+ File workDir = new File(indexRootDir, WORK_DIR_NAME);
+ FileUtils.deleteDirectory(workDir);
+ checkState(workDir.mkdirs(), "Cannot create directory %s", workDir);
+ return workDir;
+ }
+
+ /**
* Directory implementation which lazily copies the index files from a
* remote directory in background.
*/
- private class CopyOnReadDirectory extends BaseDirectory {
+ private class CopyOnReadDirectory extends FilterDirectory {
private final Directory remote;
private final Directory local;
- private final ConcurrentMap<String, FileReference> files = newConcurrentMap();
+ private final ConcurrentMap<String, CORFileReference> files = newConcurrentMap();
/**
* Set of fileNames bound to current local dir. It is updated with any new file
* which gets added by this directory
@@ -172,59 +247,40 @@ class IndexCopier implements CopyOnReadS
private final Set<String> localFileNames = Sets.newConcurrentHashSet();
public CopyOnReadDirectory(Directory remote, Directory local) throws IOException {
+ super(remote);
this.remote = remote;
this.local = local;
this.localFileNames.addAll(Arrays.asList(local.listAll()));
}
@Override
- public String[] listAll() throws IOException {
- return remote.listAll();
- }
-
- @Override
- public boolean fileExists(String name) throws IOException {
- return remote.fileExists(name);
- }
-
- @Override
public void deleteFile(String name) throws IOException {
throw new UnsupportedOperationException("Cannot delete in a ReadOnly directory");
}
@Override
- public long fileLength(String name) throws IOException {
- return remote.fileLength(name);
- }
-
- @Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
throw new UnsupportedOperationException("Cannot write in a ReadOnly directory");
}
@Override
- public void sync(Collection<String> names) throws IOException {
- remote.sync(names);
- }
-
- @Override
public IndexInput openInput(String name, IOContext context) throws IOException {
if (REMOTE_ONLY.contains(name)) {
return remote.openInput(name, context);
}
- FileReference ref = files.get(name);
+ CORFileReference ref = files.get(name);
if (ref != null) {
if (ref.isLocalValid()) {
return files.get(name).openLocalInput(context);
} else {
- remoteReadCount.incrementAndGet();
+ readerRemoteReadCount.incrementAndGet();
return remote.openInput(name, context);
}
}
- FileReference toPut = new FileReference(name);
- FileReference old = files.putIfAbsent(name, toPut);
+ CORFileReference toPut = new CORFileReference(name);
+ CORFileReference old = files.putIfAbsent(name, toPut);
if (old == null) {
copy(toPut);
}
@@ -237,7 +293,7 @@ class IndexCopier implements CopyOnReadS
return remote.openInput(name, context);
}
- private void copy(final FileReference reference) {
+ private void copy(final CORFileReference reference) {
updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
executor.execute(new Runnable() {
@Override
@@ -249,7 +305,7 @@ class IndexCopier implements CopyOnReadS
scheduledForCopyCount.decrementAndGet();
if (!local.fileExists(name)) {
long fileSize = remote.fileLength(name);
- LocalIndexFile file = new LocalIndexFile(local, name, fileSize);
+ LocalIndexFile file = new LocalIndexFile(local, name, fileSize, true);
long start = startCopy(file);
copyAttempted = true;
@@ -348,19 +404,9 @@ class IndexCopier implements CopyOnReadS
Set<String> failedToDelete = Sets.newHashSet();
for (String fileName : filesToBeDeleted) {
- LocalIndexFile file = new LocalIndexFile(local, fileName);
- try {
- boolean fileExisted = false;
- if (local.fileExists(fileName)) {
- fileExisted = true;
- local.deleteFile(fileName);
- }
- successfullyDeleted(file, fileExisted);
- } catch (IOException e) {
+ boolean deleted = IndexCopier.this.deleteFile(local, fileName, true);
+ if (!deleted){
failedToDelete.add(fileName);
- failedToDelete(file);
- log.debug("Error occurred while removing deleted file {} from Local {}. " +
- "Attempt would be maid to delete it on next run ", fileName, local, e);
}
}
@@ -372,11 +418,11 @@ class IndexCopier implements CopyOnReadS
}
}
- private class FileReference {
+ private class CORFileReference {
final String name;
private volatile boolean valid;
- private FileReference(String name) {
+ private CORFileReference(String name) {
this.name = name;
}
@@ -385,7 +431,7 @@ class IndexCopier implements CopyOnReadS
}
IndexInput openLocalInput( IOContext context) throws IOException {
- localReadCount.incrementAndGet();
+ readerLocalReadCount.incrementAndGet();
return local.openInput(name, context);
}
@@ -396,6 +442,435 @@ class IndexCopier implements CopyOnReadS
}
}
+ private class CopyOnWriteDirectory extends FilterDirectory {
+ /**
+ * Signal for the background thread to stop processing changes.
+ */
+ private final Callable<Void> STOP = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ return null;
+ }
+ };
+ private final Directory remote;
+ private final Directory local;
+ private final ConcurrentMap<String, COWFileReference> fileMap = newConcurrentMap();
+ private final Set<String> deletedFilesLocal = Sets.newConcurrentHashSet();
+ private final Set<String> skippedFiles = Sets.newConcurrentHashSet();
+
+ private final BlockingQueue<Callable<Void>> queue = new LinkedBlockingQueue<Callable<Void>>();
+ private final AtomicReference<Throwable> errorInCopy = new AtomicReference<Throwable>();
+ private final CountDownLatch copyDone = new CountDownLatch(1);
+ private final boolean reindexMode;
+
+ /**
+ * Current background task
+ */
+ private volatile NotifyingFutureTask currentTask = NotifyingFutureTask.completed();
+
+ /**
+ * Completion handler: set the current task to the next task and schedules that one
+ * on the background thread.
+ */
+ private final Runnable completionHandler = new Runnable() {
+ Callable<Void> task = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ Callable<Void> task = queue.poll();
+ if (task != null && task != STOP) {
+ if (errorInCopy.get() != null) {
+ log.trace("Skipping task {} as some exception occurred in previous run", task);
+ } else {
+ task.call();
+ }
+ currentTask.onComplete(completionHandler);
+ }
+
+ //Signal that all tasks completed
+ if (task == STOP){
+ copyDone.countDown();
+ }
+ } catch (Throwable t) {
+ errorInCopy.set(t);
+ log.debug("Error occurred while copying files. Further processing would be skipped", t);
+ currentTask.onComplete(completionHandler);
+ }
+ return null;
+ }
+ };
+
+ @Override
+ public void run() {
+ currentTask = new NotifyingFutureTask(task);
+ executor.execute(currentTask);
+ }
+ };
+
+ public CopyOnWriteDirectory(Directory remote, Directory local, boolean reindexMode) throws IOException {
+ super(local);
+ this.remote = remote;
+ this.local = local;
+ this.reindexMode = reindexMode;
+ initialize();
+ }
+
+ @Override
+ public String[] listAll() throws IOException {
+ return Iterables.toArray(fileMap.keySet(), String.class);
+ }
+
+ @Override
+ public boolean fileExists(String name) throws IOException {
+ return fileMap.containsKey(name);
+ }
+
+ @Override
+ public void deleteFile(String name) throws IOException {
+ log.trace("[COW] Deleted file {}", name);
+ COWFileReference ref = fileMap.remove(name);
+ if (ref != null) {
+ ref.delete();
+ }
+ }
+
+ @Override
+ public long fileLength(String name) throws IOException {
+ COWFileReference ref = fileMap.get(name);
+ if (ref == null) {
+ throw new FileNotFoundException(name);
+ }
+ return ref.fileLength();
+ }
+
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws IOException {
+ COWFileReference ref = fileMap.remove(name);
+ if (ref != null) {
+ ref.delete();
+ }
+ ref = new COWLocalFileReference(name);
+ fileMap.put(name, ref);
+ return ref.createOutput(context);
+ }
+
+ @Override
+ public void sync(Collection<String> names) throws IOException {
+ for (String name : names){
+ COWFileReference file = fileMap.get(name);
+ if (file != null){
+ file.sync();
+ }
+ }
+ }
+
+ @Override
+ public IndexInput openInput(String name, IOContext context) throws IOException {
+ COWFileReference ref = fileMap.get(name);
+ if (ref == null) {
+ throw new FileNotFoundException(name);
+ }
+ return ref.openInput(context);
+ }
+
+ @Override
+ public void close() throws IOException {
+ int pendingCopies = queue.size();
+ addTask(STOP);
+
+ //Wait for all pending copy task to finish
+ try {
+ long start = PERF_LOGGER.start();
+ copyDone.await();
+ PERF_LOGGER.end(start, -1, "Completed pending copying task {}", pendingCopies);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+
+ Throwable t = errorInCopy.get();
+ if (t != null){
+ throw new IOException("Error occurred while copying files", t);
+ }
+
+ long skippedFilesSize = getSkippedFilesSize();
+
+ for (String fileName : deletedFilesLocal){
+ deleteLocalFile(fileName);
+ }
+
+ skippedFromUploadSize.addAndGet(skippedFilesSize);
+
+ String msg = "CopyOnWrite stats : Skipped copying {} files with total size {}";
+ if (reindexMode || skippedFilesSize > 10 * FileUtils.ONE_MB){
+ log.info(msg, skippedFiles.size(), humanReadableByteCount(skippedFilesSize));
+ } else {
+ log.debug(msg, skippedFiles.size(), humanReadableByteCount(skippedFilesSize));
+ }
+
+ if (log.isTraceEnabled()){
+ log.trace("File listing - Upon completion {}", Arrays.toString(remote.listAll()));
+ }
+
+ local.close();
+ remote.close();
+ }
+
+ private long getSkippedFilesSize() {
+ long size = 0;
+ for (String name : skippedFiles){
+ try{
+ if (local.fileExists(name)){
+ size += local.fileLength(name);
+ }
+ } catch (Exception ignore){
+
+ }
+ }
+ return size;
+ }
+
+ private void deleteLocalFile(String fileName) {
+ IndexCopier.this.deleteFile(local, fileName, false);
+ }
+
+ private void initialize() throws IOException {
+ for (String name : remote.listAll()) {
+ fileMap.put(name, new COWRemoteFileReference(name));
+ }
+
+ if (log.isTraceEnabled()){
+ log.trace("File listing - Start" + Arrays.toString(remote.listAll()));
+ }
+ }
+
+ private void addCopyTask(final String name){
+ updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
+ addTask(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ scheduledForCopyCount.decrementAndGet();
+ if (deletedFilesLocal.contains(name)){
+ skippedFiles.add(name);
+ log.trace("[COW] Skip copying of deleted file {}", name);
+ return null;
+ }
+ long fileSize = local.fileLength(name);
+ LocalIndexFile file = new LocalIndexFile(local, name, fileSize, false);
+ long perfStart = PERF_LOGGER.start();
+ long start = startCopy(file);
+
+ local.copy(remote, name, name, IOContext.DEFAULT);
+
+ doneCopy(file, start);
+ PERF_LOGGER.end(perfStart, 0, "Copied to remote {} ", name);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "Copy: " + name;
+ }
+ });
+ }
+
+ private void addDeleteTask(final String name){
+ addTask(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ if (!skippedFiles.contains(name)) {
+ log.trace("[COW] Marking as deleted {}", name);
+ remote.deleteFile(name);
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "Delete : " + name;
+ }
+ });
+ }
+
+ private void addTask(Callable<Void> task){
+ queue.add(task);
+ currentTask.onComplete(completionHandler);
+ }
+
+ private abstract class COWFileReference {
+ protected final String name;
+
+ public COWFileReference(String name) {
+ this.name = name;
+ }
+
+ public abstract long fileLength() throws IOException;
+
+ public abstract IndexInput openInput(IOContext context) throws IOException;
+
+ public abstract IndexOutput createOutput(IOContext context) throws IOException;
+
+ public abstract void delete() throws IOException;
+
+ public void sync() throws IOException {
+
+ }
+ }
+
+ private class COWRemoteFileReference extends COWFileReference {
+ private boolean validLocalCopyPresent;
+ private final long length;
+
+ public COWRemoteFileReference(String name) throws IOException {
+ super(name);
+ this.length = remote.fileLength(name);
+ }
+
+ @Override
+ public long fileLength() throws IOException {
+ return length;
+ }
+
+ @Override
+ public IndexInput openInput(IOContext context) throws IOException {
+ checkIfLocalValid();
+ if (validLocalCopyPresent && !REMOTE_ONLY.contains(name)) {
+ writerLocalReadCount.incrementAndGet();
+ return local.openInput(name, context);
+ }
+ writerRemoteReadCount.incrementAndGet();
+ return remote.openInput(name, context);
+ }
+
+ @Override
+ public IndexOutput createOutput(IOContext context) throws IOException {
+ throw new UnsupportedOperationException("Cannot create output for existing remote file " + name);
+ }
+
+ @Override
+ public void delete() throws IOException {
+ //Remote file should not be deleted locally as it might be
+ //in use by existing opened IndexSearcher. It would anyway
+ //get deleted by CopyOnRead later
+ //For now just record that these need to be deleted to avoid
+ //potential concurrent access of the NodeBuilder
+ addDeleteTask(name);
+ }
+
+ private void checkIfLocalValid() throws IOException {
+ validLocalCopyPresent = local.fileExists(name)
+ && local.fileLength(name) == remote.fileLength(name);
+ }
+ }
+
+ private class COWLocalFileReference extends COWFileReference {
+ public COWLocalFileReference(String name) {
+ super(name);
+ }
+
+ @Override
+ public long fileLength() throws IOException {
+ return local.fileLength(name);
+ }
+
+ @Override
+ public IndexInput openInput(IOContext context) throws IOException {
+ return local.openInput(name, context);
+ }
+
+ @Override
+ public IndexOutput createOutput(IOContext context) throws IOException {
+ log.debug("[COW] Creating output {}", name);
+ return new CopyOnCloseIndexOutput(local.createOutput(name, context));
+ }
+
+ @Override
+ public void delete() throws IOException {
+ addDeleteTask(name);
+ deletedFilesLocal.add(name);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ local.sync(Collections.singleton(name));
+ }
+
+ /**
+ * Implementation note - As we are decorating existing implementation
+ * we would need to ensure that we also override methods (non abstract)
+ * which might be implemented in say FSIndexInput like setLength
+ */
+ private class CopyOnCloseIndexOutput extends IndexOutput {
+ private final IndexOutput delegate;
+
+ public CopyOnCloseIndexOutput(IndexOutput delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ delegate.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ //Schedule this file to be copied in background
+ addCopyTask(name);
+ }
+
+ @Override
+ public long getFilePointer() {
+ return delegate.getFilePointer();
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ delegate.seek(pos);
+ }
+
+ @Override
+ public long length() throws IOException {
+ return delegate.length();
+ }
+
+ @Override
+ public void writeByte(byte b) throws IOException {
+ delegate.writeByte(b);
+ }
+
+ @Override
+ public void writeBytes(byte[] b, int offset, int length) throws IOException {
+ delegate.writeBytes(b, offset, length);
+ }
+
+ @Override
+ public void setLength(long length) throws IOException {
+ delegate.setLength(length);
+ }
+ }
+ }
+ }
+
+ private boolean deleteFile(Directory dir, String fileName, boolean copiedFromRemote){
+ LocalIndexFile file = new LocalIndexFile(dir, fileName, getFileLength(dir, fileName), copiedFromRemote);
+ boolean successFullyDeleted = false;
+ try {
+ boolean fileExisted = false;
+ if (dir.fileExists(fileName)) {
+ fileExisted = true;
+ dir.deleteFile(fileName);
+ }
+ successfullyDeleted(file, fileExisted);
+ successFullyDeleted = true;
+ } catch (IOException e) {
+ failedToDelete(file);
+ log.debug("Error occurred while removing deleted file {} from Local {}. " +
+ "Attempt would be maid to delete it on next run ", fileName, dir, e);
+ }
+ return successFullyDeleted;
+ }
+
private long startCopy(LocalIndexFile file) {
updateMaxInProgress(copyInProgressCount.incrementAndGet());
copyInProgressSize.addAndGet(file.size);
@@ -408,8 +883,16 @@ class IndexCopier implements CopyOnReadS
copyInProgressCount.decrementAndGet();
copyInProgressSize.addAndGet(-file.size);
- downloadTime.addAndGet(System.currentTimeMillis() - start);
- downloadSize.addAndGet(file.size);
+ if(file.copyFromRemote) {
+ downloadTime.addAndGet(System.currentTimeMillis() - start);
+ downloadSize.addAndGet(file.size);
+ downloadCount.incrementAndGet();
+ } else {
+ uploadSize.addAndGet(file.size);
+ uploadTime.addAndGet(System.currentTimeMillis() - start);
+ uploadCount.incrementAndGet();
+ }
+
}
private void updateMaxScheduled(int val) {
@@ -440,13 +923,18 @@ class IndexCopier implements CopyOnReadS
@Override
public void close() throws IOException {
- try{
- FileUtils.deleteDirectory(oldIndexDir);
- log.debug("Removed old index content from {} ", oldIndexDir);
- } catch (IOException e){
- log.warn("Not able to remove old version of copied index at {}", oldIndexDir, e);
+ try {
+ super.close();
+ } finally {
+ //Clean out the local dir irrespective of any error occurring upon
+ //close in wrapped directory
+ try{
+ FileUtils.deleteDirectory(oldIndexDir);
+ log.debug("Removed old index content from {} ", oldIndexDir);
+ } catch (IOException e){
+ log.warn("Not able to remove old version of copied index at {}", oldIndexDir, e);
+ }
}
- super.close();
}
}
@@ -454,17 +942,20 @@ class IndexCopier implements CopyOnReadS
final File dir;
final String name;
final long size;
+ final boolean copyFromRemote;
private volatile int deleteAttemptCount;
final long creationTime = System.currentTimeMillis();
- public LocalIndexFile(Directory dir, String fileName, long size){
+ public LocalIndexFile(Directory dir, String fileName,
+ long size, boolean copyFromRemote){
+ this.copyFromRemote = copyFromRemote;
this.dir = getFSDir(dir);
this.name = fileName;
this.size = size;
}
public LocalIndexFile(Directory dir, String fileName){
- this(dir, fileName, getFileLength(dir, fileName));
+ this(dir, fileName, getFileLength(dir, fileName), true);
}
public String getKey(){
@@ -484,14 +975,14 @@ class IndexCopier implements CopyOnReadS
public String deleteLog(){
return String.format("%s (%s, %d attempts, %d s)", name,
- IOUtils.humanReadableByteCount(size), deleteAttemptCount, timeTaken());
+ humanReadableByteCount(size), deleteAttemptCount, timeTaken());
}
public String copyLog(){
return String.format("%s (%s, %1.1f%%, %s, %d s)", name,
- IOUtils.humanReadableByteCount(actualSize()),
+ humanReadableByteCount(actualSize()),
copyProgress(),
- IOUtils.humanReadableByteCount(size), timeTaken());
+ humanReadableByteCount(size), timeTaken());
}
@Override
@@ -561,7 +1052,7 @@ class IndexCopier implements CopyOnReadS
"Lucene Index Stats", IndexMappingData.TYPE, new String[]{"jcrPath"});
tds = new TabularDataSupport(tt);
for (Map.Entry<String, String> e : indexPathMapping.entrySet()){
- String size = IOUtils.humanReadableByteCount(FileUtils.sizeOfDirectory(new File(e.getValue())));
+ String size = humanReadableByteCount(FileUtils.sizeOfDirectory(new File(e.getValue())));
tds.put(new CompositeDataSupport(IndexMappingData.TYPE,
IndexMappingData.FIELD_NAMES,
new String[]{e.getKey(), e.getValue(), size}));
@@ -573,13 +1064,23 @@ class IndexCopier implements CopyOnReadS
}
+ // Read counters are split into "reader" and "writer" variants. Presumably the reader
+ // counters track reads through the wrapForRead directory and the writer counters reads
+ // through the wrapForWrite directory (counter updates are outside this view — TODO confirm).
@Override
- public int getLocalReadCount() {
- return localReadCount.get();
+ public int getReaderLocalReadCount() {
+ return readerLocalReadCount.get();
+ }
+
+ @Override
+ public int getReaderRemoteReadCount() {
+ return readerRemoteReadCount.get();
+ }
+
+ @Override
+ public int getWriterLocalReadCount() {
+ return writerLocalReadCount.get();
}
@Override
- public int getRemoteReadCount() {
- return remoteReadCount.get();
+ public int getWriterRemoteReadCount() {
+ return writerRemoteReadCount.get();
}
public int getInvalidFileCount(){
@@ -588,7 +1089,7 @@ class IndexCopier implements CopyOnReadS
+ /** Returns the downloadSize counter formatted with humanReadableByteCount. */
@Override
public String getDownloadSize() {
- return IOUtils.humanReadableByteCount(downloadSize.get());
+ return humanReadableByteCount(downloadSize.get());
}
@Override
@@ -597,8 +1098,28 @@ class IndexCopier implements CopyOnReadS
}
+ /**
+ * Download/upload statistics. Download counters presumably cover files fetched for the
+ * copy-on-read side and upload counters files pushed back by the copy-on-write side
+ * (the counter updates are not visible here — TODO confirm).
+ */
@Override
+ public int getDownloadCount() {
+ return downloadCount.get();
+ }
+
+ @Override
+ public int getUploadCount() {
+ return uploadCount.get();
+ }
+
+ @Override
+ public String getUploadSize() {
+ return humanReadableByteCount(uploadSize.get());
+ }
+
+ @Override
+ public long getUploadTime() {
+ // NOTE(review): unit of uploadTime is not visible here (presumably milliseconds) — confirm
+ return uploadTime.get();
+ }
+
+ /** Current on-disk size of indexRootDir, formatted with humanReadableByteCount. */
+ @Override
public String getLocalIndexSize() {
- return IOUtils.humanReadableByteCount(FileUtils.sizeOfDirectory(indexRootDir));
+ return humanReadableByteCount(FileUtils.sizeOfDirectory(indexRootDir));
}
@Override
@@ -618,7 +1139,7 @@ class IndexCopier implements CopyOnReadS
for (LocalIndexFile failedToDeleteFile : failedToDeleteFiles.values()){
garbageSize += failedToDeleteFile.size;
}
- return IOUtils.humanReadableByteCount(garbageSize);
+ return humanReadableByteCount(garbageSize);
}
@Override
@@ -633,7 +1154,7 @@ class IndexCopier implements CopyOnReadS
+ /** Returns the copyInProgressSize counter formatted with humanReadableByteCount. */
@Override
public String getCopyInProgressSize() {
- return IOUtils.humanReadableByteCount(copyInProgressSize.get());
+ return humanReadableByteCount(copyInProgressSize.get());
}
@Override
@@ -646,6 +1167,10 @@ class IndexCopier implements CopyOnReadS
return maxScheduledForCopyCount.get();
}
+ /**
+ * Returns the skippedFromUploadSize counter as a human readable byte count.
+ */
+ public String getSkippedFromUploadSize() {
+     final long skippedBytes = skippedFromUploadSize.get();
+     return humanReadableByteCount(skippedBytes);
+ }
+
@Override
public String[] getCopyInProgressDetails() {
return toArray(transform(copyInProgressFiles,
@@ -664,7 +1189,7 @@ class IndexCopier implements CopyOnReadS
+ /** Returns the garbageCollectedSize counter formatted with humanReadableByteCount. */
@Override
public String getGarbageCollectedSize() {
- return IOUtils.humanReadableByteCount(garbageCollectedSize.get());
+ return humanReadableByteCount(garbageCollectedSize.get());
}
private static class IndexMappingData {
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java?rev=1680903&r1=1680902&r2=1680903&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java Thu May 21 15:26:46 2015
@@ -200,6 +200,8 @@ class IndexDefinition implements Aggrega
private final int suggesterUpdateFrequencyMinutes;
+ private final long reindexCount;
+
private final PathFilter pathFilter;
@Nullable
@@ -267,6 +269,7 @@ class IndexDefinition implements Aggrega
this.maxExtractLength = determineMaxExtractLength();
this.suggesterUpdateFrequencyMinutes = getOptionalValue(defn, LuceneIndexConstants.SUGGEST_UPDATE_FREQUENCY_MINUTES, 60);
this.scorerProviderName = getOptionalValue(defn, LuceneIndexConstants.PROP_SCORER_PROVIDER, null);
+ this.reindexCount = determineReindexCount(defn, defnb);
this.pathFilter = PathFilter.from(new ReadOnlyBuilder(defn));
this.queryPaths = getQueryPaths(defn);
this.saveDirListing = getOptionalValue(defn, LuceneIndexConstants.SAVE_DIR_LISTING, true);
@@ -297,10 +300,7 @@ class IndexDefinition implements Aggrega
}
+ /**
+ * Returns the reindex count resolved once in the constructor via determineReindexCount,
+ * which prefers the value from the definition builder over the base NodeState.
+ */
public long getReindexCount(){
- if(definition.hasProperty(REINDEX_COUNT)){
- return definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
- }
- return 0;
+ return reindexCount;
}
public long getEntryCount() {
@@ -609,6 +609,11 @@ class IndexDefinition implements Aggrega
return suggestEnabled;
}
+ /**
+ * Returns the index path stored in the definition under LuceneIndexConstants.INDEX_PATH,
+ * or null when that property is not set.
+ */
+ @CheckForNull
+ public String getIndexPathFromConfig() {
+     final String configuredPath = definition.getString(LuceneIndexConstants.INDEX_PATH);
+     return configuredPath;
+ }
+
public class IndexingRule {
private final String baseNodeType;
private final String nodeTypeName;
@@ -1149,6 +1154,9 @@ class IndexDefinition implements Aggrega
private static String determineIndexName(NodeState defn, String indexPath) {
String indexName = defn.getString(PROP_NAME);
if (indexName == null){
+ if (indexPath == null){
+ indexPath = defn.getString(LuceneIndexConstants.INDEX_PATH);
+ }
if (indexPath != null) {
return indexPath;
}
@@ -1345,4 +1353,16 @@ class IndexDefinition implements Aggrega
return defn.getChildNode(LuceneIndexConstants.INDEX_RULES).exists();
}
+ /**
+ * Resolves the reindex count of the definition. When a builder is available and carries
+ * REINDEX_COUNT, its value wins because it reflects the latest state and may be higher than
+ * the one in the base NodeState. Otherwise the NodeState value is used, or 0 if absent.
+ */
+ private static long determineReindexCount(NodeState defn, NodeBuilder defnb) {
+     boolean builderHasCount = defnb != null && defnb.hasProperty(REINDEX_COUNT);
+     if (builderHasCount) {
+         return defnb.getProperty(REINDEX_COUNT).getValue(Type.LONG);
+     }
+     return defn.hasProperty(REINDEX_COUNT)
+             ? defn.getProperty(REINDEX_COUNT).getValue(Type.LONG)
+             : 0;
+ }
+
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java?rev=1680903&r1=1680902&r2=1680903&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java Thu May 21 15:26:46 2015
@@ -48,7 +48,7 @@ class IndexNode {
if (data.exists()) {
directory = new OakDirectory(new ReadOnlyBuilder(data), definition, true);
if (cloner != null){
- directory = cloner.wrap(indexPath, definition, directory);
+ directory = cloner.wrapForRead(indexPath, definition, directory);
}
} else if (PERSISTENCE_FILE.equalsIgnoreCase(defnNodeState.getString(PERSISTENCE_NAME))) {
String path = defnNodeState.getString(PERSISTENCE_PATH);
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java?rev=1680903&r1=1680902&r2=1680903&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java Thu May 21 15:26:46 2015
@@ -262,4 +262,12 @@ public interface LuceneIndexConstants {
* to allow faster reads (OAK-2809)
*/
String SAVE_DIR_LISTING = "saveDirectoryListing";
+
+ /**
+ * Optional property storing the path of the index definition in the repository. The
+ * path at which an index definition is defined is not known to the IndexEditor, so the
+ * CopyOnWrite feature relies on this property to optimize the lookup and reading of
+ * existing index files. IndexDefinition also falls back to it as the index name when
+ * neither an explicit name nor an index path argument is available.
+ */
+ String INDEX_PATH = "indexPath";
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java?rev=1680903&r1=1680902&r2=1680903&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java Thu May 21 15:26:46 2015
@@ -34,6 +34,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -118,12 +120,12 @@ public class LuceneIndexEditor implement
private final PathFilter.Result pathFilterResult;
LuceneIndexEditor(NodeState root, NodeBuilder definition,
- IndexUpdateCallback updateCallback) throws CommitFailedException {
+ IndexUpdateCallback updateCallback,@Nullable IndexCopier indexCopier) throws CommitFailedException {
this.parent = null;
this.name = null;
this.path = "/";
this.context = new LuceneIndexEditorContext(root, definition,
- updateCallback);
+ updateCallback, indexCopier);
this.root = root;
this.isDeleted = false;
this.matcherState = MatcherState.NONE;
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java?rev=1680903&r1=1680902&r2=1680903&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java Thu May 21 15:26:46 2015
@@ -30,6 +30,8 @@ import java.util.Calendar;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
import org.apache.commons.io.IOUtils;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
@@ -61,14 +63,16 @@ public class LuceneIndexEditorContext {
private static final PerfLogger PERF_LOGGER =
new PerfLogger(LoggerFactory.getLogger(LuceneIndexEditorContext.class.getName() + ".perf"));
- static IndexWriterConfig getIndexWriterConfig(IndexDefinition definition) {
+ static IndexWriterConfig getIndexWriterConfig(IndexDefinition definition, boolean remoteDir) {
// FIXME: Hack needed to make Lucene work in an OSGi environment
Thread thread = Thread.currentThread();
ClassLoader loader = thread.getContextClassLoader();
thread.setContextClassLoader(IndexWriterConfig.class.getClassLoader());
try {
IndexWriterConfig config = new IndexWriterConfig(VERSION, definition.getAnalyzer());
- config.setMergeScheduler(new SerialMergeScheduler());
+ if (remoteDir) {
+ config.setMergeScheduler(new SerialMergeScheduler());
+ }
if (definition.getCodec() != null) {
config.setCodec(definition.getCodec());
}
@@ -102,8 +106,6 @@ public class LuceneIndexEditorContext {
}
}
- private final IndexWriterConfig config;
-
private static final Parser defaultParser = createDefaultParser();
private final IndexDefinition definition;
@@ -120,6 +122,10 @@ public class LuceneIndexEditorContext {
private Parser parser;
+ @Nullable
+ private final IndexCopier indexCopier;
+
+
private Directory directory;
private final TextExtractionStats textExtractionStats = new TextExtractionStats();
@@ -129,10 +135,11 @@ public class LuceneIndexEditorContext {
*/
private Set<MediaType> supportedMediaTypes;
- LuceneIndexEditorContext(NodeState root, NodeBuilder definition, IndexUpdateCallback updateCallback) {
+ LuceneIndexEditorContext(NodeState root, NodeBuilder definition, IndexUpdateCallback updateCallback,
+ @Nullable IndexCopier indexCopier) {
this.definitionBuilder = definition;
+ this.indexCopier = indexCopier;
this.definition = new IndexDefinition(root, definition);
- this.config = getIndexWriterConfig(this.definition);
this.indexedNodes = 0;
this.updateCallback = updateCallback;
if (this.definition.isOfOldFormat()){
@@ -151,6 +158,13 @@ public class LuceneIndexEditorContext {
if (writer == null) {
final long start = PERF_LOGGER.start();
directory = newIndexDirectory(definition, definitionBuilder);
+ IndexWriterConfig config;
+ if (indexCopier != null){
+ directory = indexCopier.wrapForWrite(definition, directory, reindex);
+ config = getIndexWriterConfig(definition, false);
+ } else {
+ config = getIndexWriterConfig(definition, true);
+ }
writer = new IndexWriter(directory, config);
PERF_LOGGER.end(start, -1, "Created IndexWriter for directory {}", definition);
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java?rev=1680903&r1=1680902&r2=1680903&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java Thu May 21 15:26:46 2015
@@ -17,11 +17,10 @@
package org.apache.jackrabbit.oak.plugins.index.lucene;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.index.IndexEditor;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
@@ -37,19 +36,29 @@ import org.apache.jackrabbit.oak.spi.sta
* @see IndexEditorProvider
*
*/
-@Component
-@Service(IndexEditorProvider.class)
public class LuceneIndexEditorProvider implements IndexEditorProvider {
+ /**
+ * Copier handed to every LuceneIndexEditor. When non-null the editor context wraps its
+ * index directory with IndexCopier.wrapForWrite (copy-on-write); when null it does not.
+ */
+ @Nullable
+ private final IndexCopier indexCopier;
+
+ /** Creates a provider without copy-on-write support. */
+ public LuceneIndexEditorProvider() {
+     this(null);
+ }
+
+ /**
+ * @param indexCopier copier used for copy-on-write indexing, or null to disable it
+ */
+ public LuceneIndexEditorProvider(@Nullable IndexCopier indexCopier) {
+     this.indexCopier = indexCopier;
+ }
@Override
public Editor getIndexEditor(
- @Nonnull String type, @Nonnull NodeBuilder definition, @Nonnull NodeState root, @Nonnull IndexUpdateCallback callback)
+ @Nonnull String type, @Nonnull NodeBuilder definition, @Nonnull NodeState root,
+ @Nonnull IndexUpdateCallback callback)
throws CommitFailedException {
if (TYPE_LUCENE.equals(type)) {
- return new LuceneIndexEditor(root, definition, callback);
+ return new LuceneIndexEditor(root, definition, callback, indexCopier);
}
+ // not a Lucene index definition; this provider does not handle it
return null;
}
-
+
+ /**
+ * @return the copier this provider was created with, or null if copy-on-write is disabled
+ */
+ @Nullable
+ IndexCopier getIndexCopier() {
+     return indexCopier;
+ }
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1680903&r1=1680902&r2=1680903&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java Thu May 21 15:26:46 2015
@@ -20,9 +20,17 @@
package org.apache.jackrabbit.oak.plugins.index.lucene;
import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nonnull;
import javax.management.NotCompliantMBeanException;
import com.google.common.base.Strings;
@@ -38,6 +46,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.ReferencePolicyOption;
import org.apache.jackrabbit.oak.commons.PropertiesUtil;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.aggregate.NodeAggregator;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
import org.apache.jackrabbit.oak.plugins.index.lucene.score.ScorerProviderFactory;
@@ -46,7 +55,6 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
-import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.lucene.analysis.util.CharFilterFactory;
import org.apache.lucene.analysis.util.TokenFilterFactory;
import org.apache.lucene.analysis.util.TokenizerFactory;
@@ -99,28 +107,48 @@ public class LuceneIndexProviderService
private static final String PROP_LOCAL_INDEX_DIR = "localIndexDir";
@Property(
+ boolValue = false,
+ label = "Enable CopyOnWrite",
+ description = "Enable copying of Lucene index to local file system to improve index writer performance"
+ )
+ private static final String PROP_COPY_ON_WRITE = "enableCopyOnWriteSupport";
+
+ @Property(
boolValue = true,
label = "Open index asynchronously",
description = "Enable opening of indexes in asynchronous mode"
)
private static final String PROP_ASYNC_INDEX_OPEN = "enableOpenIndexAsync";
- private Whiteboard whiteboard;
+ /** Default value for PROP_THREAD_POOL_SIZE. */
+ private static final int PROP_THREAD_POOL_SIZE_DEFAULT = 5;
+ /**
+ * Configured size of the thread pool for asynchronous work in oak-lucene; read into
+ * threadPoolSize during activation.
+ */
+ @Property(
+         intValue = PROP_THREAD_POOL_SIZE_DEFAULT,
+         label = "Thread pool size",
+         description = "Thread pool size used to perform various asynchronous tasks in Oak Lucene"
+ )
+ private static final String PROP_THREAD_POOL_SIZE = "threadPoolSize";
- private WhiteboardExecutor executor;
+ private Whiteboard whiteboard;
private BackgroundObserver backgroundObserver;
@Reference
ScorerProviderFactory scorerFactory;
+ private IndexCopier indexCopier;
+
+ private File indexDir;
+
+ private ExecutorService executorService;
+
+ private int threadPoolSize;
+
@Activate
private void activate(BundleContext bundleContext, Map<String, ?> config)
- throws NotCompliantMBeanException {
+ throws NotCompliantMBeanException, IOException {
initializeFactoryClassLoaders(getClass().getClassLoader());
whiteboard = new OsgiWhiteboard(bundleContext);
- executor = new WhiteboardExecutor();
- executor.start(whiteboard);
+ threadPoolSize = PropertiesUtil.toInteger(config.get(PROP_THREAD_POOL_SIZE), PROP_THREAD_POOL_SIZE_DEFAULT);
indexProvider = new LuceneIndexProvider(createTracker(bundleContext, config), scorerFactory);
initializeLogging(config);
@@ -128,6 +156,7 @@ public class LuceneIndexProviderService
regs.add(bundleContext.registerService(QueryIndexProvider.class.getName(), indexProvider, null));
registerObserver(bundleContext, config);
+ registerIndexEditor(bundleContext, config);
oakRegs.add(registerMBean(whiteboard,
LuceneIndexMBean.class,
@@ -137,7 +166,7 @@ public class LuceneIndexProviderService
}
@Deactivate
- private void deactivate() {
+ private void deactivate() throws InterruptedException {
for (ServiceRegistration reg : regs) {
reg.unregister();
}
@@ -155,8 +184,9 @@ public class LuceneIndexProviderService
indexProvider = null;
}
- if (executor != null){
- executor.stop();
+ if (executorService != null){
+ executorService.shutdown();
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
}
InfoStream.setDefault(InfoStream.NO_OUTPUT);
@@ -183,41 +213,96 @@ public class LuceneIndexProviderService
}
}
- private IndexTracker createTracker(BundleContext bundleContext, Map<String, ?> config) {
+ private void registerIndexEditor(BundleContext bundleContext, Map<String, ?> config) throws IOException {
+ boolean enableCopyOnWrite = PropertiesUtil.toBoolean(config.get(PROP_COPY_ON_WRITE), false);
+ LuceneIndexEditorProvider editorProvider;
+ if (enableCopyOnWrite){
+ initializeIndexCopier(bundleContext, config);
+ editorProvider = new LuceneIndexEditorProvider(indexCopier);
+ log.info("Enabling CopyOnWrite support. Index files would be copied under {}", indexDir.getAbsolutePath());
+ } else {
+ editorProvider = new LuceneIndexEditorProvider();
+ }
+ regs.add(bundleContext.registerService(IndexEditorProvider.class.getName(), editorProvider, null));
+ }
+
+ /**
+ * Creates the IndexTracker backing the LuceneIndexProvider used for queries. When
+ * PROP_COPY_ON_READ is enabled (default true) the shared IndexCopier is initialized (if not
+ * already) and passed to the tracker; otherwise a plain tracker is returned.
+ */
+ private IndexTracker createTracker(BundleContext bundleContext, Map<String, ?> config) throws IOException {
boolean enableCopyOnRead = PropertiesUtil.toBoolean(config.get(PROP_COPY_ON_READ), true);
if (enableCopyOnRead){
- String indexDirPath = PropertiesUtil.toString(config.get(PROP_LOCAL_INDEX_DIR), null);
- if (Strings.isNullOrEmpty(indexDirPath)) {
- String repoHome = bundleContext.getProperty(REPOSITORY_HOME);
- if (repoHome != null){
- indexDirPath = FilenameUtils.concat(repoHome, "index");
- }
+ initializeIndexCopier(bundleContext, config);
+ log.info("Enabling CopyOnRead support. Index files would be copied under {}", indexDir.getAbsolutePath());
+ return new IndexTracker(indexCopier);
+ }
+
+ return new IndexTracker();
+ }
+
+ /**
+ * Creates the IndexCopier shared by copy-on-read and copy-on-write and registers its
+ * statistics MBean. Subsequent calls are no-ops once the copier exists.
+ *
+ * The local directory is PROP_LOCAL_INDEX_DIR if set, else "index" under the
+ * REPOSITORY_HOME bundle property; checkNotNull fails (NullPointerException) if neither
+ * is available.
+ */
+ private void initializeIndexCopier(BundleContext bundleContext, Map<String, ?> config) throws IOException {
+ // called from both createTracker and registerIndexEditor; initialize only once
+ if(indexCopier != null){
+ return;
+ }
+ String indexDirPath = PropertiesUtil.toString(config.get(PROP_LOCAL_INDEX_DIR), null);
+ if (Strings.isNullOrEmpty(indexDirPath)) {
+ String repoHome = bundleContext.getProperty(REPOSITORY_HOME);
+ if (repoHome != null){
+ indexDirPath = FilenameUtils.concat(repoHome, "index");
}
+ }
- checkNotNull(indexDirPath, "Index directory cannot be determined as neither index " +
- "directory path [%s] nor repository home [%s] defined", PROP_LOCAL_INDEX_DIR, REPOSITORY_HOME);
+ checkNotNull(indexDirPath, "Index directory cannot be determined as neither index " +
+ "directory path [%s] nor repository home [%s] defined", PROP_LOCAL_INDEX_DIR, REPOSITORY_HOME);
- File indexDir = new File(indexDirPath);
- IndexCopier copier = new IndexCopier(executor, indexDir);
- log.info("Enabling CopyOnRead support. Index files would be copied under {}", indexDir.getAbsolutePath());
+ indexDir = new File(indexDirPath);
+ indexCopier = new IndexCopier(getExecutorService(), indexDir);
- oakRegs.add(registerMBean(whiteboard,
- CopyOnReadStatsMBean.class,
- copier,
- CopyOnReadStatsMBean.TYPE,
- "CopyOnRead support statistics"));
+ oakRegs.add(registerMBean(whiteboard,
+ CopyOnReadStatsMBean.class,
+ indexCopier,
+ CopyOnReadStatsMBean.TYPE,
+ "IndexCopier support statistics"));
+
+ }
- return new IndexTracker(copier);
+ /**
+ * Lazily creates the executor shared by the IndexCopier and the BackgroundObserver.
+ * NOTE(review): not synchronized; within this class it is only reached from activate()
+ * (via createTracker, registerObserver and registerIndexEditor).
+ */
+ private ExecutorService getExecutorService(){
+ if (executorService == null){
+ executorService = createExecutor();
}
+ return executorService;
+ }
- return new IndexTracker();
+ /**
+ * Creates the executor used for asynchronous work in oak-lucene (index copying and, when
+ * enabled, asynchronous index opening through the BackgroundObserver).
+ *
+ * Core and maximum pool size are both the configured threadPoolSize. A ThreadPoolExecutor
+ * only starts threads beyond its core size when the work queue is full, which never happens
+ * with an unbounded LinkedBlockingQueue. A core size of 0 would therefore leave the pool
+ * with a single worker regardless of the maximum. Idle threads, core ones included, still
+ * exit after one minute because core thread timeout is enabled.
+ */
+ private ExecutorService createExecutor() {
+     // ThreadPoolExecutor rejects a non-positive maximum pool size
+     int poolSize = Math.max(1, threadPoolSize);
+     ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 1, TimeUnit.MINUTES,
+             new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+         private final AtomicInteger counter = new AtomicInteger();
+         private final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+             @Override
+             public void uncaughtException(Thread t, Throwable e) {
+                 log.warn("Error occurred in asynchronous processing ", e);
+             }
+         };
+         @Override
+         public Thread newThread(@Nonnull Runnable r) {
+             Thread thread = new Thread(r, createName());
+             thread.setDaemon(true);
+             thread.setPriority(Thread.MIN_PRIORITY);
+             thread.setUncaughtExceptionHandler(handler);
+             return thread;
+         }
+
+         private String createName() {
+             return "oak-lucene-" + counter.getAndIncrement();
+         }
+     });
+     // let idle core threads exit so the pool holds no threads while there is no work
+     executor.allowCoreThreadTimeOut(true);
+     return executor;
}
private void registerObserver(BundleContext bundleContext, Map<String, ?> config) {
boolean enableAsyncIndexOpen = PropertiesUtil.toBoolean(config.get(PROP_ASYNC_INDEX_OPEN), true);
Observer observer = indexProvider;
if (enableAsyncIndexOpen) {
- backgroundObserver = new BackgroundObserver(indexProvider, executor, 5);
+ backgroundObserver = new BackgroundObserver(indexProvider, getExecutorService(), 5);
observer = backgroundObserver;
oakRegs.add(registerMBean(whiteboard,
BackgroundObserverMBean.class,