You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2013/05/13 13:57:24 UTC
svn commit: r1481804 [3/4] - in /lucene/dev/trunk:
dev-tools/maven/lucene/replicator/ lucene/
lucene/core/src/java/org/apache/lucene/index/ lucene/licenses/
lucene/replicator/ lucene/replicator/lib/ lucene/replicator/src/
lucene/replicator/src/java/ lu...
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java Mon May 13 11:57:22 2013
@@ -0,0 +1,416 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * A client which monitors and obtains new revisions from a {@link Replicator}.
+ * It can be used to either periodically check for updates by invoking
+ * {@link #startUpdateThread}, or manually by calling {@link #updateNow()}.
+ * <p>
+ * Whenever a new revision is available, the {@link #requiredFiles(Map)} are
+ * copied to the {@link Directory} specified by the {@link SourceDirectoryFactory} and
+ * a handler is notified.
+ *
+ * @lucene.experimental
+ */
+public class ReplicationClient implements Closeable {
+
+ private class ReplicationThread extends Thread {
+
+ private final long interval;
+
+ // client uses this to stop us
+ final CountDownLatch stop = new CountDownLatch(1);
+
+ public ReplicationThread(long interval) {
+ this.interval = interval;
+ }
+
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public void run() {
+ while (true) {
+ long time = System.currentTimeMillis();
+ updateLock.lock();
+ try {
+ doUpdate();
+ } catch (Throwable t) {
+ handleUpdateException(t);
+ } finally {
+ updateLock.unlock();
+ }
+ time = System.currentTimeMillis() - time;
+
+ // adjust timeout to compensate the time spent doing the replication.
+ final long timeout = interval - time;
+ if (timeout > 0) {
+ try {
+ // this will return immediately if we were ordered to stop (count=0)
+ // or the timeout has elapsed. if it returns true, it means count=0,
+ // so terminate.
+ if (stop.await(timeout, TimeUnit.MILLISECONDS)) {
+ return;
+ }
+ } catch (InterruptedException e) {
+ // if we were interrupted, somebody wants to terminate us, so just
+ // throw the exception further.
+ Thread.currentThread().interrupt();
+ throw new ThreadInterruptedException(e);
+ }
+ }
+ }
+ }
+
+ }
+
+ /** Handler for revisions obtained by the client. */
+ public static interface ReplicationHandler {
+
+ /** Returns the current revision files held by the handler. */
+ public Map<String,List<RevisionFile>> currentRevisionFiles();
+
+ /** Returns the current revision version held by the handler. */
+ public String currentVersion();
+
+ /**
+ * Called when a new revision was obtained and is available (i.e. all needed
+ * files were successfully copied).
+ *
+ * @param version
+ * the version of the {@link Revision} that was copied
+ * @param revisionFiles
+ * the files contained by this {@link Revision}
+ * @param copiedFiles
+ * the files that were actually copied
+ * @param sourceDirectory
+ * a mapping from a source of files to the {@link Directory} they
+ * were copied into
+ */
+ public void revisionReady(String version, Map<String,List<RevisionFile>> revisionFiles,
+ Map<String,List<String>> copiedFiles, Map<String, Directory> sourceDirectory) throws IOException;
+ }
+
+ /**
+ * Resolves a session and source into a {@link Directory} to use for copying
+ * the session files to.
+ */
+ public static interface SourceDirectoryFactory {
+
+ /**
+ * Called to denote that the replication actions for this session were finished and the directory is no longer needed.
+ */
+ public void cleanupSession(String sessionID) throws IOException;
+
+ /**
+ * Returns the {@link Directory} to use for the given session and source.
+ * Implementations may e.g. return different directories for different
+ * sessions, or the same directory for all sessions. In the latter case, it is
+ * advised to clean the directory before it is used for a new session.
+ *
+ * @see #cleanupSession(String)
+ */
+ public Directory getDirectory(String sessionID, String source) throws IOException;
+
+ }
+
+ /** The component name to use with {@link InfoStream#isEnabled(String)}. */
+ public static final String INFO_STREAM_COMPONENT = "ReplicationThread";
+
+ private final Replicator replicator;
+ private final ReplicationHandler handler;
+ private final SourceDirectoryFactory factory;
+ private final byte[] copyBuffer = new byte[16384];
+ private final Lock updateLock = new ReentrantLock();
+
+ private volatile ReplicationThread updateThread;
+ private volatile boolean closed = false;
+ private volatile InfoStream infoStream = InfoStream.getDefault();
+
+ /**
+ * Constructor.
+ *
+ * @param replicator the {@link Replicator} used for checking for updates
+ * @param handler notified when new revisions are ready
+ * @param factory returns a {@link Directory} for a given source and session
+ */
+ public ReplicationClient(Replicator replicator, ReplicationHandler handler, SourceDirectoryFactory factory) {
+ this.replicator = replicator;
+ this.handler = handler;
+ this.factory = factory;
+ }
+
+ private void copyBytes(IndexOutput out, InputStream in) throws IOException {
+ int numBytes;
+ while ((numBytes = in.read(copyBuffer)) > 0) {
+ out.writeBytes(copyBuffer, 0, numBytes);
+ }
+ }
+
+ private void doUpdate() throws IOException {
+ SessionToken session = null;
+ final Map<String,Directory> sourceDirectory = new HashMap<String,Directory>();
+ final Map<String,List<String>> copiedFiles = new HashMap<String,List<String>>();
+ boolean notify = false;
+ try {
+ final String version = handler.currentVersion();
+ session = replicator.checkForUpdate(version);
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): handlerVersion=" + version + " session=" + session);
+ }
+ if (session == null) {
+ // already up to date
+ return;
+ }
+ Map<String,List<RevisionFile>> requiredFiles = requiredFiles(session.sourceFiles);
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): requiredFiles=" + requiredFiles);
+ }
+ for (Entry<String,List<RevisionFile>> e : requiredFiles.entrySet()) {
+ String source = e.getKey();
+ Directory dir = factory.getDirectory(session.id, source);
+ sourceDirectory.put(source, dir);
+ List<String> cpFiles = new ArrayList<String>();
+ copiedFiles.put(source, cpFiles);
+ for (RevisionFile file : e.getValue()) {
+ if (closed) {
+ // if we're closed, abort file copy
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): detected client was closed); abort file copy");
+ }
+ return;
+ }
+ InputStream in = null;
+ IndexOutput out = null;
+ try {
+ in = replicator.obtainFile(session.id, source, file.fileName);
+ out = dir.createOutput(file.fileName, IOContext.DEFAULT);
+ copyBytes(out, in);
+ cpFiles.add(file.fileName);
+ // TODO add some validation, on size / checksum
+ } finally {
+ IOUtils.close(in, out);
+ }
+ }
+ }
+ // only notify if all required files were successfully obtained.
+ notify = true;
+ } finally {
+ if (session != null) {
+ try {
+ replicator.release(session.id);
+ } finally {
+ if (!notify) { // cleanup after ourselves
+ IOUtils.close(sourceDirectory.values());
+ factory.cleanupSession(session.id);
+ }
+ }
+ }
+ }
+
+ // notify outside the try-finally above, so the session is released sooner.
+ // the handler may take time to finish acting on the copied files, but the
+ // session itself is no longer needed.
+ try {
+ if (notify && !closed ) { // no use to notify if we are closed already
+ handler.revisionReady(session.version, session.sourceFiles, copiedFiles, sourceDirectory);
+ }
+ } finally {
+ IOUtils.close(sourceDirectory.values());
+ if (session != null) {
+ factory.cleanupSession(session.id);
+ }
+ }
+ }
+
+ /** Throws {@link AlreadyClosedException} if the client has already been closed. */
+ protected final void ensureOpen() {
+ if (closed) {
+ throw new AlreadyClosedException("this update client has already been closed");
+ }
+ }
+
+ /**
+ * Called when an exception is hit by the replication thread. The default
+ * implementation prints the full stacktrace to the {@link InfoStream} set in
+ * {@link #setInfoStream(InfoStream)}, or the {@link InfoStream#getDefault()
+ * default} one. You can override to log the exception elsewhere.
+ * <p>
+ * <b>NOTE:</b> if you override this method to throw the exception further,
+ * the replication thread will be terminated. The only way to restart it is to
+ * call {@link #stopUpdateThread()} followed by
+ * {@link #startUpdateThread(long, String)}.
+ */
+ protected void handleUpdateException(Throwable t) {
+ final StringWriter sw = new StringWriter();
+ t.printStackTrace(new PrintWriter(sw));
+ if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+ infoStream.message(INFO_STREAM_COMPONENT, "an error occurred during revision update: " + sw.toString());
+ }
+ }
+
+ /**
+ * Returns the files required for replication. By default, this method returns
+ * all files that exist in the new revision, but not in the handler.
+ */
+ protected Map<String,List<RevisionFile>> requiredFiles(Map<String,List<RevisionFile>> newRevisionFiles) {
+ Map<String,List<RevisionFile>> handlerRevisionFiles = handler.currentRevisionFiles();
+ if (handlerRevisionFiles == null) {
+ return newRevisionFiles;
+ }
+
+ Map<String,List<RevisionFile>> requiredFiles = new HashMap<String,List<RevisionFile>>();
+ for (Entry<String,List<RevisionFile>> e : handlerRevisionFiles.entrySet()) {
+ // put the handler files in a Set, for faster contains() checks later
+ Set<String> handlerFiles = new HashSet<String>();
+ for (RevisionFile file : e.getValue()) {
+ handlerFiles.add(file.fileName);
+ }
+
+ // make sure to preserve revisionFiles order
+ ArrayList<RevisionFile> res = new ArrayList<RevisionFile>();
+ String source = e.getKey();
+ assert newRevisionFiles.containsKey(source) : "source not found in newRevisionFiles: " + newRevisionFiles;
+ for (RevisionFile file : newRevisionFiles.get(source)) {
+ if (!handlerFiles.contains(file.fileName)) {
+ res.add(file);
+ }
+ }
+ requiredFiles.put(source, res);
+ }
+
+ return requiredFiles;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ stopUpdateThread();
+ closed = true;
+ }
+ }
+
+ /**
+ * Start the update thread with the specified interval in milliseconds. For
+ * debugging purposes, you can optionally set the name to set on
+ * {@link Thread#setName(String)}. If you pass {@code null}, a default name
+ * will be set.
+ *
+ * @throws IllegalStateException if the thread has already been started
+ */
+ public synchronized void startUpdateThread(long intervalMillis, String threadName) {
+ ensureOpen();
+ if (updateThread != null && updateThread.isAlive()) {
+ throw new IllegalStateException(
+ "cannot start an update thread when one is running, must first call 'stopUpdateThread()'");
+ }
+ threadName = threadName == null ? INFO_STREAM_COMPONENT : "ReplicationThread-" + threadName;
+ updateThread = new ReplicationThread(intervalMillis);
+ updateThread.setName(threadName);
+ updateThread.start();
+ // we rely on isAlive to return true in isUpdateThreadAlive, assert to be on the safe side
+ assert updateThread.isAlive() : "updateThread started but not alive?";
+ }
+
+ /**
+ * Stop the update thread. If the update thread is not running, silently does
+ * nothing. This method returns after the update thread has stopped.
+ */
+ public synchronized void stopUpdateThread() {
+ if (updateThread != null) {
+ // this will trigger the thread to terminate if it awaits the stop latch.
+ // otherwise, if it's in the middle of replication, we wait for it to
+ // stop.
+ updateThread.stop.countDown();
+ try {
+ updateThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ThreadInterruptedException(e);
+ }
+ updateThread = null;
+ }
+ }
+
+ /**
+ * Returns true if the update thread is alive. The update thread is alive if
+ * it has been {@link #startUpdateThread(long, String) started} and not
+ * {@link #stopUpdateThread() stopped}, as well as didn't hit an error which
+ * caused it to terminate (i.e. {@link #handleUpdateException(Throwable)}
+ * threw the exception further).
+ */
+ public synchronized boolean isUpdateThreadAlive() {
+ return updateThread != null && updateThread.isAlive();
+ }
+
+ @Override
+ public String toString() {
+ String res = "ReplicationClient";
+ if (updateThread != null) {
+ res += " (" + updateThread.getName() + ")";
+ }
+ return res;
+ }
+
+ /**
+ * Executes the update operation immediately, regardless of whether an update
+ * thread is running or not.
+ */
+ public void updateNow() throws IOException {
+ ensureOpen();
+ updateLock.lock();
+ try {
+ doUpdate();
+ } finally {
+ updateLock.unlock();
+ }
+ }
+
+ /** Sets the {@link InfoStream} to use for logging messages. */
+ public void setInfoStream(InfoStream infoStream) {
+ if (infoStream == null) {
+ infoStream = InfoStream.NO_OUTPUT;
+ }
+ this.infoStream = infoStream;
+ }
+
+}
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java Mon May 13 11:57:22 2013
@@ -0,0 +1,80 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An interface for replicating files. Allows a producer to
+ * {@link #publish(Revision) publish} {@link Revision}s and consumers to
+ * {@link #checkForUpdate(String) check for updates}. When a client needs to be
+ * updated, it is given a {@link SessionToken} through which it can
+ * {@link #obtainFile(String, String, String) obtain} the files of that
+ * revision. After the client has finished obtaining all the files, it should
+ * {@link #release(String) release} the given session, so that the files can be
+ * reclaimed if they are not needed anymore.
+ * <p>
+ * A client is always updated to the newest revision available. That is, if a
+ * client is on revision <em>r1</em> and revisions <em>r2</em> and <em>r3</em>
+ * were published, then when the client next checks for an update, it will
+ * receive <em>r3</em>.
+ *
+ * @lucene.experimental
+ */
+public interface Replicator extends Closeable {
+
+ /**
+ * Publish a new {@link Revision} for consumption by clients. It is the
+ * caller's responsibility to verify that the revision files exist and can be
+ * read by clients. When the revision is no longer needed, it will be
+ * {@link Revision#release() released} by the replicator.
+ */
+ public void publish(Revision revision) throws IOException;
+
+ /**
+ * Checks whether the given version is up-to-date. If a newer revision is
+ * available, returns a {@link SessionToken} which can be used for fetching
+ * the revision files; otherwise returns {@code null}.
+ * <p>
+ * <b>NOTE:</b> when the returned session token is no longer needed, you
+ * should call {@link #release(String)} so that the session resources can be
+ * reclaimed, including the revision files.
+ */
+ public SessionToken checkForUpdate(String currVersion) throws IOException;
+
+ /**
+ * Notify that the specified {@link SessionToken} is no longer needed by the
+ * caller.
+ */
+ public void release(String sessionID) throws IOException;
+
+ /**
+ * Returns an {@link InputStream} for the requested file and source in the
+ * context of the given {@link SessionToken#id session}.
+ * <p>
+ * <b>NOTE:</b> it is the caller's responsibility to close the returned
+ * stream.
+ *
+ * @throws SessionExpiredException if the specified session has already
+ * expired
+ */
+ public InputStream obtainFile(String sessionID, String source, String fileName) throws IOException;
+
+}
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java Mon May 13 11:57:22 2013
@@ -0,0 +1,75 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.store.IndexInput;
+
+/**
+ * A revision comprises lists of files that come from different sources and need
+ * to be replicated together to e.g. guarantee that all resources are in sync.
+ * In most cases an application will replicate a single index, and so the
+ * revision will contain files from a single source. However, some applications
+ * may require to treat a collection of indexes as a single entity so that the
+ * files from all sources are replicated together, to guarantee consistency
+ * between them. For example, an application which indexes facets will need to
+ * replicate both the search and taxonomy indexes together, to guarantee that
+ * they match at the client side.
+ *
+ * @lucene.experimental
+ */
+public interface Revision extends Comparable<Revision> {
+
+ /**
+ * Compares the revision to the given version string. Behaves like
+ * {@link Comparable#compareTo(Object)}.
+ */
+ public int compareTo(String version);
+
+ /**
+ * Returns a string representation of the version of this revision. The
+ * version is used by {@link #compareTo(String)} as well as to
+ * serialize/deserialize revision information. Therefore it must be self
+ * descriptive as well as be able to identify one revision from another.
+ */
+ public String getVersion();
+
+ /**
+ * Returns the files that comprise this revision, as a mapping from a source
+ * to a list of files.
+ */
+ public Map<String,List<RevisionFile>> getSourceFiles();
+
+ /**
+ * Returns an {@link InputStream} for the given fileName and source. It is the
+ * caller's responsibility to close the {@link InputStream} when it has been
+ * consumed.
+ */
+ public InputStream open(String source, String fileName) throws IOException;
+
+ /**
+ * Called when this revision can be safely released, i.e. where there are no
+ * more references to it.
+ */
+ public void release() throws IOException;
+
+}
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java Mon May 13 11:57:22 2013
@@ -0,0 +1,59 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Describes a file in a {@link Revision}. A file has a source, which allows a
+ * single revision to contain files from multiple sources (e.g. multiple
+ * indexes).
+ *
+ * @lucene.experimental
+ */
+public class RevisionFile {
+
+ /** The name of the file; never null or empty. */
+ public final String fileName;
+
+ /** The size of the file denoted by {@link #fileName}; {@code -1} until assigned. */
+ public long size = -1;
+
+ /** Constructor with the given file name; rejects a null or empty name with {@link IllegalArgumentException}. */
+ public RevisionFile(String fileName) {
+ if (fileName == null || fileName.isEmpty()) {
+ throw new IllegalArgumentException("fileName cannot be null or empty");
+ }
+ this.fileName = fileName;
+ }
+
+ @Override
+ public boolean equals(Object obj) { // null and foreign types compare unequal instead of throwing
+ RevisionFile other = obj instanceof RevisionFile ? (RevisionFile) obj : null;
+ return other != null && fileName.equals(other.fileName) && size == other.size;
+ }
+
+ @Override
+ public int hashCode() { // combines the name hash with both halves of the 64-bit size
+ return fileName.hashCode() ^ (int) (size ^ (size >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "fileName=" + fileName + " size=" + size;
+ }
+
+}
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java Mon May 13 11:57:22 2013
@@ -0,0 +1,54 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/**
+ * Exception indicating that a revision update session has expired due to lack
+ * of activity.
+ *
+ * @see LocalReplicator#DEFAULT_SESSION_EXPIRATION_THRESHOLD
+ * @see LocalReplicator#setExpirationThreshold(long)
+ *
+ * @lucene.experimental
+ */
+public class SessionExpiredException extends IOException {
+
+ /**
+ * @see IOException#IOException(String, Throwable)
+ */
+ public SessionExpiredException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @see IOException#IOException(String)
+ */
+ public SessionExpiredException(String message) {
+ super(message);
+ }
+
+ /**
+ * @see IOException#IOException(Throwable)
+ */
+ public SessionExpiredException(Throwable cause) {
+ super(cause);
+ }
+
+}
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java Mon May 13 11:57:22 2013
@@ -0,0 +1,108 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Token for a replication session, for guaranteeing that source replicated
+ * files will be kept safe until the replication completes.
+ *
+ * @see Replicator#checkForUpdate(String)
+ * @see Replicator#release(String)
+ * @see LocalReplicator#DEFAULT_SESSION_EXPIRATION_THRESHOLD
+ *
+ * @lucene.experimental
+ */
+public final class SessionToken {
+
+ /**
+ * ID of this session.
+ * Should be passed when releasing the session, thereby notifying the
+ * {@link Replicator Replicator} that this session is no longer in use.
+ * @see Replicator#release(String)
+ */
+ public final String id;
+
+ /**
+ * @see Revision#getVersion()
+ */
+ public final String version;
+
+ /**
+ * @see Revision#getSourceFiles()
+ */
+ public final Map<String,List<RevisionFile>> sourceFiles;
+
+ /** Constructor which deserializes from the given {@link DataInput}. */
+ public SessionToken(DataInput in) throws IOException {
+ this.id = in.readUTF();
+ this.version = in.readUTF();
+ this.sourceFiles = new HashMap<String,List<RevisionFile>>();
+ int numSources = in.readInt();
+ while (numSources > 0) {
+ String source = in.readUTF();
+ int numFiles = in.readInt();
+ List<RevisionFile> files = new ArrayList<RevisionFile>(numFiles);
+ for (int i = 0; i < numFiles; i++) {
+ String fileName = in.readUTF();
+ RevisionFile file = new RevisionFile(fileName);
+ file.size = in.readLong();
+ files.add(file);
+ }
+ this.sourceFiles.put(source, files);
+ --numSources;
+ }
+ }
+
+ /** Constructor with the given id and revision. */
+ public SessionToken(String id, Revision revision) {
+ this.id = id;
+ this.version = revision.getVersion();
+ this.sourceFiles = revision.getSourceFiles();
+ }
+
+ /** Serialize the token data for communication between server and client. */
+ public void serialize(DataOutput out) throws IOException {
+ out.writeUTF(id);
+ out.writeUTF(version);
+ out.writeInt(sourceFiles.size());
+ for (Entry<String,List<RevisionFile>> e : sourceFiles.entrySet()) {
+ out.writeUTF(e.getKey());
+ List<RevisionFile> files = e.getValue();
+ out.writeInt(files.size());
+ for (RevisionFile file : files) {
+ out.writeUTF(file.fileName);
+ out.writeLong(file.size);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "id=" + id + " version=" + version + " files=" + sourceFiles;
+ }
+
+}
\ No newline at end of file
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java Mon May 13 11:57:22 2013
@@ -0,0 +1,297 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.concurrent.Callable;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.util.EntityUtils;
+import org.apache.lucene.store.AlreadyClosedException;
+
+/**
+ * Base class for Http clients.
+ *
+ * @lucene.experimental
+ * */
+public abstract class HttpClientBase implements Closeable {
+
+ /**
+ * Default connection timeout for this client, in milliseconds.
+ *
+ * @see #setConnectionTimeout(int)
+ */
+ public static final int DEFAULT_CONNECTION_TIMEOUT = 1000;
+
+ /**
+ * Default socket timeout for this client, in milliseconds.
+ *
+ * @see #setSoTimeout(int)
+ */
+ public static final int DEFAULT_SO_TIMEOUT = 60000;
+
+ // TODO compression?
+
+ /** The URL string to execute requests against. */
+ protected final String url;
+
+ private volatile boolean closed = false;
+
+ private final HttpClient httpc;
+
+ /**
+ * Creates a client for <code>http://host:port/path</code>, using the default
+ * connection and socket timeouts.
+ *
+ * @param host host of the server to send requests to
+ * @param port port of the server to send requests to
+ * @param path path following the port; a null or empty path is treated as '/'
+ * @param conMgr connection manager to use for this http client.
+ * <b>NOTE:</b>The provided {@link ClientConnectionManager} will not be
+ * {@link ClientConnectionManager#shutdown()} by this class.
+ */
+ protected HttpClientBase(String host, int port, String path, ClientConnectionManager conMgr) {
+ url = normalizedURL(host, port, path);
+ httpc = new DefaultHttpClient(conMgr);
+ setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
+ setSoTimeout(DEFAULT_SO_TIMEOUT);
+ }
+
+ /**
+ * Set the connection timeout for this client, in milliseconds. This setting
+ * is used to modify {@link HttpConnectionParams#setConnectionTimeout}.
+ *
+ * @param timeout timeout to set, in milliseconds
+ */
+ public void setConnectionTimeout(int timeout) {
+ HttpConnectionParams.setConnectionTimeout(httpc.getParams(), timeout);
+ }
+
+ /**
+ * Set the socket timeout for this client, in milliseconds. This setting
+ * is used to modify {@link HttpConnectionParams#setSoTimeout}.
+ *
+ * @param timeout timeout to set, in milliseconds
+ */
+ public void setSoTimeout(int timeout) {
+ HttpConnectionParams.setSoTimeout(httpc.getParams(), timeout);
+ }
+
+ /**
+ * Throws {@link AlreadyClosedException} if this client is already closed, i.e.
+ * if the <code>closed</code> flag has been set.
+ */
+ protected final void ensureOpen() throws AlreadyClosedException {
+ if (closed) {
+ throw new AlreadyClosedException("HttpClient already closed");
+ }
+ }
+
+ /**
+  * Create a URL out of the given parameters. An empty/null path is translated
+  * to '/', and a path that does not start with '/' gets one prepended so that
+  * it cannot run into the port number.
+  */
+ private static String normalizedURL(String host, int port, String path) {
+   if (path == null || path.length() == 0) {
+     path = "/";
+   } else if (path.charAt(0) != '/') {
+     // without the separator, port 80 and path "foo" would yield "...:80foo"
+     path = "/" + path;
+   }
+   return "http://" + host + ":" + port + path;
+ }
+
+ /**
+ * <b>Internal:</b> verifies the response status after invocation, and in case
+ * of an error (any status other than {@link HttpStatus#SC_OK}) attempts to read
+ * and throw the exception sent by the server.
+ */
+ protected void verifyStatus(HttpResponse response) throws IOException {
+ StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() != HttpStatus.SC_OK) {
+ throwKnownError(response, statusLine);
+ }
+ }
+
+ /**
+  * <b>Internal:</b> reads the exception object that the server wrote into the
+  * response body and throws it. This method never returns normally.
+  *
+  * @throws IOException if the object sent by the server is an {@link IOException}
+  * @throws RuntimeException if the object sent by the server is a
+  *         {@link RuntimeException}; also used to wrap any other throwable and
+  *         any failure to open or read the response body
+  */
+ protected void throwKnownError(HttpResponse response, StatusLine statusLine) throws IOException {
+   ObjectInputStream in = null;
+   try {
+     in = new ObjectInputStream(response.getEntity().getContent());
+   } catch (Exception e) {
+     // the response stream is not an exception - could be an error in servlet.init().
+     // Keep the cause, so the reason the body could not be opened is not lost.
+     throw new RuntimeException("Uknown error: " + statusLine, e);
+   }
+
+   Throwable t;
+   try {
+     // NOTE(review): this deserializes whatever the server sent; it looks like
+     // the server is assumed to be trusted - confirm.
+     t = (Throwable) in.readObject();
+   } catch (Exception e) {
+     //not likely
+     throw new RuntimeException("Failed to read exception object: " + statusLine, e);
+   } finally {
+     in.close();
+   }
+   if (t instanceof IOException) {
+     throw (IOException) t;
+   }
+   if (t instanceof RuntimeException) {
+     throw (RuntimeException) t;
+   }
+   throw new RuntimeException("unknown exception "+statusLine,t);
+ }
+
+ /**
+ * <b>internal:</b> execute a POST request carrying the given entity and return its response.
+ * The <code>params</code> argument is treated as: name1,value1,name2,value2,...
+ *
+ * @throws AlreadyClosedException if this client was already closed
+ * @throws IOException if executing the request fails, or if the status check
+ * done by <code>verifyStatus</code> throws one
+ */
+ protected HttpResponse executePOST(String request, HttpEntity entity, String... params) throws IOException {
+ ensureOpen();
+ HttpPost m = new HttpPost(queryString(request, params));
+ m.setEntity(entity);
+ HttpResponse response = httpc.execute(m);
+ verifyStatus(response);
+ return response;
+ }
+
+ /**
+ * <b>internal:</b> execute a GET request and return its response.
+ * The <code>params</code> argument is treated as: name1,value1,name2,value2,...
+ *
+ * @throws AlreadyClosedException if this client was already closed
+ * @throws IOException if executing the request fails, or if the status check
+ * done by <code>verifyStatus</code> throws one
+ */
+ protected HttpResponse executeGET(String request, String... params) throws IOException {
+ ensureOpen();
+ HttpGet m = new HttpGet(queryString(request, params));
+ HttpResponse response = httpc.execute(m);
+ verifyStatus(response);
+ return response;
+ }
+
+ private String queryString(String request, String... params) throws UnsupportedEncodingException {
+ StringBuilder query = new StringBuilder(url).append('/').append(request).append('?');
+ if (params != null) {
+ for (int i = 0; i < params.length; i += 2) {
+ query.append(params[i]).append('=').append(URLEncoder.encode(params[i+1], "UTF8")).append('&');
+ }
+ }
+ return query.substring(0, query.length() - 1);
+ }
+
+ /**
+ * Internal utility: input stream of the provided response. The stream is
+ * returned as is; the response is not consumed when the stream is exhausted.
+ */
+ public InputStream responseInputStream(HttpResponse response) throws IOException {
+ return responseInputStream(response, false);
+ }
+
+ // TODO: can we simplify this Consuming !?!?!?
+ /**
+  * Internal utility: input stream of the provided response, which optionally
+  * consumes the response's resources when the input stream is exhausted.
+  * <p>
+  * When <code>consume</code> is false the entity's content stream is returned
+  * as is. Otherwise the returned stream forwards every read (including bulk
+  * reads) and close to the content stream, and consumes the entity once: the
+  * first time a read returns -1, or when the stream is closed.
+  */
+ public InputStream responseInputStream(HttpResponse response, boolean consume) throws IOException {
+   final HttpEntity entity = response.getEntity();
+   final InputStream in = entity.getContent();
+   if (!consume) {
+     return in;
+   }
+   return new InputStream() {
+     private boolean consumed = false;
+     @Override
+     public int read() throws IOException {
+       final int res = in.read();
+       consume(res);
+       return res;
+     }
+     @Override
+     public void close() throws IOException {
+       try {
+         // InputStream.close() does nothing, so close the wrapped stream itself
+         in.close();
+       } finally {
+         consume(-1);
+       }
+     }
+     @Override
+     public int read(byte[] b) throws IOException {
+       // delegate the bulk read; InputStream's default loops over read()
+       final int res = in.read(b);
+       consume(res);
+       return res;
+     }
+     @Override
+     public int read(byte[] b, int off, int len) throws IOException {
+       // delegate the bulk read; InputStream's default loops over read()
+       final int res = in.read(b, off, len);
+       consume(res);
+       return res;
+     }
+     /** Consumes the entity, at most once, when the given read result is -1. */
+     private void consume(int minusOne) {
+       if (!consumed && minusOne==-1) {
+         try {
+           EntityUtils.consume(entity);
+         } catch (Exception e) {
+           // ignored on purpose
+         }
+         consumed = true;
+       }
+     }
+   };
+ }
+
+ /**
+ * Returns true iff this instance was {@link #close() closed}, otherwise
+ * returns false. Note that if you override {@link #close()}, you must call
+ * {@code super.close()}, in order for this instance to be properly closed.
+ *
+ * @return the current value of the <code>closed</code> flag
+ */
+ protected final boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * Same as {@link #doAction(HttpResponse, boolean, Callable)} but always do consume at the end.
+ *
+ * @return the value returned by <code>call</code>
+ */
+ protected <T> T doAction(HttpResponse response, Callable<T> call) throws IOException {
+ return doAction(response, true, call);
+ }
+
+ /**
+ * Do a specific action and validate after the action that the status is still OK,
+ * and if not, attempt to extract the actual server side exception. Optionally
+ * release the response at exit, depending on <code>consume</code> parameter.
+ *
+ * @return the value returned by <code>call</code>
+ * @throws IOException if <code>call</code> throws one, or wrapping any other
+ * {@link Exception} it throws; an exception thrown by the status
+ * verification replaces the one raised by <code>call</code>
+ */
+ protected <T> T doAction(HttpResponse response, boolean consume, Callable<T> call) throws IOException {
+ IOException error = null;
+ try {
+ return call.call();
+ } catch (IOException e) {
+ error = e;
+ } catch (Exception e) {
+ // non-IO failures are reported to the caller as an IOException
+ error = new IOException(e);
+ } finally {
+ // runs on success and on failure alike
+ try {
+ verifyStatus(response);
+ } finally {
+ if (consume) {
+ try {
+ EntityUtils.consume(response.getEntity());
+ } catch (Exception e) {
+ // ignoring on purpose
+ }
+ }
+ }
+ }
+ // reached only when call() failed and the status verification did not throw
+ throw error;
+ }
+
+ /**
+ * Marks this client as closed, after which {@link #ensureOpen()} throws.
+ * Nothing else is released here.
+ */
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ }
+
+}
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java Mon May 13 11:57:22 2013
@@ -0,0 +1,105 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.Callable;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.lucene.replicator.Replicator;
+import org.apache.lucene.replicator.Revision;
+import org.apache.lucene.replicator.SessionToken;
+import org.apache.lucene.replicator.http.ReplicationService.ReplicationAction;
+
+/**
+ * An HTTP implementation of {@link Replicator}. Assumes the API supported by
+ * {@link ReplicationService}.
+ *
+ * @lucene.experimental
+ */
+public class HttpReplicator extends HttpClientBase implements Replicator {
+
+ /**
+ * Construct with specified connection manager. All arguments are passed
+ * unchanged to the {@link HttpClientBase} constructor.
+ */
+ public HttpReplicator(String host, int port, String path, ClientConnectionManager conMgr) {
+ super(host, port, path, conMgr);
+ }
+
+ @Override
+ public SessionToken checkForUpdate(String currVersion) throws IOException {
+   // the version parameter is sent only when the caller supplied a version
+   final String[] params = currVersion == null
+       ? null
+       : new String[] { ReplicationService.REPLICATE_VERSION_PARAM, currVersion };
+   final HttpResponse response = executeGET(ReplicationAction.UPDATE.name(), params);
+   final Callable<SessionToken> readToken = new Callable<SessionToken>() {
+     @Override
+     public SessionToken call() throws Exception {
+       final DataInputStream dis = new DataInputStream(responseInputStream(response));
+       try {
+         // leading marker byte: 0 means that no token follows
+         final boolean hasToken = dis.readByte() != 0;
+         return hasToken ? new SessionToken(dis) : null;
+       } finally {
+         dis.close();
+       }
+     }
+   };
+   return doAction(response, readToken);
+ }
+
+ /**
+ * Sends an OBTAIN request with the session ID, source and file name as
+ * parameters, and returns the response body as a stream.
+ */
+ @Override
+ public InputStream obtainFile(String sessionID, String source, String fileName) throws IOException {
+ String[] params = new String[] {
+ ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionID,
+ ReplicationService.REPLICATE_SOURCE_PARAM, source,
+ ReplicationService.REPLICATE_FILENAME_PARAM, fileName,
+ };
+ final HttpResponse response = executeGET(ReplicationAction.OBTAIN.name(), params);
+ // consume=false: the response must stay open, the caller reads the stream later
+ return doAction(response, false, new Callable<InputStream>() {
+ @Override
+ public InputStream call() throws Exception {
+ // consume=true: the returned stream consumes the response once exhausted or closed
+ return responseInputStream(response,true);
+ }
+ });
+ }
+
+ /**
+ * Not supported by this implementation.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void publish(Revision revision) throws IOException {
+ throw new UnsupportedOperationException(
+ "this replicator implementation does not support remote publishing of revisions");
+ }
+
+ /**
+ * Sends a RELEASE request for the given session ID. The response carries no
+ * value to read; it is only verified and consumed.
+ */
+ @Override
+ public void release(String sessionID) throws IOException {
+ String[] params = new String[] {
+ ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionID
+ };
+ final HttpResponse response = executeGET(ReplicationAction.RELEASE.name(), params);
+ doAction(response, new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ return null; // do not remove this call: as it is still validating for us!
+ }
+ });
+ }
+
+}
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java Mon May 13 11:57:22 2013
@@ -0,0 +1,198 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Locale;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.http.HttpStatus;
+import org.apache.lucene.replicator.Replicator;
+import org.apache.lucene.replicator.SessionToken;
+
+/**
+ * A server-side service for handling replication requests. The service assumes
+ * requests are sent in the format
+ * <code>/&lt;context&gt;/&lt;shard&gt;/&lt;action&gt;</code> where
+ * <ul>
+ * <li>{@code context} is the servlet context, e.g. {@link #REPLICATION_CONTEXT}
+ * <li>{@code shard} is the ID of the shard, e.g. "s1"
+ * <li>{@code action} is one of {@link ReplicationAction} values
+ * </ul>
+ * For example, to check whether there are revision updates for shard "s1" you
+ * should send the request: <code>http://host:port/replicate/s1/update</code>.
+ * <p>
+ * This service is written like a servlet, and
+ * {@link #perform(HttpServletRequest, HttpServletResponse)} takes servlet
+ * request and response accordingly, so it is quite easy to embed in your
+ * application's servlet.
+ *
+ * @lucene.experimental
+ */
+public class ReplicationService {
+
+ /**
+ * Actions supported by the {@link ReplicationService}. The action named in a
+ * request path is matched against these values ignoring case: OBTAIN streams
+ * a file of a session, RELEASE releases a session and UPDATE checks for a
+ * newer revision.
+ */
+ public enum ReplicationAction {
+ OBTAIN, RELEASE, UPDATE
+ }
+
+ /** The context path for the servlet. */
+ public static final String REPLICATION_CONTEXT = "/replicate";
+
+ /** Request parameter name for providing the revision version. */
+ public final static String REPLICATE_VERSION_PARAM = "version";
+
+ /** Request parameter name for providing a session ID. */
+ public final static String REPLICATE_SESSION_ID_PARAM = "sessionid";
+
+ /** Request parameter name for providing the file's source. */
+ public final static String REPLICATE_SOURCE_PARAM = "source";
+
+ /** Request parameter name for providing the file's name. */
+ public final static String REPLICATE_FILENAME_PARAM = "filename";
+
+ private static final int SHARD_IDX = 0, ACTION_IDX = 1;
+
+ private final Map<String,Replicator> replicators;
+
+ /**
+ * Creates a service that serves the given replicators.
+ *
+ * @param replicators the replicators to serve, keyed by the shard ID that
+ * appears in the request path; the map is stored as is, it is not copied
+ */
+ public ReplicationService(Map<String,Replicator> replicators) {
+ super();
+ this.replicators = replicators;
+ }
+
+ /**
+  * Returns the path elements that were given in the servlet request, excluding
+  * the servlet's action context. The first {@link #REPLICATION_CONTEXT} length
+  * characters of the path are skipped, and a path shorter than that yields no
+  * elements.
+  */
+ private String[] getPathElements(HttpServletRequest req) {
+   String path = req.getServletPath();
+   String pathInfo = req.getPathInfo();
+   if (pathInfo != null) {
+     path += pathInfo;
+   }
+   int actionLen = REPLICATION_CONTEXT.length();
+   if (path.length() < actionLen) {
+     // nothing can follow the context; substring() below would throw here
+     return new String[0];
+   }
+   int startIdx = actionLen;
+   if (path.length() > actionLen && path.charAt(actionLen) == '/') {
+     ++startIdx;
+   }
+
+   // split the string on '/' and remove any empty elements. This is better
+   // than using String.split() since the latter may return empty elements in
+   // the array
+   StringTokenizer stok = new StringTokenizer(path.substring(startIdx), "/");
+   ArrayList<String> elements = new ArrayList<String>();
+   while (stok.hasMoreTokens()) {
+     elements.add(stok.nextToken());
+   }
+   return elements.toArray(new String[0]);
+ }
+
+ /**
+ * Returns the value of the given request parameter.
+ *
+ * @throws ServletException if the request does not carry the parameter
+ */
+ private static String extractRequestParam(HttpServletRequest req, String paramName) throws ServletException {
+ String param = req.getParameter(paramName);
+ if (param == null) {
+ throw new ServletException("Missing mandatory parameter: " + paramName);
+ }
+ return param;
+ }
+
+ private static void copy(InputStream in, OutputStream out) throws IOException {
+ byte[] buf = new byte[16384];
+ int numRead;
+ while ((numRead = in.read(buf)) != -1) {
+ out.write(buf, 0, numRead);
+ }
+ }
+
+ /**
+ * Executes the replication task. The request path must hold exactly a shard
+ * ID followed by an action; the action is matched against
+ * {@link ReplicationAction} ignoring case. A failure while executing the
+ * action is reported with status 500 and the exception serialized into the
+ * response body.
+ *
+ * @throws ServletException if the path is invalid, the action is unsupported
+ * or the shard ID is not recognized
+ * @throws IOException if the failure of an action could not be serialized
+ */
+ public void perform(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ String[] pathElements = getPathElements(req);
+
+ if (pathElements.length != 2) {
+ throw new ServletException("invalid path, must contain shard ID and action, e.g. */s1/update");
+ }
+
+ final ReplicationAction action;
+ try {
+ action = ReplicationAction.valueOf(pathElements[ACTION_IDX].toUpperCase(Locale.ENGLISH));
+ } catch (IllegalArgumentException e) {
+ throw new ServletException("Unsupported action provided: " + pathElements[ACTION_IDX]);
+ }
+
+ final Replicator replicator = replicators.get(pathElements[SHARD_IDX]);
+ if (replicator == null) {
+ throw new ServletException("unrecognized shard ID " + pathElements[SHARD_IDX]);
+ }
+
+ ServletOutputStream resOut = resp.getOutputStream();
+ try {
+ switch (action) {
+ case OBTAIN:
+ final String sessionID = extractRequestParam(req, REPLICATE_SESSION_ID_PARAM);
+ final String fileName = extractRequestParam(req, REPLICATE_FILENAME_PARAM);
+ final String source = extractRequestParam(req, REPLICATE_SOURCE_PARAM);
+ InputStream in = replicator.obtainFile(sessionID, source, fileName);
+ try {
+ copy(in, resOut);
+ } finally {
+ in.close();
+ }
+ break;
+ case RELEASE:
+ replicator.release(extractRequestParam(req, REPLICATE_SESSION_ID_PARAM));
+ break;
+ case UPDATE:
+ // the version parameter is optional, so it is read without the mandatory check
+ String currVersion = req.getParameter(REPLICATE_VERSION_PARAM);
+ SessionToken token = replicator.checkForUpdate(currVersion);
+ if (token == null) {
+ resOut.write(0); // marker for null token
+ } else {
+ resOut.write(1); // marker for non-null token, the serialized token follows
+ token.serialize(new DataOutputStream(resOut));
+ }
+ break;
+ }
+ } catch (Exception e) {
+ resp.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR); // propagate the failure
+ try {
+ /*
+ * Note: it is assumed that "identified exceptions" are thrown before
+ * anything was written to the stream.
+ */
+ ObjectOutputStream oos = new ObjectOutputStream(resOut);
+ oos.writeObject(e);
+ oos.flush();
+ } catch (Exception e2) {
+ throw new IOException("Could not serialize", e2);
+ }
+ } finally {
+ resp.flushBuffer();
+ }
+ }
+
+}
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html Mon May 13 11:57:22 2013
@@ -0,0 +1,28 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<head>
+<title>HTTP replication implementation</title>
+</head>
+
+<body>
+<h1>HTTP replication implementation</h1>
+</body>
+
+</html>
\ No newline at end of file
Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/package.html
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/package.html?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/package.html (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/package.html Mon May 13 11:57:22 2013
@@ -0,0 +1,79 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<head>
+<title>Files replication framework</title>
+</head>
+
+<body>
+<h1>Files replication framework</h1>
+
+ The
+ <a href="Replicator.html">Replicator</a> allows replicating files between a server and client(s). Producers publish
+ <a href="Revision.html">revisions</a> and consumers update to the latest revision available.
+ <a href="ReplicationClient.html">ReplicationClient</a> is a helper utility for performing the update operation. It can
+ be invoked either
+ <a href="ReplicationClient.html#updateNow()">manually</a> or periodically by
+ <a href="ReplicationClient.html#startUpdateThread(long, java.lang.String)">starting an update thread</a>.
+ <a href="http/HttpReplicator.html">HttpReplicator</a> can be used to replicate revisions by consumers that reside on
+ a different node than the producer.
+
+ <p />
+ The replication framework supports replicating any type of files, with built-in support for a single search index as
+ well as an index and taxonomy pair. For a single index, the application should publish an
+ <a href="IndexRevision.html">IndexRevision</a> and set
+ <a href="IndexReplicationHandler.html">IndexReplicationHandler</a> on the client. For an index and taxonomy pair, the
+ application should publish an <a href="IndexAndTaxonomyRevision.html">IndexAndTaxonomyRevision</a> and set
+ <a href="IndexAndTaxonomyReplicationHandler.html">IndexAndTaxonomyReplicationHandler</a> on the client.
+
+ <p />
+ When the replication client detects that there is a newer revision available, it copies the files of the revision and
+ then invokes the handler to complete the operation (e.g. copy the files to the index directory, fsync them, reopen an
+ index reader etc.). By default, only files that do not exist in the handler's
+ <a href="ReplicationClient.ReplicationHandler.html#currentRevisionFiles()">current revision files</a> are copied,
+ however this can be overridden by extending the client.
+
+ <p />
+ An example usage of the Replicator:
+
+<pre class="prettyprint lang-java">
+// ++++++++++++++ SERVER SIDE ++++++++++++++ //
+IndexWriter publishWriter; // the writer used for indexing
+Replicator replicator = new LocalReplicator();
+replicator.publish(new IndexRevision(publishWriter));
+
+// ++++++++++++++ CLIENT SIDE ++++++++++++++ //
+// either LocalReplicator, or HttpReplicator if client and server are on different nodes
+Replicator replicator;
+
+// callback invoked after handler finished handling the revision and e.g. can reopen the reader.
+Callable&lt;Boolean&gt; callback = null; // can also be null if no callback is needed
+ReplicationHandler handler = new IndexReplicationHandler(indexDir, callback);
+SourceDirectoryFactory factory = new PerSessionDirectoryFactory(workDir);
+ReplicationClient client = new ReplicationClient(replicator, handler, factory);
+
+// invoke client manually
+client.updateNow();
+
+// or, periodically
+client.startUpdateThread(100); // check for update every 100 milliseconds
+</pre>
+
+</body>
+</html>
\ No newline at end of file
Added: lucene/dev/trunk/lucene/replicator/src/java/overview.html
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/overview.html?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/overview.html (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/overview.html Mon May 13 11:57:22 2013
@@ -0,0 +1,26 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+ <head>
+ <title>
+ replicator
+ </title>
+ </head>
+ <body>
+ Provides index files replication capabilities.
+ </body>
+</html>
Added: lucene/dev/trunk/lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java Mon May 13 11:57:22 2013
@@ -0,0 +1,444 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.facet.index.FacetFields;
+import org.apache.lucene.facet.params.FacetIndexingParams;
+import org.apache.lucene.facet.params.FacetSearchParams;
+import org.apache.lucene.facet.search.CountFacetRequest;
+import org.apache.lucene.facet.search.DrillDownQuery;
+import org.apache.lucene.facet.search.FacetsCollector;
+import org.apache.lucene.facet.taxonomy.CategoryPath;
+import org.apache.lucene.facet.taxonomy.TaxonomyReader;
+import org.apache.lucene.facet.taxonomy.TaxonomyWriter;
+import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyReader;
+import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.replicator.IndexAndTaxonomyRevision.SnapshotDirectoryTaxonomyWriter;
+import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
+import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.ThreadInterruptedException;
+import org.apache.lucene.util._TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IndexAndTaxonomyReplicationClientTest extends ReplicatorTestCase {
+
+ /**
+  * Callback handed to the replication handler. On each invocation it either
+  * opens readers on the handler's index and taxonomy directories (first time)
+  * or reopens them and verifies the index, the taxonomy and a faceted search
+  * against the newly replicated commit.
+  */
+ private static class IndexAndTaxonomyReadyCallback implements Callable<Boolean>, Closeable {
+
+   private final Directory indexDir, taxoDir;
+   private DirectoryReader indexReader;
+   private DirectoryTaxonomyReader taxoReader;
+   // generation of the commit the current indexReader was opened on; -1 until a reader exists
+   private long lastIndexGeneration = -1;
+
+   public IndexAndTaxonomyReadyCallback(Directory indexDir, Directory taxoDir) throws IOException {
+     this.indexDir = indexDir;
+     this.taxoDir = taxoDir;
+     if (DirectoryReader.indexExists(indexDir)) {
+       openReaders();
+     }
+   }
+
+   /** Opens the index and taxonomy readers and remembers the index commit generation. */
+   private void openReaders() throws IOException {
+     indexReader = DirectoryReader.open(indexDir);
+     lastIndexGeneration = indexReader.getIndexCommit().getGeneration();
+     taxoReader = new DirectoryTaxonomyReader(taxoDir);
+   }
+
+   /**
+    * Opens the readers if none are open yet; otherwise refreshes them and
+    * validates the replicated state. Always returns {@code null}.
+    */
+   @Override
+   public Boolean call() throws Exception {
+     if (indexReader == null) {
+       openReaders();
+       return null;
+     }
+     refreshAndCheckIndex();
+     refreshAndCheckTaxonomy();
+     verifyFacetedSearch();
+     return null;
+   }
+
+   /** Reopens the search index reader, requiring a strictly newer commit generation, then runs CheckIndex. */
+   private void refreshAndCheckIndex() throws IOException {
+     DirectoryReader reopened = DirectoryReader.openIfChanged(indexReader);
+     assertNotNull("should not have reached here if no changes were made to the index", reopened);
+     long reopenedGeneration = reopened.getIndexCommit().getGeneration();
+     assertTrue("expected newer generation; current=" + lastIndexGeneration + " new=" + reopenedGeneration, reopenedGeneration > lastIndexGeneration);
+     indexReader.close();
+     indexReader = reopened;
+     lastIndexGeneration = reopenedGeneration;
+     _TestUtil.checkIndex(indexDir);
+   }
+
+   /** Swaps in a reopened taxonomy reader when one is returned, then runs CheckIndex on the taxonomy. */
+   private void refreshAndCheckTaxonomy() throws IOException {
+     DirectoryTaxonomyReader reopened = TaxonomyReader.openIfChanged(taxoReader);
+     if (reopened != null) {
+       taxoReader.close();
+       taxoReader = reopened;
+     }
+     _TestUtil.checkIndex(taxoDir);
+   }
+
+   /**
+    * Reads the revision id from the commit user data and checks that category
+    * "A/&lt;id&gt;" is counted once and that drilling down on it matches one document.
+    */
+   private void verifyFacetedSearch() throws IOException {
+     String hexID = indexReader.getIndexCommit().getUserData().get(VERSION_ID);
+     int id = Integer.parseInt(hexID, 16);
+     CategoryPath cp = new CategoryPath("A", Integer.toString(id, 16));
+     IndexSearcher searcher = new IndexSearcher(indexReader);
+     FacetSearchParams searchParams = new FacetSearchParams(new CountFacetRequest(cp, 10));
+     FacetsCollector fc = FacetsCollector.create(searchParams, indexReader, taxoReader);
+     searcher.search(new MatchAllDocsQuery(), fc);
+     assertEquals(1, (int) fc.getFacetResults().get(0).getFacetResultNode().value);
+
+     DrillDownQuery drillDown = new DrillDownQuery(FacetIndexingParams.DEFAULT);
+     drillDown.add(cp);
+     TopDocs docs = searcher.search(drillDown, 10);
+     assertEquals(1, docs.totalHits);
+   }
+
+   @Override
+   public void close() throws IOException {
+     IOUtils.close(indexReader, taxoReader);
+   }
+ }
+
+ // Publishing (source) side: directories that publishIndexWriter and publishTaxoWriter write to.
+ private Directory publishIndexDir, publishTaxoDir;
+ // Handler (target) side: directories given to the replication handler and the callback;
+ // mock wrappers so that tests can inject I/O failures.
+ private MockDirectoryWrapper handlerIndexDir, handlerTaxoDir;
+ // Replicator that revisions are published to and that the client is constructed with.
+ private Replicator replicator;
+ // Factory handed to the client; created over clientWorkDir in setUp().
+ private SourceDirectoryFactory sourceDirFactory;
+ private ReplicationClient client;
+ private ReplicationHandler handler;
+ // Writers that produce the published revisions (see createRevision).
+ private IndexWriter publishIndexWriter;
+ private SnapshotDirectoryTaxonomyWriter publishTaxoWriter;
+ // Validation callback passed to the handler in setUp().
+ private IndexAndTaxonomyReadyCallback callback;
+ // Temporary directory backing sourceDirFactory.
+ private File clientWorkDir;
+
+ // Key in the commit user data under which the revision id is stored as a hex string.
+ private static final String VERSION_ID = "version";
+
+ /**
+  * Polls {@code dir} every 100 ms until its latest commit records
+  * {@code expectedID} (hex) under {@link #VERSION_ID}. Polling continues only
+  * while the client's update thread is alive; if the thread is not alive the
+  * method returns without having observed the revision.
+  */
+ private void assertHandlerRevision(int expectedID, Directory dir) throws IOException {
+   // Deliberately no timeout: the test-framework terminates the test if the
+   // client never updates (a serious bug), whereas an explicit timeout can
+   // easily produce false failures.
+   while (client.isUpdateThreadAlive()) {
+     // let the update thread make progress before probing the directory
+     try {
+       Thread.sleep(100);
+     } catch (InterruptedException ie) {
+       throw new ThreadInterruptedException(ie);
+     }
+
+     try {
+       DirectoryReader reader = DirectoryReader.open(dir);
+       try {
+         String hexID = reader.getIndexCommit().getUserData().get(VERSION_ID);
+         if (Integer.parseInt(hexID, 16) == expectedID) {
+           return;
+         }
+       } finally {
+         reader.close();
+       }
+     } catch (Exception ignored) {
+       // Expected while files are still being copied: IndexNotFoundException,
+       // or e.g. EOFException on segments_N when DirectoryReader.open() reads
+       // it mid-copy. Just retry on the next iteration.
+     }
+   }
+ }
+
+ /**
+  * Creates a new revision identified by {@code id}: adds one document
+  * categorized under "A/&lt;id in hex&gt;", stores the id (as a hex string)
+  * under {@link #VERSION_ID} in the commit data, and commits the index writer
+  * followed by the taxonomy writer.
+  *
+  * @param id revision identifier to record in the commit data
+  * @return a revision over the two publishing writers
+  * @throws IOException if adding the document or committing fails
+  */
+ private Revision createRevision(final int id) throws IOException {
+   publishIndexWriter.addDocument(newDocument(publishTaxoWriter, id));
+   // A single-entry immutable map is all that is needed; unlike double-brace
+   // initialization it does not create an anonymous HashMap subclass that
+   // holds a reference to the enclosing test instance.
+   publishIndexWriter.setCommitData(Collections.singletonMap(VERSION_ID, Integer.toString(id, 16)));
+   publishIndexWriter.commit();
+   publishTaxoWriter.commit();
+   return new IndexAndTaxonomyRevision(publishIndexWriter, publishTaxoWriter);
+ }
+
+ /**
+  * Builds a document whose only content is the facet fields for the category
+  * "A/&lt;id in hex&gt;", added through the given taxonomy writer.
+  */
+ private Document newDocument(TaxonomyWriter taxoWriter, int id) throws IOException {
+   CategoryPath category = new CategoryPath("A", Integer.toString(id, 16));
+   Document result = new Document();
+   new FacetFields(taxoWriter).addFields(result, Collections.singleton(category));
+   return result;
+ }
+
+ /**
+  * Builds the fixture: publishing and handler directories, a local
+  * replicator, the validating callback, the handler and the client, and
+  * finally the publishing index and taxonomy writers.
+  */
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ publishIndexDir = newDirectory();
+ publishTaxoDir = newDirectory();
+ handlerIndexDir = newMockDirectory();
+ handlerTaxoDir = newMockDirectory();
+ clientWorkDir = _TestUtil.getTempDir("replicationClientTest");
+ sourceDirFactory = new PerSessionDirectoryFactory(clientWorkDir);
+ replicator = new LocalReplicator();
+ // the callback and the handler both operate on the handler-side directories
+ callback = new IndexAndTaxonomyReadyCallback(handlerIndexDir, handlerTaxoDir);
+ handler = new IndexAndTaxonomyReplicationHandler(handlerIndexDir, handlerTaxoDir, callback);
+ client = new ReplicationClient(replicator, handler, sourceDirFactory);
+
+ // no analyzer is supplied (null)
+ IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, null);
+ // wrap the configured deletion policy in a SnapshotDeletionPolicy
+ // NOTE(review): presumably required so revisions can snapshot commits -- confirm against IndexAndTaxonomyRevision
+ conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+ publishIndexWriter = new IndexWriter(publishIndexDir, conf);
+ publishTaxoWriter = new SnapshotDirectoryTaxonomyWriter(publishTaxoDir);
+ }
+
+ /**
+  * Closes the client, the callback, both publishing writers, the replicator
+  * and all four directories, then delegates to the superclass.
+  */
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ // NOTE(review): the argument order (client and writers before their directories) looks intentional -- keep it
+ IOUtils.close(client, callback, publishIndexWriter, publishTaxoWriter, replicator, publishIndexDir, publishTaxoDir,
+ handlerIndexDir, handlerTaxoDir);
+ super.tearDown();
+ }
+
+ /**
+  * Drives replication manually through {@code client.updateNow()}, without
+  * starting the update thread: a single revision, a redundant update, another
+  * revision, and finally two revisions published back to back.
+  */
+ @Test
+ public void testNoUpdateThread() throws Exception {
+ assertNull("no version expected at start", handler.currentVersion());
+
+ // Callback validates the replicated index
+ replicator.publish(createRevision(1));
+ client.updateNow();
+
+ // make sure updating twice, when in fact there's nothing to update, works
+ client.updateNow();
+
+ replicator.publish(createRevision(2));
+ client.updateNow();
+
+ // Publish two revisions without update, handler should be upgraded to latest
+ replicator.publish(createRevision(3));
+ replicator.publish(createRevision(4));
+ client.updateNow();
+ }
+
+ /**
+  * Replicates two revisions, closes the client, then creates a new client
+  * over the same replicator, handler and source directory factory and
+  * replicates two further revisions with it.
+  */
+ @Test
+ public void testRestart() throws Exception {
+ replicator.publish(createRevision(1));
+ client.updateNow();
+
+ replicator.publish(createRevision(2));
+ client.updateNow();
+
+ // this test never starts an update thread, so stopUpdateThread() is called with none running
+ client.stopUpdateThread();
+ client.close();
+ // the new client reuses the existing handler, which has already applied revisions 1 and 2
+ client = new ReplicationClient(replicator, handler, sourceDirFactory);
+
+ // Publish two revisions without update, handler should be upgraded to latest
+ replicator.publish(createRevision(3));
+ replicator.publish(createRevision(4));
+ client.updateNow();
+ }
+
+ /**
+  * Lets the client's background update thread do the replication and waits,
+  * through {@code assertHandlerRevision}, for each published revision to show
+  * up in the handler's index directory.
+  */
+ @Test
+ public void testUpdateThread() throws Exception {
+ // NOTE(review): presumably (polling interval in ms, thread name) -- confirm against ReplicationClient
+ client.startUpdateThread(10, "indexTaxo");
+
+ replicator.publish(createRevision(1));
+ assertHandlerRevision(1, handlerIndexDir);
+
+ replicator.publish(createRevision(2));
+ assertHandlerRevision(2, handlerIndexDir);
+
+ // Publish two revisions without update, handler should be upgraded to latest
+ replicator.publish(createRevision(3));
+ replicator.publish(createRevision(4));
+ assertHandlerRevision(4, handlerIndexDir);
+ }
+
+ @Test
+ public void testRecreateTaxonomy() throws Exception {
+ replicator.publish(createRevision(1));
+ client.updateNow();
+
+ // recreate index and taxonomy
+ Directory newTaxo = newDirectory();
+ new DirectoryTaxonomyWriter(newTaxo).close();
+ publishTaxoWriter.replaceTaxonomy(newTaxo);
+ publishIndexWriter.deleteAll();
+ replicator.publish(createRevision(2));
+
+ client.updateNow();
+ newTaxo.close();
+ }
+
+ /*
+ * This test verifies that the client and handler do not end up in a corrupt
+ * index if exceptions are thrown at any point during replication. Either when
+ * a client copies files from the server to the temporary space, or when the
+ * handler copies them to the index directory.
+ *
+ * Failures are injected randomly in three places: the client's source
+ * directory, the handler's index directory and the handler's taxonomy
+ * directory, plus random exceptions thrown from the handler callback. A
+ * shared counter ("failures") bounds how many failures are injected; every
+ * IOException reported to handleUpdateException decrements it after the
+ * handler's index and taxonomy have been checked for consistency.
+ */
+ @Test
+ public void testConsistencyOnExceptions() throws Exception {
+ // so the handler's index isn't empty
+ replicator.publish(createRevision(1));
+ client.updateNow();
+ // the default client and callback are replaced below by failure-injecting versions
+ client.close();
+ callback.close();
+
+ // Replicator violates write-once policy. It may be that the
+ // handler copies files to the index dir, then fails to copy a
+ // file and reverts the copy operation. On the next attempt, it
+ // will copy the same file again. There is nothing wrong with this
+ // in a real system, but it does violate write-once, and MDW
+ // doesn't like it. Disabling it means that we won't catch cases
+ // where the handler overwrites an existing index file, but
+ // there's nothing currently we can do about it, unless we don't
+ // use MDW.
+ handlerIndexDir.setPreventDoubleWrite(false);
+ handlerTaxoDir.setPreventDoubleWrite(false);
+
+ // wrap sourceDirFactory to return a MockDirWrapper so we can simulate errors
+ final SourceDirectoryFactory in = sourceDirFactory;
+ // remaining number of failures that may still be injected
+ final AtomicInteger failures = new AtomicInteger(atLeast(10));
+ sourceDirFactory = new SourceDirectoryFactory() {
+
+ // per-target failure settings; each time a target is chosen to fail, its
+ // size limit is doubled and its exception rate halved for the next time
+ private long clientMaxSize = 100, handlerIndexMaxSize = 100, handlerTaxoMaxSize = 100;
+ private double clientExRate = 1.0, handlerIndexExRate = 1.0, handlerTaxoExRate = 1.0;
+
+ @Override
+ public void cleanupSession(String sessionID) throws IOException {
+ in.cleanupSession(sessionID);
+ }
+
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public Directory getDirectory(String sessionID, String source) throws IOException {
+ Directory dir = in.getDirectory(sessionID, source);
+ if (random().nextBoolean() && failures.get() > 0) { // client should fail, return wrapped dir
+ MockDirectoryWrapper mdw = new MockDirectoryWrapper(random(), dir);
+ mdw.setRandomIOExceptionRateOnOpen(clientExRate);
+ mdw.setMaxSizeInBytes(clientMaxSize);
+ mdw.setRandomIOExceptionRate(clientExRate);
+ mdw.setCheckIndexOnClose(false);
+ clientMaxSize *= 2;
+ clientExRate /= 2;
+ return mdw;
+ }
+
+ if (failures.get() > 0 && random().nextBoolean()) { // handler should fail
+ if (random().nextBoolean()) { // index dir fail
+ handlerIndexDir.setMaxSizeInBytes(handlerIndexMaxSize);
+ handlerIndexDir.setRandomIOExceptionRate(handlerIndexExRate);
+ handlerIndexDir.setRandomIOExceptionRateOnOpen(handlerIndexExRate);
+ handlerIndexMaxSize *= 2;
+ handlerIndexExRate /= 2;
+ } else { // taxo dir fail
+ handlerTaxoDir.setMaxSizeInBytes(handlerTaxoMaxSize);
+ handlerTaxoDir.setRandomIOExceptionRate(handlerTaxoExRate);
+ handlerTaxoDir.setRandomIOExceptionRateOnOpen(handlerTaxoExRate);
+ handlerTaxoDir.setCheckIndexOnClose(false);
+ handlerTaxoMaxSize *= 2;
+ handlerTaxoExRate /= 2;
+ }
+ } else {
+ // disable all errors
+ handlerIndexDir.setMaxSizeInBytes(0);
+ handlerIndexDir.setRandomIOExceptionRate(0.0);
+ handlerIndexDir.setRandomIOExceptionRateOnOpen(0.0);
+ handlerTaxoDir.setMaxSizeInBytes(0);
+ handlerTaxoDir.setRandomIOExceptionRate(0.0);
+ handlerTaxoDir.setRandomIOExceptionRateOnOpen(0.0);
+ }
+
+ // not failing on the client side: hand back the unwrapped directory
+ return dir;
+ }
+ };
+
+ // replacement handler whose callback randomly throws (roughly 20% of calls)
+ // while there are failures left to inject
+ handler = new IndexAndTaxonomyReplicationHandler(handlerIndexDir, handlerTaxoDir, new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ if (random().nextDouble() < 0.2 && failures.get() > 0) {
+ throw new RuntimeException("random exception from callback");
+ }
+ return null;
+ }
+ });
+
+ // wrap handleUpdateException so we can act on the thrown exception
+ client = new ReplicationClient(replicator, handler, sourceDirFactory) {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ protected void handleUpdateException(Throwable t) {
+ // IOExceptions are the injected failures: verify consistency and count them.
+ // Anything else is rethrown as (or wrapped in) a RuntimeException.
+ if (t instanceof IOException) {
+ try {
+ if (VERBOSE) {
+ System.out.println("hit exception during update: " + t);
+ t.printStackTrace(System.out);
+ }
+
+ // test that the index can be read and also some basic statistics
+ // NOTE(review): reads go through getDelegate(), presumably so the injected
+ // failures on the wrapper don't affect verification -- confirm
+ DirectoryReader reader = DirectoryReader.open(handlerIndexDir.getDelegate());
+ try {
+ int numDocs = reader.numDocs();
+ int version = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
+ // each revision adds exactly one document (see createRevision) and ids are
+ // published sequentially from 1, so the doc count must equal the version id
+ assertEquals(numDocs, version);
+ } finally {
+ reader.close();
+ }
+ // verify index is fully consistent
+ _TestUtil.checkIndex(handlerIndexDir.getDelegate());
+
+ // verify taxonomy index is fully consistent (since we only add one
+ // category to all documents, there's nothing much more to validate)
+ _TestUtil.checkIndex(handlerTaxoDir.getDelegate());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ // count-down number of failures
+ failures.decrementAndGet();
+ assert failures.get() >= 0 : "handler failed too many times: " + failures.get();
+ if (VERBOSE) {
+ if (failures.get() == 0) {
+ System.out.println("no more failures expected");
+ } else {
+ System.out.println("num failures left: " + failures.get());
+ }
+ }
+ }
+ } else {
+ if (t instanceof RuntimeException) throw (RuntimeException) t;
+ throw new RuntimeException(t);
+ }
+ }
+ };
+
+ client.startUpdateThread(10, "indexAndTaxo");
+
+ // poll the unwrapped handler index directory for each revision
+ final Directory baseHandlerIndexDir = handlerIndexDir.getDelegate();
+ int numRevisions = atLeast(20) + 2;
+ // revision 1 was published above, so continue from 2 up to numRevisions - 1
+ for (int i = 2; i < numRevisions; i++) {
+ replicator.publish(createRevision(i));
+ assertHandlerRevision(i, baseHandlerIndexDir);
+ }
+
+ // disable errors -- maybe randomness didn't exhaust all allowed failures,
+ // and we don't want e.g. CheckIndex to hit false errors.
+ handlerIndexDir.setMaxSizeInBytes(0);
+ handlerIndexDir.setRandomIOExceptionRate(0.0);
+ handlerIndexDir.setRandomIOExceptionRateOnOpen(0.0);
+ handlerTaxoDir.setMaxSizeInBytes(0);
+ handlerTaxoDir.setRandomIOExceptionRate(0.0);
+ handlerTaxoDir.setRandomIOExceptionRateOnOpen(0.0);
+ }
+
+}