You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2013/05/13 13:57:24 UTC

svn commit: r1481804 [3/4] - in /lucene/dev/trunk: dev-tools/maven/lucene/replicator/ lucene/ lucene/core/src/java/org/apache/lucene/index/ lucene/licenses/ lucene/replicator/ lucene/replicator/lib/ lucene/replicator/src/ lucene/replicator/src/java/ lu...

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/ReplicationClient.java Mon May 13 11:57:22 2013
@@ -0,0 +1,416 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.ThreadInterruptedException;
+
+/**
+ * A client which monitors and obtains new revisions from a {@link Replicator}.
+ * It can be used to either periodically check for updates by invoking
+ * {@link #startUpdateThread}, or manually by calling {@link #updateNow()}.
+ * <p>
+ * Whenever a new revision is available, the {@link #requiredFiles(Map)} are
+ * copied to the {@link Directory} specified by {@link PerSessionDirectoryFactory} and
+ * a handler is notified.
+ * 
+ * @lucene.experimental
+ */
+public class ReplicationClient implements Closeable {
+  
+  private class ReplicationThread extends Thread {
+    
+    private final long interval;
+    
+    // client uses this to stop us
+    final CountDownLatch stop = new CountDownLatch(1);
+    
+    public ReplicationThread(long interval) {
+      this.interval = interval;
+    }
+    
+    @SuppressWarnings("synthetic-access")
+    @Override
+    public void run() {
+      while (true) {
+        long time = System.currentTimeMillis();
+        updateLock.lock();
+        try {
+          doUpdate();
+        } catch (Throwable t) {
+          handleUpdateException(t);
+        } finally {
+          updateLock.unlock();
+        }
+        time = System.currentTimeMillis() - time;
+        
+        // adjust timeout to compensate the time spent doing the replication.
+        final long timeout = interval - time;
+        if (timeout > 0) {
+          try {
+            // this will return immediately if we were ordered to stop (count=0)
+            // or the timeout has elapsed. if it returns true, it means count=0,
+            // so terminate.
+            if (stop.await(timeout, TimeUnit.MILLISECONDS)) {
+              return;
+            }
+          } catch (InterruptedException e) {
+            // if we were interruted, somebody wants to terminate us, so just
+            // throw the exception further.
+            Thread.currentThread().interrupt();
+            throw new ThreadInterruptedException(e);
+          }
+        }
+      }
+    }
+    
+  }
+  
+  /** Handler for revisions obtained by the client. */
+  public static interface ReplicationHandler {
+    
+    /** Returns the current revision files held by the handler. */
+    public Map<String,List<RevisionFile>> currentRevisionFiles();
+    
+    /** Returns the current revision version held by the handler. */
+    public String currentVersion();
+    
+    /**
+     * Called when a new revision was obtained and is available (i.e. all needed
+     * files were successfully copied).
+     * 
+     * @param version
+     *          the version of the {@link Revision} that was copied
+     * @param revisionFiles
+     *          the files contained by this {@link Revision}
+     * @param copiedFiles
+     *          the files that were actually copied
+     * @param sourceDirectory
+     *          a mapping from a source of files to the {@link Directory} they
+     *          were copied into
+     */
+    public void revisionReady(String version, Map<String,List<RevisionFile>> revisionFiles, 
+        Map<String,List<String>> copiedFiles, Map<String, Directory> sourceDirectory) throws IOException;
+  }
+  
+  /**
+   * Resolves a session and source into a {@link Directory} to use for copying
+   * the session files to.
+   */
+  public static interface SourceDirectoryFactory {
+    
+    /**
+     * Called to denote that the replication actions for this session were finished and the directory is no longer needed. 
+     */
+    public void cleanupSession(String sessionID) throws IOException;
+    
+    /**
+     * Returns the {@link Directory} to use for the given session and source.
+     * Implementations may e.g. return different directories for different
+     * sessions, or the same directory for all sessions. In that case, it is
+     * advised to clean the directory before it is used for a new session.
+     * 
+     * @see #cleanupSession(String)
+     */
+    public Directory getDirectory(String sessionID, String source) throws IOException;
+    
+  }
+  
+  /** The component name to use with {@link InfoStream#isEnabled(String)}. */
+  public static final String INFO_STREAM_COMPONENT = "ReplicationThread";
+  
+  private final Replicator replicator;
+  private final ReplicationHandler handler;
+  private final SourceDirectoryFactory factory;
+  private final byte[] copyBuffer = new byte[16384];
+  private final Lock updateLock = new ReentrantLock();
+  
+  private volatile ReplicationThread updateThread;
+  private volatile boolean closed = false;
+  private volatile InfoStream infoStream = InfoStream.getDefault();
+  
+  /**
+   * Constructor.
+   * 
+   * @param replicator the {@link Replicator} used for checking for updates
+   * @param handler notified when new revisions are ready
+   * @param factory returns a {@link Directory} for a given source and session 
+   */
+  public ReplicationClient(Replicator replicator, ReplicationHandler handler, SourceDirectoryFactory factory) {
+    this.replicator = replicator;
+    this.handler = handler;
+    this.factory = factory;
+  }
+  
+  private void copyBytes(IndexOutput out, InputStream in) throws IOException {
+    int numBytes;
+    while ((numBytes = in.read(copyBuffer)) > 0) {
+      out.writeBytes(copyBuffer, 0, numBytes);
+    }
+  }
+  
+  private void doUpdate() throws IOException {
+    SessionToken session = null;
+    final Map<String,Directory> sourceDirectory = new HashMap<String,Directory>();
+    final Map<String,List<String>> copiedFiles = new HashMap<String,List<String>>();
+    boolean notify = false;
+    try {
+      final String version = handler.currentVersion();
+      session = replicator.checkForUpdate(version);
+      if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+        infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): handlerVersion=" + version + " session=" + session);
+      }
+      if (session == null) {
+        // already up to date
+        return;
+      }
+      Map<String,List<RevisionFile>> requiredFiles = requiredFiles(session.sourceFiles);
+      if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+        infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): requiredFiles=" + requiredFiles);
+      }
+      for (Entry<String,List<RevisionFile>> e : requiredFiles.entrySet()) {
+        String source = e.getKey();
+        Directory dir = factory.getDirectory(session.id, source);
+        sourceDirectory.put(source, dir);
+        List<String> cpFiles = new ArrayList<String>();
+        copiedFiles.put(source, cpFiles);
+        for (RevisionFile file : e.getValue()) {
+          if (closed) {
+            // if we're closed, abort file copy
+            if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+              infoStream.message(INFO_STREAM_COMPONENT, "doUpdate(): detected client was closed); abort file copy");
+            }
+            return;
+          }
+          InputStream in = null;
+          IndexOutput out = null;
+          try {
+            in = replicator.obtainFile(session.id, source, file.fileName);
+            out = dir.createOutput(file.fileName, IOContext.DEFAULT);
+            copyBytes(out, in);
+            cpFiles.add(file.fileName);
+            // TODO add some validation, on size / checksum
+          } finally {
+            IOUtils.close(in, out);
+          }
+        }
+      }
+      // only notify if all required files were successfully obtained.
+      notify = true;
+    } finally {
+      if (session != null) {
+        try {
+          replicator.release(session.id);
+        } finally {
+          if (!notify) { // cleanup after ourselves
+            IOUtils.close(sourceDirectory.values());
+            factory.cleanupSession(session.id);
+          }
+        }
+      }
+    }
+    
+    // notify outside the try-finally above, so the session is released sooner.
+    // the handler may take time to finish acting on the copied files, but the
+    // session itself is no longer needed.
+    try {
+      if (notify && !closed ) { // no use to notify if we are closed already
+        handler.revisionReady(session.version, session.sourceFiles, copiedFiles, sourceDirectory);
+      }
+    } finally {
+      IOUtils.close(sourceDirectory.values());
+      if (session != null) {
+        factory.cleanupSession(session.id);
+      }
+    }
+  }
+  
+  /** Throws {@link AlreadyClosedException} if the client has already been closed. */
+  protected final void ensureOpen() {
+    if (closed) {
+      throw new AlreadyClosedException("this update client has already been closed");
+    }
+  }
+  
+  /**
+   * Called when an exception is hit by the replication thread. The default
+   * implementation prints the full stacktrace to the {@link InfoStream} set in
+   * {@link #setInfoStream(InfoStream)}, or the {@link InfoStream#getDefault()
+   * default} one. You can override to log the exception elswhere.
+   * <p>
+   * <b>NOTE:</b> if you override this method to throw the exception further,
+   * the replication thread will be terminated. The only way to restart it is to
+   * call {@link #stopUpdateThread()} followed by
+   * {@link #startUpdateThread(long, String)}.
+   */
+  protected void handleUpdateException(Throwable t) {
+    final StringWriter sw = new StringWriter();
+    t.printStackTrace(new PrintWriter(sw));
+    if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
+      infoStream.message(INFO_STREAM_COMPONENT, "an error occurred during revision update: " + sw.toString());
+    }
+  }
+  
+  /**
+   * Returns the files required for replication. By default, this method returns
+   * all files that exist in the new revision, but not in the handler.
+   */
+  protected Map<String,List<RevisionFile>> requiredFiles(Map<String,List<RevisionFile>> newRevisionFiles) {
+    Map<String,List<RevisionFile>> handlerRevisionFiles = handler.currentRevisionFiles();
+    if (handlerRevisionFiles == null) {
+      return newRevisionFiles;
+    }
+    
+    Map<String,List<RevisionFile>> requiredFiles = new HashMap<String,List<RevisionFile>>();
+    for (Entry<String,List<RevisionFile>> e : handlerRevisionFiles.entrySet()) {
+      // put the handler files in a Set, for faster contains() checks later
+      Set<String> handlerFiles = new HashSet<String>();
+      for (RevisionFile file : e.getValue()) {
+        handlerFiles.add(file.fileName);
+      }
+      
+      // make sure to preserve revisionFiles order
+      ArrayList<RevisionFile> res = new ArrayList<RevisionFile>();
+      String source = e.getKey();
+      assert newRevisionFiles.containsKey(source) : "source not found in newRevisionFiles: " + newRevisionFiles;
+      for (RevisionFile file : newRevisionFiles.get(source)) {
+        if (!handlerFiles.contains(file.fileName)) {
+          res.add(file);
+        }
+      }
+      requiredFiles.put(source, res);
+    }
+    
+    return requiredFiles;
+  }
+  
+  @Override
+  public synchronized void close() {
+    if (!closed) {
+      stopUpdateThread();
+      closed = true;
+    }
+  }
+  
+  /**
+   * Start the update thread with the specified interval in milliseconds. For
+   * debugging purposes, you can optionally set the name to set on
+   * {@link Thread#setName(String)}. If you pass {@code null}, a default name
+   * will be set.
+   * 
+   * @throws IllegalStateException if the thread has already been started
+   */
+  public synchronized void startUpdateThread(long intervalMillis, String threadName) {
+    ensureOpen();
+    if (updateThread != null && updateThread.isAlive()) {
+      throw new IllegalStateException(
+          "cannot start an update thread when one is running, must first call 'stopUpdateThread()'");
+    }
+    threadName = threadName == null ? INFO_STREAM_COMPONENT : "ReplicationThread-" + threadName;
+    updateThread = new ReplicationThread(intervalMillis);
+    updateThread.setName(threadName);
+    updateThread.start();
+    // we rely on isAlive to return true in isUpdateThreadAlive, assert to be on the safe side
+    assert updateThread.isAlive() : "updateThread started but not alive?";
+  }
+  
+  /**
+   * Stop the update thread. If the update thread is not running, silently does
+   * nothing. This method returns after the update thread has stopped.
+   */
+  public synchronized void stopUpdateThread() {
+    if (updateThread != null) {
+      // this will trigger the thread to terminate if it awaits the lock.
+      // otherwise, if it's in the middle of replication, we wait for it to
+      // stop.
+      updateThread.stop.countDown();
+      try {
+        updateThread.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new ThreadInterruptedException(e);
+      }
+      updateThread = null;
+    }
+  }
+  
+  /**
+   * Returns true if the update thread is alive. The update thread is alive if
+   * it has been {@link #startUpdateThread(long, String) started} and not
+   * {@link #stopUpdateThread() stopped}, as well as didn't hit an error which
+   * caused it to terminate (i.e. {@link #handleUpdateException(Throwable)}
+   * threw the exception further).
+   */
+  public synchronized boolean isUpdateThreadAlive() {
+    return updateThread != null && updateThread.isAlive();
+  }
+  
+  @Override
+  public String toString() {
+    String res = "ReplicationClient";
+    if (updateThread != null) {
+      res += " (" + updateThread.getName() + ")";
+    }
+    return res;
+  }
+  
+  /**
+   * Executes the update operation immediately, irregardess if an update thread
+   * is running or not.
+   */
+  public void updateNow() throws IOException {
+    ensureOpen();
+    updateLock.lock();
+    try {
+      doUpdate();
+    } finally {
+      updateLock.unlock();
+    }
+  }
+
+  /** Sets the {@link InfoStream} to use for logging messages. */
+  public void setInfoStream(InfoStream infoStream) {
+    if (infoStream == null) {
+      infoStream = InfoStream.NO_OUTPUT;
+    }
+    this.infoStream = infoStream;
+  }
+  
+}

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Replicator.java Mon May 13 11:57:22 2013
@@ -0,0 +1,80 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An interface for replicating files. Allows a producer to
+ * {@link #publish(Revision) publish} {@link Revision}s and consumers to
+ * {@link #checkForUpdate(String) check for updates}. When a client needs to be
+ * updated, it is given a {@link SessionToken} through which it can
+ * {@link #obtainFile(String, String, String) obtain} the files of that
+ * revision. After the client has finished obtaining all the files, it should
+ * {@link #release(String) release} the given session, so that the files can be
+ * reclaimed if they are not needed anymore.
+ * <p>
+ * A client is always updated to the newest revision available. That is, if a
+ * client is on revision <em>r1</em> and revisions <em>r2</em> and <em>r3</em>
+ * were published, then when the cllient will next check for update, it will
+ * receive <em>r3</em>.
+ * 
+ * @lucene.experimental
+ */
+public interface Replicator extends Closeable {
+  
+  /**
+   * Publish a new {@link Revision} for consumption by clients. It is the
+   * caller's responsibility to verify that the revision files exist and can be
+   * read by clients. When the revision is no longer needed, it will be
+   * {@link Revision#release() released} by the replicator.
+   */
+  public void publish(Revision revision) throws IOException;
+  
+  /**
+   * Check whether the given version is up-to-date and returns a
+   * {@link SessionToken} which can be used for fetching the revision files,
+   * otherwise returns {@code null}.
+   * <p>
+   * <b>NOTE:</b> when the returned session token is no longer needed, you
+   * should call {@link #release(String)} so that the session resources can be
+   * reclaimed, including the revision files.
+   */
+  public SessionToken checkForUpdate(String currVersion) throws IOException;
+  
+  /**
+   * Notify that the specified {@link SessionToken} is no longer needed by the
+   * caller.
+   */
+  public void release(String sessionID) throws IOException;
+  
+  /**
+   * Returns an {@link InputStream} for the requested file and source in the
+   * context of the given {@link SessionToken#id session}.
+   * <p>
+   * <b>NOTE:</b> it is the caller's responsibility to close the returned
+   * stream.
+   * 
+   * @throws SessionExpiredException if the specified session has already
+   *         expired
+   */
+  public InputStream obtainFile(String sessionID, String source, String fileName) throws IOException;
+  
+}

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/Revision.java Mon May 13 11:57:22 2013
@@ -0,0 +1,75 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lucene.store.IndexInput;
+
+/**
+ * A revision comprises lists of files that come from different sources and need
+ * to be replicated together to e.g. guarantee that all resources are in sync.
+ * In most cases an application will replicate a single index, and so the
+ * revision will contain files from a single source. However, some applications
+ * may require to treat a collection of indexes as a single entity so that the
+ * files from all sources are replicated together, to guarantee consistency
+ * beween them. For example, an application which indexes facets will need to
+ * replicate both the search and taxonomy indexes together, to guarantee that
+ * they match at the client side.
+ * 
+ * @lucene.experimental
+ */
+public interface Revision extends Comparable<Revision> {
+  
+  /**
+   * Compares the revision to the given version string. Behaves like
+   * {@link Comparable#compareTo(Object)}.
+   */
+  public int compareTo(String version);
+  
+  /**
+   * Returns a string representation of the version of this revision. The
+   * version is used by {@link #compareTo(String)} as well as to
+   * serialize/deserialize revision information. Therefore it must be self
+   * descriptive as well as be able to identify one revision from another.
+   */
+  public String getVersion();
+  
+  /**
+   * Returns the files that comprise this revision, as a mapping from a source
+   * to a list of files.
+   */
+  public Map<String,List<RevisionFile>> getSourceFiles();
+  
+  /**
+   * Returns an {@link IndexInput} for the given fileName and source. It is the
+   * caller's respnsibility to close the {@link IndexInput} when it has been
+   * consumed.
+   */
+  public InputStream open(String source, String fileName) throws IOException;
+  
+  /**
+   * Called when this revision can be safely released, i.e. where there are no
+   * more references to it.
+   */
+  public void release() throws IOException;
+  
+}

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/RevisionFile.java Mon May 13 11:57:22 2013
@@ -0,0 +1,59 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Describes a file in a {@link Revision}. A file has a source, which allows a
+ * single revision to contain files from multiple sources (e.g. multiple
+ * indexes).
+ * 
+ * @lucene.experimental
+ */
+public class RevisionFile {
+  
+  /** The name of the file. */
+  public final String fileName;
+  
+  /** The size of the file denoted by {@link #fileName}. */
+  public long size = -1;
+  
+  /** Constructor with the given file name. */
+  public RevisionFile(String fileName) {
+    if (fileName == null || fileName.isEmpty()) {
+      throw new IllegalArgumentException("fileName cannot be null or empty");
+    }
+    this.fileName = fileName;
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    RevisionFile other = (RevisionFile) obj;
+    return fileName.equals(other.fileName) && size == other.size;
+  }
+  
+  @Override
+  public int hashCode() {
+    return fileName.hashCode() ^ (int) (size ^ (size >>> 32));
+  }
+  
+  @Override
+  public String toString() {
+    return "fileName=" + fileName + " size=" + size;
+  }
+  
+}

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionExpiredException.java Mon May 13 11:57:22 2013
@@ -0,0 +1,54 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+/**
+ * Exception indicating that a revision update session was expired due to lack
+ * of activity.
+ * 
+ * @see LocalReplicator#DEFAULT_SESSION_EXPIRATION_THRESHOLD
+ * @see LocalReplicator#setExpirationThreshold(long)
+ * 
+ * @lucene.experimental
+ */
+public class SessionExpiredException extends IOException {
+  
+  /**
+   * @see IOException#IOException(String, Throwable)
+   */
+  public SessionExpiredException(String message, Throwable cause) {
+    super(message, cause);
+  }
+  
+  /**
+   * @see IOException#IOException(String)
+   */
+  public SessionExpiredException(String message) {
+    super(message);
+  }
+  
+  /**
+   * @see IOException#IOException(Throwable)
+   */
+  public SessionExpiredException(Throwable cause) {
+    super(cause);
+  }
+  
+}

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/SessionToken.java Mon May 13 11:57:22 2013
@@ -0,0 +1,108 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Token for a replication session, for guaranteeing that source replicated
+ * files will be kept safe until the replication completes.
+ * 
+ * @see Replicator#checkForUpdate(String)
+ * @see Replicator#release(String)
+ * @see LocalReplicator#DEFAULT_SESSION_EXPIRATION_THRESHOLD
+ * 
+ * @lucene.experimental
+ */
+public final class SessionToken {
+  
+  /**
+   * ID of this session.
+   * Should be passed when releasing the session, thereby acknowledging the 
+   * {@link Replicator Replicator} that this session is no longer in use.
+   * @see Replicator#release(String)
+   */
+  public final String id;
+  
+  /**
+   * @see Revision#getVersion()
+   */
+  public final String version;
+  
+  /**
+   * @see Revision#getSourceFiles()
+   */
+  public final Map<String,List<RevisionFile>> sourceFiles;
+  
+  /** Constructor which deserializes from the given {@link DataInput}. */
+  public SessionToken(DataInput in) throws IOException {
+    this.id = in.readUTF();
+    this.version = in.readUTF();
+    this.sourceFiles = new HashMap<String,List<RevisionFile>>();
+    int numSources = in.readInt();
+    while (numSources > 0) {
+      String source = in.readUTF();
+      int numFiles = in.readInt();
+      List<RevisionFile> files = new ArrayList<RevisionFile>(numFiles);
+      for (int i = 0; i < numFiles; i++) {
+        String fileName = in.readUTF();
+        RevisionFile file = new RevisionFile(fileName);
+        file.size = in.readLong();
+        files.add(file);
+      }
+      this.sourceFiles.put(source, files);
+      --numSources;
+    }
+  }
+  
+  /** Constructor with the given id and revision. */
+  public SessionToken(String id, Revision revision) {
+    this.id = id;
+    this.version = revision.getVersion();
+    this.sourceFiles = revision.getSourceFiles();
+  }
+  
+  /** Serialize the token data for communication between server and client. */
+  public void serialize(DataOutput out) throws IOException {
+    out.writeUTF(id);
+    out.writeUTF(version);
+    out.writeInt(sourceFiles.size());
+    for (Entry<String,List<RevisionFile>> e : sourceFiles.entrySet()) {
+      out.writeUTF(e.getKey());
+      List<RevisionFile> files = e.getValue();
+      out.writeInt(files.size());
+      for (RevisionFile file : files) {
+        out.writeUTF(file.fileName);
+        out.writeLong(file.size);
+      }
+    }
+  }
+  
+  @Override
+  public String toString() {
+    return "id=" + id + " version=" + version + " files=" + sourceFiles;
+  }
+  
+}
\ No newline at end of file

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpClientBase.java Mon May 13 11:57:22 2013
@@ -0,0 +1,297 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.concurrent.Callable;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.util.EntityUtils;
+import org.apache.lucene.store.AlreadyClosedException;
+
+/**
+ * Base class for Http clients.
+ * 
+ * @lucene.experimental
+ * */
+public abstract class HttpClientBase implements Closeable {
+  
+  /**
+   * Default connection timeout for this client, in milliseconds.
+   * 
+   * @see #setConnectionTimeout(int)
+   */
+  public static final int DEFAULT_CONNECTION_TIMEOUT = 1000;
+  
+  /**
+   * Default socket timeout for this client, in milliseconds.
+   * 
+   * @see #setSoTimeout(int)
+   */
+  public static final int DEFAULT_SO_TIMEOUT = 60000;
+  
+  // TODO compression?
+  
+  /** The URL stting to execute requests against. */
+  protected final String url;
+  
+  private volatile boolean closed = false;
+  
+  private final HttpClient httpc;
+  
+  /**
+   * @param conMgr connection manager to use for this http client.
+   *        <b>NOTE:</b>The provided {@link ClientConnectionManager} will not be
+   *        {@link ClientConnectionManager#shutdown()} by this class.
+   */
+  protected HttpClientBase(String host, int port, String path, ClientConnectionManager conMgr) {
+    url = normalizedURL(host, port, path);
+    httpc = new DefaultHttpClient(conMgr);
+    setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
+    setSoTimeout(DEFAULT_SO_TIMEOUT);
+  }
+  
+  /**
+   * Set the connection timeout for this client, in milliseconds. This setting
+   * is used to modify {@link HttpConnectionParams#setConnectionTimeout}.
+   * 
+   * @param timeout timeout to set, in millisecopnds
+   */
+  public void setConnectionTimeout(int timeout) {
+    HttpConnectionParams.setConnectionTimeout(httpc.getParams(), timeout);
+  }
+  
+  /**
+   * Set the socket timeout for this client, in milliseconds. This setting
+   * is used to modify {@link HttpConnectionParams#setSoTimeout}.
+   * 
+   * @param timeout timeout to set, in millisecopnds
+   */
+  public void setSoTimeout(int timeout) {
+    HttpConnectionParams.setSoTimeout(httpc.getParams(), timeout);
+  }
+  
+  /** Throws {@link AlreadyClosedException} if this client is already closed. */
+  protected final void ensureOpen() throws AlreadyClosedException {
+    if (closed) {
+      throw new AlreadyClosedException("HttpClient already closed");
+    }
+  }
+  
+  /**
+   * Create a URL out of the given parameters, translate an empty/null path to '/'
+   */
+  private static String normalizedURL(String host, int port, String path) {
+    if (path == null || path.length() == 0) {
+      path = "/";
+    }
+    return "http://" + host + ":" + port + path;
+  }
+  
+  /**
+   * <b>Internal:</b> response status after invocation, and in case or error attempt to read the 
+   * exception sent by the server. 
+   */
+  protected void verifyStatus(HttpResponse response) throws IOException {
+    StatusLine statusLine = response.getStatusLine();
+    if (statusLine.getStatusCode() != HttpStatus.SC_OK) {
+      throwKnownError(response, statusLine); 
+    }
+  }
+  
+  protected void throwKnownError(HttpResponse response, StatusLine statusLine) throws IOException {
+    ObjectInputStream in = null;
+    try {
+      in = new ObjectInputStream(response.getEntity().getContent());
+    } catch (Exception e) {
+      // the response stream is not an exception - could be an error in servlet.init().
+      throw new RuntimeException("Uknown error: " + statusLine);
+    }
+    
+    Throwable t;
+    try {
+      t = (Throwable) in.readObject();
+    } catch (Exception e) { 
+      //not likely
+      throw new RuntimeException("Failed to read exception object: " + statusLine, e);
+    } finally {
+      in.close();
+    }
+    if (t instanceof IOException) {
+      throw (IOException) t;
+    }
+    if (t instanceof RuntimeException) {
+      throw (RuntimeException) t;
+    }
+    throw new RuntimeException("unknown exception "+statusLine,t);
+  }
+  
+  /**
+   * <b>internal:</b> execute a request and return its result
+   * The <code>params</code> argument is treated as: name1,value1,name2,value2,...
+   */
+  protected HttpResponse executePOST(String request, HttpEntity entity, String... params) throws IOException {
+    ensureOpen();
+    HttpPost m = new HttpPost(queryString(request, params));
+    m.setEntity(entity);
+    HttpResponse response = httpc.execute(m);
+    verifyStatus(response);
+    return response;
+  }
+  
+  /**
+   * <b>internal:</b> execute a request and return its result
+   * The <code>params</code> argument is treated as: name1,value1,name2,value2,...
+   */
+  protected HttpResponse executeGET(String request, String... params) throws IOException {
+    ensureOpen();
+    HttpGet m = new HttpGet(queryString(request, params));
+    HttpResponse response = httpc.execute(m);
+    verifyStatus(response);
+    return response;
+  }
+  
+  private String queryString(String request, String... params) throws UnsupportedEncodingException {
+    StringBuilder query = new StringBuilder(url).append('/').append(request).append('?');
+    if (params != null) {
+      for (int i = 0; i < params.length; i += 2) {
+        query.append(params[i]).append('=').append(URLEncoder.encode(params[i+1], "UTF8")).append('&');
+      }
+    }
+    return query.substring(0, query.length() - 1);
+  }
+  
+  /** Internal utility: input stream of the provided response */
+  public InputStream responseInputStream(HttpResponse response) throws IOException {
+    return responseInputStream(response, false);
+  }
+  
+  // TODO: can we simplify this Consuming !?!?!?
+  /**
+   * Internal utility: input stream of the provided response, which optionally 
+   * consumes the response's resources when the input stream is exhausted.
+   */
+  public InputStream responseInputStream(HttpResponse response, boolean consume) throws IOException {
+    final HttpEntity entity = response.getEntity();
+    final InputStream in = entity.getContent();
+    if (!consume) {
+      return in;
+    }
+    return new InputStream() {
+      private boolean consumed = false;
+      @Override
+      public int read() throws IOException {
+        final int res = in.read();
+        consume(res);
+        return res;
+      }
+      @Override
+      public void close() throws IOException {
+        super.close();
+        consume(-1);
+      }
+      @Override
+      public int read(byte[] b) throws IOException {
+        final int res = super.read(b);
+        consume(res);
+        return res;
+      }
+      @Override
+      public int read(byte[] b, int off, int len) throws IOException {
+        final int res = super.read(b, off, len);
+        consume(res);
+        return res;
+      }
+      private void consume(int minusOne) {
+        if (!consumed && minusOne==-1) {
+          try {
+            EntityUtils.consume(entity);
+          } catch (Exception e) {
+            // ignored on purpose
+          }
+          consumed = true;
+        }
+      }
+    };
+  }
+  
+  /**
+   * Returns true iff this instance was {@link #close() closed}, otherwise
+   * returns false. Note that if you override {@link #close()}, you must call
+   * {@code super.close()}, in order for this instance to be properly closed.
+   */
+  protected final boolean isClosed() {
+    return closed;
+  }
+  
+  /**
+   * Same as {@link #doAction(HttpResponse, boolean, Callable)} but always do consume at the end.
+   */
+  protected <T> T doAction(HttpResponse response, Callable<T> call) throws IOException {
+    return doAction(response, true, call);
+  }
+  
+  /**
+   * Do a specific action and validate after the action that the status is still OK, 
+   * and if not, attempt to extract the actual server side exception. Optionally
+   * release the response at exit, depending on <code>consume</code> parameter.
+   */
+  protected <T> T doAction(HttpResponse response, boolean consume, Callable<T> call) throws IOException {
+    IOException error = null;
+    try {
+      return call.call();
+    } catch (IOException e) {
+      error = e;
+    } catch (Exception e) {
+      error = new IOException(e);
+    } finally {
+      try {
+        verifyStatus(response);
+      } finally {
+        if (consume) {
+          try {
+            EntityUtils.consume(response.getEntity());
+          } catch (Exception e) {
+            // ignoring on purpose
+          }
+        }
+      }
+    }
+    throw error; // should not get here
+  }
+  
+  @Override
+  public void close() throws IOException {
+    closed = true;
+  }
+  
+}

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/HttpReplicator.java Mon May 13 11:57:22 2013
@@ -0,0 +1,105 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.Callable;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.lucene.replicator.Replicator;
+import org.apache.lucene.replicator.Revision;
+import org.apache.lucene.replicator.SessionToken;
+import org.apache.lucene.replicator.http.ReplicationService.ReplicationAction;
+
+/**
+ * An HTTP implementation of {@link Replicator}. Assumes the API supported by
+ * {@link ReplicationService}.
+ * 
+ * @lucene.experimental
+ */
+public class HttpReplicator extends HttpClientBase implements Replicator {
+  
+  /** Construct with specified connection manager. */
+  public HttpReplicator(String host, int port, String path, ClientConnectionManager conMgr) {
+    super(host, port, path, conMgr);
+  }
+  
+  @Override
+  public SessionToken checkForUpdate(String currVersion) throws IOException {
+    String[] params = null;
+    if (currVersion != null) {
+      params = new String[] { ReplicationService.REPLICATE_VERSION_PARAM, currVersion };
+    }
+    final HttpResponse response = executeGET(ReplicationAction.UPDATE.name(), params);
+    return doAction(response, new Callable<SessionToken>() {
+      @Override
+      public SessionToken call() throws Exception {
+        final DataInputStream dis = new DataInputStream(responseInputStream(response));
+        try {
+          if (dis.readByte() == 0) {
+            return null;
+          } else {
+            return new SessionToken(dis);
+          }
+        } finally {
+          dis.close();
+        }
+      }
+    });
+  }
+  
+  @Override
+  public InputStream obtainFile(String sessionID, String source, String fileName) throws IOException {
+    String[] params = new String[] {
+        ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionID,
+        ReplicationService.REPLICATE_SOURCE_PARAM, source,
+        ReplicationService.REPLICATE_FILENAME_PARAM, fileName,
+    };
+    final HttpResponse response = executeGET(ReplicationAction.OBTAIN.name(), params);
+    return doAction(response, false, new Callable<InputStream>() {
+      @Override
+      public InputStream call() throws Exception {
+        return responseInputStream(response,true);
+      }
+    });
+  }
+  
+  @Override
+  public void publish(Revision revision) throws IOException {
+    throw new UnsupportedOperationException(
+        "this replicator implementation does not support remote publishing of revisions");
+  }
+  
+  @Override
+  public void release(String sessionID) throws IOException {
+    String[] params = new String[] {
+        ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionID
+    };
+    final HttpResponse response = executeGET(ReplicationAction.RELEASE.name(), params);
+    doAction(response, new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        return null; // do not remove this call: as it is still validating for us!
+      }
+    });
+  }
+  
+}

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/ReplicationService.java Mon May 13 11:57:22 2013
@@ -0,0 +1,198 @@
+package org.apache.lucene.replicator.http;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Locale;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.http.HttpStatus;
+import org.apache.lucene.replicator.Replicator;
+import org.apache.lucene.replicator.SessionToken;
+
+/**
+ * A server-side service for handling replication requests. The service assumes
+ * requests are sent in the format
+ * <code>/&lt;context&gt;/&lt;shard&gt;/&lt;action&gt;</code> where
+ * <ul>
+ * <li>{@code context} is the servlet context, e.g. {@link #REPLICATION_CONTEXT}
+ * <li>{@code shard} is the ID of the shard, e.g. "s1"
+ * <li>{@code action} is one of {@link ReplicationAction} values
+ * </ul>
+ * For example, to check whether there are revision updates for shard "s1" you
+ * should send the request: <code>http://host:port/replicate/s1/update</code>.
+ * <p>
+ * This service is written like a servlet, and
+ * {@link #perform(HttpServletRequest, HttpServletResponse)} takes servlet
+ * request and response accordingly, so it is quite easy to embed in your
+ * application's servlet.
+ * 
+ * @lucene.experimental
+ */
+public class ReplicationService {
+  
+  /** Actions supported by the {@link ReplicationService}. */
+  public enum ReplicationAction {
+    OBTAIN, RELEASE, UPDATE
+  }
+  
+  /** The context path for the servlet. */
+  public static final String REPLICATION_CONTEXT = "/replicate";
+  
+  /** Request parameter name for providing the revision version. */
+  public final static String REPLICATE_VERSION_PARAM = "version";
+  
+  /** Request parameter name for providing a session ID. */
+  public final static String REPLICATE_SESSION_ID_PARAM = "sessionid";
+  
+  /** Request parameter name for providing the file's source. */
+  public final static String REPLICATE_SOURCE_PARAM = "source";
+  
+  /** Request parameter name for providing the file's name. */
+  public final static String REPLICATE_FILENAME_PARAM = "filename";
+  
+  private static final int SHARD_IDX = 0, ACTION_IDX = 1;
+  
+  private final Map<String,Replicator> replicators;
+  
+  public ReplicationService(Map<String,Replicator> replicators) {
+    super();
+    this.replicators = replicators;
+  }
+  
+  /**
+   * Returns the path elements that were given in the servlet request, excluding
+   * the servlet's action context.
+   */
+  private String[] getPathElements(HttpServletRequest req) {
+    String path = req.getServletPath();
+    String pathInfo = req.getPathInfo();
+    if (pathInfo != null) {
+      path += pathInfo;
+    }
+    int actionLen = REPLICATION_CONTEXT.length();
+    int startIdx = actionLen;
+    if (path.length() > actionLen && path.charAt(actionLen) == '/') {
+      ++startIdx;
+    }
+    
+    // split the string on '/' and remove any empty elements. This is better
+    // than using String.split() since the latter may return empty elements in
+    // the array
+    StringTokenizer stok = new StringTokenizer(path.substring(startIdx), "/");
+    ArrayList<String> elements = new ArrayList<String>();
+    while (stok.hasMoreTokens()) {
+      elements.add(stok.nextToken());
+    }
+    return elements.toArray(new String[0]);
+  }
+  
+  private static String extractRequestParam(HttpServletRequest req, String paramName) throws ServletException {
+    String param = req.getParameter(paramName);
+    if (param == null) {
+      throw new ServletException("Missing mandatory parameter: " + paramName);
+    }
+    return param;
+  }
+  
+  private static void copy(InputStream in, OutputStream out) throws IOException {
+    byte[] buf = new byte[16384];
+    int numRead;
+    while ((numRead = in.read(buf)) != -1) {
+      out.write(buf, 0, numRead);
+    }
+  }
+  
+  /** Executes the replication task. */
+  public void perform(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+    String[] pathElements = getPathElements(req);
+    
+    if (pathElements.length != 2) {
+      throw new ServletException("invalid path, must contain shard ID and action, e.g. */s1/update");
+    }
+    
+    final ReplicationAction action;
+    try {
+      action = ReplicationAction.valueOf(pathElements[ACTION_IDX].toUpperCase(Locale.ENGLISH));
+    } catch (IllegalArgumentException e) {
+      throw new ServletException("Unsupported action provided: " + pathElements[ACTION_IDX]);
+    }
+    
+    final Replicator replicator = replicators.get(pathElements[SHARD_IDX]);
+    if (replicator == null) {
+      throw new ServletException("unrecognized shard ID " + pathElements[SHARD_IDX]);
+    }
+    
+    ServletOutputStream resOut = resp.getOutputStream();
+    try {
+      switch (action) {
+        case OBTAIN:
+          final String sessionID = extractRequestParam(req, REPLICATE_SESSION_ID_PARAM);
+          final String fileName = extractRequestParam(req, REPLICATE_FILENAME_PARAM);
+          final String source = extractRequestParam(req, REPLICATE_SOURCE_PARAM);
+          InputStream in = replicator.obtainFile(sessionID, source, fileName);
+          try {
+            copy(in, resOut);
+          } finally {
+            in.close();
+          }
+          break;
+        case RELEASE:
+          replicator.release(extractRequestParam(req, REPLICATE_SESSION_ID_PARAM));
+          break;
+        case UPDATE:
+          String currVersion = req.getParameter(REPLICATE_VERSION_PARAM);
+          SessionToken token = replicator.checkForUpdate(currVersion);
+          if (token == null) {
+            resOut.write(0); // marker for null token
+          } else {
+            resOut.write(1); // marker for null token
+            token.serialize(new DataOutputStream(resOut));
+          }
+          break;
+      }
+    } catch (Exception e) {
+      resp.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR); // propagate the failure
+      try {
+        /*
+         * Note: it is assumed that "identified exceptions" are thrown before
+         * anything was written to the stream.
+         */
+        ObjectOutputStream oos = new ObjectOutputStream(resOut);
+        oos.writeObject(e);
+        oos.flush();
+      } catch (Exception e2) {
+        throw new IOException("Could not serialize", e2);
+      }
+    } finally {
+      resp.flushBuffer();
+    }
+  }
+  
+}

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/http/package.html Mon May 13 11:57:22 2013
@@ -0,0 +1,28 @@
+<html>
+
+<!-- 
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<head>
+<title>HTTP replication implementation</title>
+</head>
+
+<body>
+<h1>HTTP replication implementation</h1>
+</body>
+
+</html>
\ No newline at end of file

Added: lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/package.html
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/package.html?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/package.html (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/org/apache/lucene/replicator/package.html Mon May 13 11:57:22 2013
@@ -0,0 +1,79 @@
+<html>
+
+<!-- 
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<head>
+<title>Files replication framework</title>
+</head>
+
+<body>
+<h1>Files replication framework</h1>
+
+	The
+	<a href="Replicator.html">Replicator</a> allows replicating files between a server and client(s). Producers publish
+	<a href="Revision.html">revisions</a> and consumers update to the latest revision available.
+	<a href="ReplicationClient.html">ReplicationClient</a> is a helper utility for performing the update operation. It can
+	be invoked either
+	<a href="ReplicationClient.html#updateNow()">manually</a> or periodically by
+	<a href="ReplicationClient.html#startUpdateThread(long, java.lang.String)">starting an update thread</a>.
+	<a href="http/HttpReplicator.html">HttpReplicator</a> can be used to replicate revisions by consumers that reside on
+	a different node than the producer.
+
+	<p />
+	The replication framework supports replicating any type of files, with built-in support for a single search index as
+	well as an index and taxonomy pair. For a single index, the application should publish an
+	<a href="IndexRevision.html">IndexRevision</a> and set
+	<a href="IndexReplicationHandler.html">IndexReplicationHandler</a> on the client. For an index and taxonomy pair, the
+	application should publish an <a href="IndexAndTaxonomyRevision.html">IndexAndTaxonomyRevision</a> and set 
+	<a href="IndexAndTaxonomyReplicationHandler.html">IndexAndTaxonomyReplicationHandler</a> on the client.
+
+	<p />
+	When the replication client detects that there is a newer revision available, it copies the files of the revision and
+	then invokes the handler to complete the operation (e.g. copy the files to the index directory, fsync them, reopen an
+	index reader etc.). By default, only files that do not exist in the handler's
+	<a href="ReplicationClient.ReplicationHandler.html#currentRevisionFiles()">current revision files</a> are copied,
+	however this can be overridden by extending the client.
+
+	<p />
+	An example usage of the Replicator:
+	
+<pre class="prettyprint lang-java">
+// ++++++++++++++ SERVER SIDE ++++++++++++++ // 
+IndexWriter publishWriter; // the writer used for indexing
+Replicator replicator = new LocalReplicator();
+replicator.publish(new IndexRevision(publishWriter));
+
+// ++++++++++++++ CLIENT SIDE ++++++++++++++ // 
+// either LocalReplictor, or HttpReplicator if client and server are on different nodes
+Replicator replicator;
+
+// callback invoked after handler finished handling the revision and e.g. can reopen the reader.
+Callable&lt;Boolean&gt; callback = null; // can also be null if no callback is needed
+ReplicationHandler handler = new IndexReplicationHandler(indexDir, callback);
+SourceDirectoryFactory factory = new PerSessionDirectoryFactory(workDir);
+ReplicationClient client = new ReplicationClient(replicator, handler, factory);
+
+// invoke client manually
+client.updateNow();
+
+// or, periodically
+client.startUpdateThread(100); // check for update every 100 milliseconds
+</pre>
+
+</body>
+</html>
\ No newline at end of file

Added: lucene/dev/trunk/lucene/replicator/src/java/overview.html
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/java/overview.html?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/java/overview.html (added)
+++ lucene/dev/trunk/lucene/replicator/src/java/overview.html Mon May 13 11:57:22 2013
@@ -0,0 +1,26 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+  <head>
+    <title>
+      replicator
+    </title>
+  </head>
+  <body>
+  Provides index files replication capabilities.
+  </body>
+</html>

Added: lucene/dev/trunk/lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java?rev=1481804&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java (added)
+++ lucene/dev/trunk/lucene/replicator/src/test/org/apache/lucene/replicator/IndexAndTaxonomyReplicationClientTest.java Mon May 13 11:57:22 2013
@@ -0,0 +1,444 @@
+package org.apache.lucene.replicator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.facet.index.FacetFields;
+import org.apache.lucene.facet.params.FacetIndexingParams;
+import org.apache.lucene.facet.params.FacetSearchParams;
+import org.apache.lucene.facet.search.CountFacetRequest;
+import org.apache.lucene.facet.search.DrillDownQuery;
+import org.apache.lucene.facet.search.FacetsCollector;
+import org.apache.lucene.facet.taxonomy.CategoryPath;
+import org.apache.lucene.facet.taxonomy.TaxonomyReader;
+import org.apache.lucene.facet.taxonomy.TaxonomyWriter;
+import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyReader;
+import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.replicator.IndexAndTaxonomyRevision.SnapshotDirectoryTaxonomyWriter;
+import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
+import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.ThreadInterruptedException;
+import org.apache.lucene.util._TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IndexAndTaxonomyReplicationClientTest extends ReplicatorTestCase {
+  
+  private static class IndexAndTaxonomyReadyCallback implements Callable<Boolean>, Closeable {
+    
+    private final Directory indexDir, taxoDir;
+    private DirectoryReader indexReader;
+    private DirectoryTaxonomyReader taxoReader;
+    private long lastIndexGeneration = -1;
+    
+    public IndexAndTaxonomyReadyCallback(Directory indexDir, Directory taxoDir) throws IOException {
+      this.indexDir = indexDir;
+      this.taxoDir = taxoDir;
+      if (DirectoryReader.indexExists(indexDir)) {
+        indexReader = DirectoryReader.open(indexDir);
+        lastIndexGeneration = indexReader.getIndexCommit().getGeneration();
+        taxoReader = new DirectoryTaxonomyReader(taxoDir);
+      }
+    }
+    
+    @Override
+    public Boolean call() throws Exception {
+      if (indexReader == null) {
+        indexReader = DirectoryReader.open(indexDir);
+        lastIndexGeneration = indexReader.getIndexCommit().getGeneration();
+        taxoReader = new DirectoryTaxonomyReader(taxoDir);
+      } else {
+        // verify search index
+        DirectoryReader newReader = DirectoryReader.openIfChanged(indexReader);
+        assertNotNull("should not have reached here if no changes were made to the index", newReader);
+        long newGeneration = newReader.getIndexCommit().getGeneration();
+        assertTrue("expected newer generation; current=" + lastIndexGeneration + " new=" + newGeneration, newGeneration > lastIndexGeneration);
+        indexReader.close();
+        indexReader = newReader;
+        lastIndexGeneration = newGeneration;
+        _TestUtil.checkIndex(indexDir);
+        
+        // verify taxonomy index
+        DirectoryTaxonomyReader newTaxoReader = TaxonomyReader.openIfChanged(taxoReader);
+        if (newTaxoReader != null) {
+          taxoReader.close();
+          taxoReader = newTaxoReader;
+        }
+        _TestUtil.checkIndex(taxoDir);
+        
+        // verify faceted search
+        int id = Integer.parseInt(indexReader.getIndexCommit().getUserData().get(VERSION_ID), 16);
+        CategoryPath cp = new CategoryPath("A", Integer.toString(id, 16));
+        IndexSearcher searcher = new IndexSearcher(indexReader);
+        FacetsCollector fc = FacetsCollector.create(new FacetSearchParams(new CountFacetRequest(cp, 10)), indexReader, taxoReader);
+        searcher.search(new MatchAllDocsQuery(), fc);
+        assertEquals(1, (int) fc.getFacetResults().get(0).getFacetResultNode().value);
+        
+        DrillDownQuery drillDown = new DrillDownQuery(FacetIndexingParams.DEFAULT);
+        drillDown.add(cp);
+        TopDocs docs = searcher.search(drillDown, 10);
+        assertEquals(1, docs.totalHits);
+      }
+      return null;
+    }
+    
+    @Override
+    public void close() throws IOException {
+      IOUtils.close(indexReader, taxoReader);
+    }
+  }
+  
+  private Directory publishIndexDir, publishTaxoDir;
+  private MockDirectoryWrapper handlerIndexDir, handlerTaxoDir;
+  private Replicator replicator;
+  private SourceDirectoryFactory sourceDirFactory;
+  private ReplicationClient client;
+  private ReplicationHandler handler;
+  private IndexWriter publishIndexWriter;
+  private SnapshotDirectoryTaxonomyWriter publishTaxoWriter;
+  private IndexAndTaxonomyReadyCallback callback;
+  private File clientWorkDir;
+  
+  private static final String VERSION_ID = "version";
+  
+  private void assertHandlerRevision(int expectedID, Directory dir) throws IOException {
+    // loop as long as client is alive. test-framework will terminate us if
+    // there's a serious bug, e.g. client doesn't really update. otherwise,
+    // introducing timeouts is not good, can easily lead to false positives.
+    while (client.isUpdateThreadAlive()) {
+      // give client a chance to update
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      }
+      
+      try {
+        DirectoryReader reader = DirectoryReader.open(dir);
+        try {
+          int handlerID = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
+          if (expectedID == handlerID) {
+            return;
+          }
+        } finally {
+          reader.close();
+        }
+      } catch (Exception e) {
+        // we can hit IndexNotFoundException or e.g. EOFException (on
+        // segments_N) because it is being copied at the same time it is read by
+        // DirectoryReader.open().
+      }
+    }
+  }
+  
+  private Revision createRevision(final int id) throws IOException {
+    publishIndexWriter.addDocument(newDocument(publishTaxoWriter, id));
+    publishIndexWriter.setCommitData(new HashMap<String, String>() {{
+      put(VERSION_ID, Integer.toString(id, 16));
+    }});
+    publishIndexWriter.commit();
+    publishTaxoWriter.commit();
+    return new IndexAndTaxonomyRevision(publishIndexWriter, publishTaxoWriter);
+  }
+  
+  private Document newDocument(TaxonomyWriter taxoWriter, int id) throws IOException {
+    Document doc = new Document();
+    FacetFields facetFields = new FacetFields(taxoWriter);
+    facetFields.addFields(doc, Collections.singleton(new CategoryPath("A", Integer.toString(id, 16))));
+    return doc;
+  }
+  
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    publishIndexDir = newDirectory();
+    publishTaxoDir = newDirectory();
+    handlerIndexDir = newMockDirectory();
+    handlerTaxoDir = newMockDirectory();
+    clientWorkDir = _TestUtil.getTempDir("replicationClientTest");
+    sourceDirFactory = new PerSessionDirectoryFactory(clientWorkDir);
+    replicator = new LocalReplicator();
+    callback = new IndexAndTaxonomyReadyCallback(handlerIndexDir, handlerTaxoDir);
+    handler = new IndexAndTaxonomyReplicationHandler(handlerIndexDir, handlerTaxoDir, callback);
+    client = new ReplicationClient(replicator, handler, sourceDirFactory);
+    
+    IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, null);
+    conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
+    publishIndexWriter = new IndexWriter(publishIndexDir, conf);
+    publishTaxoWriter = new SnapshotDirectoryTaxonomyWriter(publishTaxoDir);
+  }
+  
+  @After
+  @Override
+  public void tearDown() throws Exception {
+    IOUtils.close(client, callback, publishIndexWriter, publishTaxoWriter, replicator, publishIndexDir, publishTaxoDir,
+        handlerIndexDir, handlerTaxoDir);
+    super.tearDown();
+  }
+  
+  @Test
+  public void testNoUpdateThread() throws Exception {
+    assertNull("no version expected at start", handler.currentVersion());
+    
+    // Callback validates the replicated index
+    replicator.publish(createRevision(1));
+    client.updateNow();
+    
+    // make sure updating twice, when in fact there's nothing to update, works
+    client.updateNow();
+    
+    replicator.publish(createRevision(2));
+    client.updateNow();
+    
+    // Publish two revisions without update, handler should be upgraded to latest
+    replicator.publish(createRevision(3));
+    replicator.publish(createRevision(4));
+    client.updateNow();
+  }
+  
+  @Test
+  public void testRestart() throws Exception {
+    replicator.publish(createRevision(1));
+    client.updateNow();
+    
+    replicator.publish(createRevision(2));
+    client.updateNow();
+    
+    client.stopUpdateThread();
+    client.close();
+    client = new ReplicationClient(replicator, handler, sourceDirFactory);
+    
+    // Publish two revisions without update, handler should be upgraded to latest
+    replicator.publish(createRevision(3));
+    replicator.publish(createRevision(4));
+    client.updateNow();
+  }
+  
+  @Test
+  public void testUpdateThread() throws Exception {
+    client.startUpdateThread(10, "indexTaxo");
+    
+    replicator.publish(createRevision(1));
+    assertHandlerRevision(1, handlerIndexDir);
+    
+    replicator.publish(createRevision(2));
+    assertHandlerRevision(2, handlerIndexDir);
+    
+    // Publish two revisions without update, handler should be upgraded to latest
+    replicator.publish(createRevision(3));
+    replicator.publish(createRevision(4));
+    assertHandlerRevision(4, handlerIndexDir);
+  }
+  
+  @Test
+  public void testRecreateTaxonomy() throws Exception {
+    replicator.publish(createRevision(1));
+    client.updateNow();
+    
+    // recreate index and taxonomy
+    Directory newTaxo = newDirectory();
+    new DirectoryTaxonomyWriter(newTaxo).close();
+    publishTaxoWriter.replaceTaxonomy(newTaxo);
+    publishIndexWriter.deleteAll();
+    replicator.publish(createRevision(2));
+    
+    client.updateNow();
+    newTaxo.close();
+  }
+
+  /*
+   * This test verifies that the client and handler do not end up in a corrupt
+   * index if exceptions are thrown at any point during replication. Either when
+   * a client copies files from the server to the temporary space, or when the
+   * handler copies them to the index directory.
+   */
+  @Test
+  public void testConsistencyOnExceptions() throws Exception {
+    // so the handler's index isn't empty
+    replicator.publish(createRevision(1));
+    client.updateNow();
+    client.close();
+    callback.close();
+
+    // Replicator violates write-once policy. It may be that the
+    // handler copies files to the index dir, then fails to copy a
+    // file and reverts the copy operation. On the next attempt, it
+    // will copy the same file again. There is nothing wrong with this
+    // in a real system, but it does violate write-once, and MDW
+    // doesn't like it. Disabling it means that we won't catch cases
+    // where the handler overwrites an existing index file, but
+    // there's nothing currently we can do about it, unless we don't
+    // use MDW.
+    handlerIndexDir.setPreventDoubleWrite(false);
+    handlerTaxoDir.setPreventDoubleWrite(false);
+
+    // wrap sourceDirFactory to return a MockDirWrapper so we can simulate errors
+    final SourceDirectoryFactory in = sourceDirFactory;
+    final AtomicInteger failures = new AtomicInteger(atLeast(10));
+    sourceDirFactory = new SourceDirectoryFactory() {
+      
+      private long clientMaxSize = 100, handlerIndexMaxSize = 100, handlerTaxoMaxSize = 100;
+      private double clientExRate = 1.0, handlerIndexExRate = 1.0, handlerTaxoExRate = 1.0;
+      
+      @Override
+      public void cleanupSession(String sessionID) throws IOException {
+        in.cleanupSession(sessionID);
+      }
+      
+      @SuppressWarnings("synthetic-access")
+      @Override
+      public Directory getDirectory(String sessionID, String source) throws IOException {
+        Directory dir = in.getDirectory(sessionID, source);
+        if (random().nextBoolean() && failures.get() > 0) { // client should fail, return wrapped dir
+          MockDirectoryWrapper mdw = new MockDirectoryWrapper(random(), dir);
+          mdw.setRandomIOExceptionRateOnOpen(clientExRate);
+          mdw.setMaxSizeInBytes(clientMaxSize);
+          mdw.setRandomIOExceptionRate(clientExRate);
+          mdw.setCheckIndexOnClose(false);
+          clientMaxSize *= 2;
+          clientExRate /= 2;
+          return mdw;
+        }
+        
+        if (failures.get() > 0 && random().nextBoolean()) { // handler should fail
+          if (random().nextBoolean()) { // index dir fail
+            handlerIndexDir.setMaxSizeInBytes(handlerIndexMaxSize);
+            handlerIndexDir.setRandomIOExceptionRate(handlerIndexExRate);
+            handlerIndexDir.setRandomIOExceptionRateOnOpen(handlerIndexExRate);
+            handlerIndexMaxSize *= 2;
+            handlerIndexExRate /= 2;
+          } else { // taxo dir fail
+            handlerTaxoDir.setMaxSizeInBytes(handlerTaxoMaxSize);
+            handlerTaxoDir.setRandomIOExceptionRate(handlerTaxoExRate);
+            handlerTaxoDir.setRandomIOExceptionRateOnOpen(handlerTaxoExRate);
+            handlerTaxoDir.setCheckIndexOnClose(false);
+            handlerTaxoMaxSize *= 2;
+            handlerTaxoExRate /= 2;
+          }
+        } else {
+          // disable all errors
+          handlerIndexDir.setMaxSizeInBytes(0);
+          handlerIndexDir.setRandomIOExceptionRate(0.0);
+          handlerIndexDir.setRandomIOExceptionRateOnOpen(0.0);
+          handlerTaxoDir.setMaxSizeInBytes(0);
+          handlerTaxoDir.setRandomIOExceptionRate(0.0);
+          handlerTaxoDir.setRandomIOExceptionRateOnOpen(0.0);
+        }
+
+        return dir;
+      }
+    };
+    
+    handler = new IndexAndTaxonomyReplicationHandler(handlerIndexDir, handlerTaxoDir, new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        if (random().nextDouble() < 0.2 && failures.get() > 0) {
+          throw new RuntimeException("random exception from callback");
+        }
+        return null;
+      }
+    });
+
+    // wrap handleUpdateException so we can act on the thrown exception
+    client = new ReplicationClient(replicator, handler, sourceDirFactory) {
+      @SuppressWarnings("synthetic-access")
+      @Override
+      protected void handleUpdateException(Throwable t) {
+        if (t instanceof IOException) {
+          try {
+            if (VERBOSE) {
+              System.out.println("hit exception during update: " + t);
+              t.printStackTrace(System.out);
+            }
+
+            // test that the index can be read and also some basic statistics
+            DirectoryReader reader = DirectoryReader.open(handlerIndexDir.getDelegate());
+            try {
+              int numDocs = reader.numDocs();
+              int version = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
+              assertEquals(numDocs, version);
+            } finally {
+              reader.close();
+            }
+            // verify index is fully consistent
+            _TestUtil.checkIndex(handlerIndexDir.getDelegate());
+            
+            // verify taxonomy index is fully consistent (since we only add one
+            // category to all documents, there's nothing much more to validate
+            _TestUtil.checkIndex(handlerTaxoDir.getDelegate());
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          } finally {
+            // count-down number of failures
+            failures.decrementAndGet();
+            assert failures.get() >= 0 : "handler failed too many times: " + failures.get();
+            if (VERBOSE) {
+              if (failures.get() == 0) {
+                System.out.println("no more failures expected");
+              } else {
+                System.out.println("num failures left: " + failures.get());
+              }
+            }
+          }
+        } else {
+          if (t instanceof RuntimeException) throw (RuntimeException) t;
+          throw new RuntimeException(t);
+        }
+      }
+    };
+    
+    client.startUpdateThread(10, "indexAndTaxo");
+    
+    final Directory baseHandlerIndexDir = handlerIndexDir.getDelegate();
+    int numRevisions = atLeast(20) + 2;
+    for (int i = 2; i < numRevisions; i++) {
+      replicator.publish(createRevision(i));
+      assertHandlerRevision(i, baseHandlerIndexDir);
+    }
+
+    // disable errors -- maybe randomness didn't exhaust all allowed failures,
+    // and we don't want e.g. CheckIndex to hit false errors. 
+    handlerIndexDir.setMaxSizeInBytes(0);
+    handlerIndexDir.setRandomIOExceptionRate(0.0);
+    handlerIndexDir.setRandomIOExceptionRateOnOpen(0.0);
+    handlerTaxoDir.setMaxSizeInBytes(0);
+    handlerTaxoDir.setRandomIOExceptionRate(0.0);
+    handlerTaxoDir.setRandomIOExceptionRateOnOpen(0.0);
+  }
+  
+}