You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/05/22 20:58:29 UTC

svn commit: r1681186 [3/5] - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ core/src/java/org/apache/solr/util/ core/src/test-files/solr/collect...

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java Fri May 22 18:58:29 2015
@@ -0,0 +1,560 @@
+package org.apache.solr.update;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An extension of the {@link org.apache.solr.update.UpdateLog} for the CDCR scenario.<br>
+ * Compared to the original update log implementation, transaction logs are removed based on
+ * pointers instead of a fixed size limit. Pointers are created by the CDC replicators and
+ * correspond to replication checkpoints. If all pointers are ahead of a transaction log,
+ * this transaction log is removed.<br>
+ * Given that the number of transaction logs can become considerable if some pointers are
+ * lagging behind, the {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader} provides
+ * a {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader#seek(long)} method to
+ * efficiently look up a particular transaction log file given a version number.
+ */
+public class CdcrUpdateLog extends UpdateLog {
+
+  protected final Map<CdcrLogReader, CdcrLogPointer> logPointers = new ConcurrentHashMap<>();
+
+  /**
+   * A reader that will be used as a toggle to turn on/off the buffering of tlogs
+   */
+  private CdcrLogReader bufferToggle;
+
+  public static String LOG_FILENAME_PATTERN = "%s.%019d.%1d";
+
+  protected static Logger log = LoggerFactory.getLogger(CdcrUpdateLog.class);
+
+  @Override
+  public void init(UpdateHandler uhandler, SolrCore core) {
+    // close dangling readers; close() unregisters the reader, hence the snapshot of the keys
+    for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {
+      reader.close();
+    }
+    logPointers.clear();
+
+    // init
+    super.init(uhandler, core);
+  }
+
+  @Override
+  public TransactionLog newTransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
+    return new CdcrTransactionLog(tlogFile, globalStrings, openExisting); // CDCR flavour of the tlog, in place of the parent's TransactionLog
+  }
+
+  @Override
+  protected void addOldLog(TransactionLog oldLog, boolean removeOld) {
+    if (oldLog == null) {
+      return;
+    }
+
+    numOldRecords += oldLog.numRecords();
+
+    // records held by the old logs, plus those of the current tlog when it is a different log
+    int currRecords = numOldRecords;
+    if (tlog != null && oldLog != tlog) {
+      currRecords += tlog.numRecords();
+    }
+
+    // drop logs from the oldest end for as long as that is allowed
+    while (removeOld && !logs.isEmpty()) {
+      TransactionLog oldest = logs.peekLast();
+      int oldestRecords = oldest.numRecords();
+
+      // the oldest log is a candidate if numRecordsToKeep is still reached without it,
+      // or if we already have the limit of 10 log files.
+      boolean removable = currRecords - oldestRecords >= numRecordsToKeep || logs.size() >= 10;
+
+      // a log that a reader still points to is kept, and so is everything newer than it
+      if (!removable || this.hasLogPointer(oldest)) {
+        break;
+      }
+
+      currRecords -= oldestRecords;
+      numOldRecords -= oldestRecords;
+      TransactionLog removed = logs.removeLast();
+      removed.deleteOnClose = true;
+      removed.close();  // it will be deleted if no longer in use
+    }
+
+    // Decref old log as we do not write to it anymore.
+    // If the old log is uncapped, i.e., a write commit has to be performed
+    // during recovery, the output stream will be automatically re-opened when
+    // TransactionLog#incref is called.
+    oldLog.deleteOnClose = false;
+    oldLog.decref();
+
+    // don't incref... we are taking ownership from the caller.
+    logs.addFirst(oldLog);
+  }
+
+  /**
+   * Checks if one of the log pointers is pointing to the given tlog, or is not initialised yet.
+   */
+  private boolean hasLogPointer(TransactionLog tlog) {
+    for (CdcrLogPointer pointer : logPointers.values()) {
+      // if we have a pointer that is not initialised, then do not remove the old tlogs
+      // as we have a log reader that didn't pick them up yet.
+      if (!pointer.isInitialised()) {
+        return true;
+      }
+      // compare the files by value: two distinct File instances may denote the same tlog
+      if (pointer.tlogFile.equals(tlog.tlogFile)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public long getLastLogId() {
+    if (id != -1) return id; // the id is already known
+    if (tlogFiles.length == 0) return -1; // no tlog file name to extract an id from
+    String last = tlogFiles[tlogFiles.length - 1];
+    return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.'))); // the part between the name prefix and the last dot
+  }
+
+  @Override
+  public void add(AddUpdateCommand cmd, boolean clearCaches) {
+    // Ensure that the tlog file is created with our filename format, so that the variable tlog
+    // is not null and the ensureLog of the parent is skipped. Commands flagged as REPLAY do not trigger it.
+    synchronized (this) {
+      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+        ensureLog(cmd.getVersion());
+      }
+    }
+    // Then delegate to the parent method
+    super.add(cmd, clearCaches);
+  }
+
+  @Override
+  public void delete(DeleteUpdateCommand cmd) {
+    // Ensure that the tlog file is created with our filename format, so that the variable tlog
+    // is not null and the ensureLog of the parent is skipped. Commands flagged as REPLAY do not trigger it.
+    synchronized (this) {
+      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+        ensureLog(cmd.getVersion());
+      }
+    }
+    // Then delegate to the parent method
+    super.delete(cmd);
+  }
+
+  @Override
+  public void deleteByQuery(DeleteUpdateCommand cmd) {
+    // Ensure that the tlog file is created with our filename format, so that the variable tlog
+    // is not null and the ensureLog of the parent is skipped. Commands flagged as REPLAY do not trigger it.
+    synchronized (this) {
+      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+        ensureLog(cmd.getVersion());
+      }
+    }
+    // Then delegate to the parent method
+    super.deleteByQuery(cmd);
+  }
+
+  /**
+   * Creates a new {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}
+   * initialised with a copy of the current list of tlogs and with the tlog being written, if any.
+   */
+  public CdcrLogReader newLogReader() {
+    return new CdcrLogReader(new ArrayList<TransactionLog>(logs), tlog);
+  }
+
+  /**
+   * Enables the buffering of the tlogs. While buffering is active, the update log does not remove any
+   * old transaction log files. Calling it while buffering is already enabled has no effect.
+   */
+  public void enableBuffer() {
+    if (bufferToggle == null) {
+      bufferToggle = this.newLogReader();
+    }
+  }
+
+  /**
+   * Disables the buffering of the tlogs by closing the toggle reader. Has no effect if buffering is not enabled.
+   */
+  public void disableBuffer() {
+    if (bufferToggle != null) {
+      bufferToggle.close();
+      bufferToggle = null;
+    }
+  }
+
+  public CdcrLogReader getBufferToggle() {
+    return bufferToggle; // null while buffering is disabled
+  }
+
+  /**
+   * Returns true if the update log is buffering the tlogs, i.e., if the buffer toggle reader is set.
+   */
+  public boolean isBuffering() {
+    return bufferToggle != null;
+  }
+
+  /**
+   * Creates the tlog, named after {@link #LOG_FILENAME_PATTERN}, if there is none, and pushes it to the readers.
+   */
+  protected void ensureLog(long startVersion) {
+    if (tlog != null) return; // a tlog is already open, nothing to do
+
+    long absoluteVersion = Math.abs(startVersion); // version is negative for deletes
+    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id, absoluteVersion);
+    tlog = new CdcrTransactionLog(new File(tlogDir, newLogName), globalStrings);
+    // push the new tlog to the opened readers
+    for (CdcrLogReader reader : logPointers.keySet()) {
+      reader.push(tlog);
+    }
+  }
+
+  @Override
+  public void close(boolean committed, boolean deleteOnClose) {
+    for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) { // snapshot: close() removes the reader from logPointers
+      reader.close();
+    }
+    super.close(committed, deleteOnClose);
+  }
+
+  /** Records the tlog file that the owning {@link CdcrLogReader} has set as its current one. */
+  private static class CdcrLogPointer {
+    File tlogFile = null; // null until the owning reader has been given a tlog
+
+    private CdcrLogPointer() {
+    }
+
+    private void set(File tlogFile) {
+      this.tlogFile = tlogFile;
+    }
+
+    private boolean isInitialised() {
+      return tlogFile != null;
+    }
+
+    @Override
+    public String toString() {
+      return "CdcrLogPointer(" + tlogFile + ")";
+    }
+
+  }
+
+  public class CdcrLogReader {
+
+    private TransactionLog currentTlog;
+    private TransactionLog.LogReader tlogReader;
+
+    // we need to use a blocking deque because of #getNumberOfRemainingRecords
+    private final LinkedBlockingDeque<TransactionLog> tlogs;
+    private final CdcrLogPointer pointer;
+
+    /**
+     * Used to record the last position of the tlog
+     */
+    private long lastPositionInTLog = 0;
+
+    /**
+     * lastVersion is used to get nextToLastVersion
+     */
+    private long lastVersion = -1;
+
+    /**
+     * nextToLastVersion is communicated by leader to replicas so that they can remove no longer needed tlogs
+     * <p>
+     * nextToLastVersion is used because thanks to {@link #resetToLastPosition()} lastVersion can become the current version
+     */
+    private long nextToLastVersion = -1;
+
+    /**
+     * Used to record the number of records read in the current tlog
+     */
+    private long numRecordsReadInCurrentTlog = 0;
+
+    private CdcrLogReader(List<TransactionLog> tlogs, TransactionLog tlog) {
+      this.tlogs = new LinkedBlockingDeque<>();
+      this.tlogs.addAll(tlogs); // copy: the queue is private to this reader
+      if (tlog != null) this.tlogs.push(tlog); // ensure that the tlog being written is pushed
+
+      // Register the pointer in the parent UpdateLog
+      pointer = new CdcrLogPointer();
+      logPointers.put(this, pointer);
+
+      // Start at the tlog at the tail of the queue; if the updates log is empty, there is nothing to do yet
+      if ((currentTlog = this.tlogs.peekLast()) != null) {
+        tlogReader = currentTlog.getReader(0);
+        pointer.set(currentTlog.tlogFile);
+        numRecordsReadInCurrentTlog = 0;
+        log.debug("Init new tlog reader for {} - tlogReader = {}", currentTlog.tlogFile, tlogReader);
+      }
+    }
+
+    private void push(TransactionLog tlog) {
+      this.tlogs.push(tlog);
+
+      // currentTlog is null when the reader was initialised while the update log was empty:
+      // we have to set the current tlog and the associated tlog reader.
+      if (currentTlog == null && !tlogs.isEmpty()) {
+        currentTlog = tlogs.peekLast();
+        tlogReader = currentTlog.getReader(0);
+        pointer.set(currentTlog.tlogFile);
+        numRecordsReadInCurrentTlog = 0;
+        log.debug("Init new tlog reader for {} - tlogReader = {}", currentTlog.tlogFile, tlogReader);
+      }
+    }
+
+    /**
+     * Expert: Instantiates a sub-reader. A sub-reader is used for batch updates. It allows iterating over the
+     * update log entries without modifying the state of the parent log reader. If the batch update fails, the state
+     * of the sub-reader is discarded and the state of the parent reader is not modified. If the batch update
+     * is successful, the sub-reader is used to fast forward the parent reader with the method
+     * {@link #forwardSeek(org.apache.solr.update.CdcrUpdateLog.CdcrLogReader)}.
+     */
+    public CdcrLogReader getSubReader() {
+      // Add the last element of the queue to properly initialise the pointer and log reader
+      CdcrLogReader clone = new CdcrLogReader(new ArrayList<TransactionLog>(), this.tlogs.peekLast());
+      clone.tlogs.clear(); // clear queue before copy
+      clone.tlogs.addAll(tlogs); // perform a copy of the list
+      clone.lastPositionInTLog = this.lastPositionInTLog;
+      clone.numRecordsReadInCurrentTlog = this.numRecordsReadInCurrentTlog;
+      clone.lastVersion = this.lastVersion;
+      clone.nextToLastVersion = this.nextToLastVersion;
+
+      // If the update log is not empty, we need to initialise the tlog reader
+      // NB: the tlogReader is equal to null if the update log is empty
+      if (tlogReader != null) {
+        clone.tlogReader.close(); // replace the reader opened at position 0 by the constructor
+        clone.tlogReader = currentTlog.getReader(this.tlogReader.currentPos());
+      }
+
+      return clone;
+    }
+
+    /**
+     * Expert: Fast forward this log reader with a log subreader. The subreader is not closed by this
+     * method; closing it is left to the caller. In order to avoid unexpected results, the log
+     * subreader must be created from this reader with the method {@link #getSubReader()}.
+     */
+    public void forwardSeek(CdcrLogReader subReader) {
+      // If a subreader has a null tlog reader, do nothing.
+      // This can happen if a subreader is instantiated from a non-initialised parent reader, or if the subreader
+      // has been closed.
+      if (subReader.tlogReader == null) {
+        return;
+      }
+
+      tlogReader.close(); // close the existing reader, a new one will be created
+      while (this.tlogs.peekLast().id < subReader.tlogs.peekLast().id) {
+        tlogs.removeLast();
+        currentTlog = tlogs.peekLast();
+      }
+      assert this.tlogs.peekLast().id == subReader.tlogs.peekLast().id;
+      this.pointer.set(currentTlog.tlogFile);
+      this.lastPositionInTLog = subReader.lastPositionInTLog;
+      this.numRecordsReadInCurrentTlog = subReader.numRecordsReadInCurrentTlog;
+      this.lastVersion = subReader.lastVersion;
+      this.nextToLastVersion = subReader.nextToLastVersion;
+      this.tlogReader = currentTlog.getReader(subReader.tlogReader.currentPos());
+    }
+
+    /**
+     * Advances to the next log entry in the updates log and returns the log entry itself.
+     * Returns null if there are no more log entries in the updates log.<br>
+     * <p>
+     * <b>NOTE:</b> after the reader has been exhausted, this method can be called again since the updates
+     * log might have been updated with new entries.
+     */
+    public Object next() throws IOException, InterruptedException {
+      while (!tlogs.isEmpty()) {
+        lastPositionInTLog = tlogReader.currentPos(); // remembered for resetToLastPosition()
+        Object o = tlogReader.next();
+
+        if (o != null) {
+          pointer.set(currentTlog.tlogFile);
+          nextToLastVersion = lastVersion;
+          lastVersion = getVersion(o);
+          numRecordsReadInCurrentTlog++;
+          return o;
+        }
+
+        if (tlogs.size() > 1) { // if the current tlog is not the newest one, we can advance to the next one
+          tlogReader.close();
+          tlogs.removeLast();
+          currentTlog = tlogs.peekLast();
+          tlogReader = currentTlog.getReader(0);
+          pointer.set(currentTlog.tlogFile);
+          numRecordsReadInCurrentTlog = 0;
+          log.debug("Init new tlog reader for {} - tlogReader = {}", currentTlog.tlogFile, tlogReader);
+        } else {
+          // the only tlog left is the new tlog which is currently being written,
+          // we should not remove it as we have to try to read it again later.
+          return null;
+        }
+      }
+
+      return null;
+    }
+
+    /**
+     * Advances to the first entry beyond the current one whose version number is greater
+     * than or equal to <i>targetVersion</i>.<br>
+     * Returns true if the reader has been advanced. If <i>targetVersion</i> is
+     * greater than the highest version number in the updates log, the reader
+     * has been advanced to the end of the current tlog, and a call to
+     * {@link #next()} will probably return null.<br>
+     * Returns false if <i>targetVersion</i> is lower than the oldest known entry.
+     * In this scenario, it probably means that there is a gap in the updates log.<br>
+     * <p>
+     * <b>NOTE:</b> This method must be called before the first call to {@link #next()}.
+     */
+    public boolean seek(long targetVersion) throws IOException, InterruptedException {
+      Object o;
+      // version is negative for deletes - ensure that we are manipulating absolute version numbers.
+      targetVersion = Math.abs(targetVersion);
+
+      if (tlogs.isEmpty() || !this.seekTLog(targetVersion)) {
+        return false;
+      }
+
+      // now that we might be on the right tlog, iterate over the entries to find the one we are looking for
+      while ((o = this.next()) != null) {
+        if (this.getVersion(o) >= targetVersion) {
+          this.resetToLastPosition(); // step back so that the matching entry is the next one to be read
+          return true;
+        }
+      }
+
+      return true;
+    }
+
+    /**
+     * Seeks the tlog associated to the target version by using the updates log index,
+     * and initialises the log reader to the start of the tlog. Returns true if it was able
+     * to seek the corresponding tlog, false if the <i>targetVersion</i> is lower than the
+     * oldest known entry (which probably indicates a gap).<br>
+     * <p>
+     * <b>NOTE:</b> This method might modify the tlog queue by removing tlogs that are older
+     * than the target version.
+     */
+    private boolean seekTLog(long targetVersion) {
+      // if the target version is lower than the oldest known entry, we have probably a gap.
+      if (targetVersion < ((CdcrTransactionLog) tlogs.peekLast()).startVersion) {
+        return false;
+      }
+
+      // close the existing reader before performing the seek and possibly modifying the queue
+      tlogReader.close();
+
+      // iterate over the queue and remove the old tlogs; the newest tlog is never removed
+      TransactionLog last = null;
+      while (tlogs.size() > 1) {
+        if (((CdcrTransactionLog) tlogs.peekLast()).startVersion >= targetVersion) {
+          break;
+        }
+        last = tlogs.pollLast();
+      }
+
+      // the last tlog removed is the one we look for, add it back to the queue
+      if (last != null) tlogs.addLast(last);
+
+      currentTlog = tlogs.peekLast();
+      tlogReader = currentTlog.getReader(0);
+      pointer.set(currentTlog.tlogFile);
+      numRecordsReadInCurrentTlog = 0;
+
+      return true;
+    }
+
+    /**
+     * Extracts the version number, stored at index 1 of the log entry, and converts it to its absolute form.
+     */
+    private long getVersion(Object o) {
+      List<?> entry = (List<?>) o;
+      // version is negative for delete, ensure that we are manipulating absolute version numbers
+      return Math.abs((Long) entry.get(1));
+    }
+
+    /**
+     * If called after {@link #next()}, it resets the reader to the position it had before that call.
+     */
+    public void resetToLastPosition() {
+      try {
+        if (tlogReader != null) {
+          tlogReader.fis.seek(lastPositionInTLog); // position recorded by next() before it read the entry
+          numRecordsReadInCurrentTlog--;
+          lastVersion = nextToLastVersion;
+        }
+      } catch (IOException e) {
+        log.error("Failed to seek last position in tlog", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to seek last position in tlog", e);
+      }
+    }
+
+    /**
+     * Returns the number of remaining records (including commit but excluding header) to be read in the logs.
+     */
+    public long getNumberOfRemainingRecords() {
+      long numRemainingRecords = 0;
+
+      synchronized (tlogs) { // NOTE(review): nothing else in this class locks on tlogs - confirm that this monitor protects anything
+        for (TransactionLog tlog : tlogs) {
+          numRemainingRecords += tlog.numRecords() - 1; // minus 1 as the number of records returned by the tlog includes the header
+        }
+      }
+
+      return numRemainingRecords - numRecordsReadInCurrentTlog; // the records already read in the current tlog are not remaining
+    }
+
+    /**
+     * Closes streams and removes the associated {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogPointer} from the
+     * parent {@link org.apache.solr.update.CdcrUpdateLog}.
+     */
+    public void close() {
+      if (tlogReader != null) {
+        tlogReader.close();
+        tlogReader = null;
+        currentTlog = null;
+      }
+      tlogs.clear(); // an empty queue makes next() and seek() return immediately
+      logPointers.remove(this); // the pointer of this reader no longer holds back the removal of old tlogs
+    }
+
+    /**
+     * Returns the absolute form of the version number of the last entry read. If that version is equal
+     * to 0 (because of a commit), the next to last version number is returned instead.
+     */
+    public long getLastVersion() {
+      return lastVersion == 0 ? nextToLastVersion : lastVersion;
+    }
+  }
+
+}
+

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java Fri May 22 18:58:29 2015
@@ -75,7 +75,7 @@ public class TransactionLog {
   OutputStream os;
   FastOutputStream fos;    // all accesses to this stream should be synchronized on "this" (The TransactionLog)
   int numRecords;
-  
+
   protected volatile boolean deleteOnClose = true;  // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
 
   AtomicInteger refcount = new AtomicInteger(1);
@@ -84,7 +84,7 @@ public class TransactionLog {
 
   long snapshot_size;
   int snapshot_numRecords;
-  
+
   // write a BytesRef as a byte array
   JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
     @Override
@@ -133,10 +133,8 @@ public class TransactionLog {
       }
     }
 
-
   }
 
-
   TransactionLog(File tlogFile, Collection<String> globalStrings) {
     this(tlogFile, globalStrings, false);
   }
@@ -148,6 +146,10 @@ public class TransactionLog {
         log.debug("New TransactionLog file=" + tlogFile + ", exists=" + tlogFile.exists() + ", size=" + tlogFile.length() + ", openExisting=" + openExisting);
       }
 
+      // Parse tlog id from the filename
+      String filename = tlogFile.getName();
+      id = Long.parseLong(filename.substring(filename.indexOf('.') + 1, filename.indexOf('.') + 20));
+
       this.tlogFile = tlogFile;
       raf = new RandomAccessFile(this.tlogFile, "rw");
       long start = raf.length();
@@ -214,7 +216,6 @@ public class TransactionLog {
       size = fos.size();
     }
 
-    
     // the end of the file should have the end message (added during a commit) plus a 4 byte size
     byte[] buf = new byte[ END_MESSAGE.length() ];
     long pos = size - END_MESSAGE.length() - 4;
@@ -234,9 +235,9 @@ public class TransactionLog {
       snapshot_size = fos.size();
       snapshot_numRecords = numRecords;
       return snapshot_size;
-    }    
+    }
   }
-  
+
   // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
   // This should only be used to roll back buffered updates, not actually applied updates.
   public void rollback(long pos) throws IOException {
@@ -250,7 +251,6 @@ public class TransactionLog {
     }
   }
 
-
   public long writeData(Object o) {
     LogCodec codec = new LogCodec(resolver);
     try {
@@ -323,7 +323,7 @@ public class TransactionLog {
 
   private void checkWriteHeader(LogCodec codec, SolrInputDocument optional) throws IOException {
 
-    // Unsynchronized access.  We can get away with an unsynchronized access here
+    // Unsynchronized access. We can get away with an unsynchronized access here
     // since we will never get a false non-zero when the position is in fact 0.
     // rollback() is the only function that can reset to zero, and it blocks updates.
     if (fos.size() != 0) return;
@@ -454,7 +454,7 @@ public class TransactionLog {
         codec.writeStr(END_MESSAGE);  // ensure these bytes are (almost) last in the file
 
         endRecord(pos);
-        
+
         fos.flush();  // flush since this will be the last record in a log fill
         assert fos.size() == channel.size();
 
@@ -561,7 +561,7 @@ public class TransactionLog {
       assert ObjectReleaseTracker.release(this);
     }
   }
-  
+
   public void forceClose() {
     if (refcount.get() > 0) {
       log.error("Error: Forcing close of " + this);
@@ -595,7 +595,7 @@ public class TransactionLog {
   }
 
   public class LogReader {
-    private ChannelFastInputStream fis;
+    protected ChannelFastInputStream fis;
     private LogCodec codec = new LogCodec(resolver);
 
     public LogReader(long startingPos) {
@@ -614,7 +614,6 @@ public class TransactionLog {
     public Object next() throws IOException, InterruptedException {
       long pos = fis.position();
 
-
       synchronized (TransactionLog.this) {
         if (trace) {
           log.trace("Reading log record.  pos="+pos+" currentSize="+fos.size());
@@ -664,7 +663,7 @@ public class TransactionLog {
     public long currentPos() {
       return fis.position();
     }
-    
+
     // returns best effort current size
     // for info purposes
     public long currentSize() throws IOException {
@@ -675,8 +674,6 @@ public class TransactionLog {
 
   public abstract class ReverseReader {
 
-
-
     /** Returns the next object from the log, or null if none available.
      *
      * @return The log record, or null if EOF
@@ -691,9 +688,8 @@ public class TransactionLog {
     @Override
     public abstract String toString() ;
 
-
   }
-  
+
   public class FSReverseReader extends ReverseReader {
     ChannelFastInputStream fis;
     private LogCodec codec = new LogCodec(resolver) {
@@ -727,7 +723,6 @@ public class TransactionLog {
       }
     }
 
-
     /** Returns the next object from the log, or null if none available.
      *
      * @return The log record, or null if EOF
@@ -835,7 +830,7 @@ class ChannelFastInputStream extends Fas
   public void close() throws IOException {
     ch.close();
   }
-  
+
   @Override
   public String toString() {
     return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java Fri May 22 18:58:29 2015
@@ -82,8 +82,8 @@ public abstract class UpdateHandler impl
     for (SolrEventListener listener : softCommitCallbacks) {
       listener.postSoftCommit();
     }
-  }  
-  
+  }
+
   protected void callPostOptimizeCallbacks() {
     for (SolrEventListener listener : optimizeCallbacks) {
       listener.postCommit();
@@ -93,18 +93,18 @@ public abstract class UpdateHandler impl
   public UpdateHandler(SolrCore core)  {
     this(core, null);
   }
-  
+
   public UpdateHandler(SolrCore core, UpdateLog updateLog)  {
     this.core=core;
     idField = core.getLatestSchema().getUniqueKeyField();
     idFieldType = idField!=null ? idField.getType() : null;
     parseEventListeners();
     PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
-    
+
 
     if (updateLog == null && ulogPluginInfo != null && ulogPluginInfo.isEnabled()) {
       String dataDir = (String)ulogPluginInfo.initArgs.get("dir");
-      
+
       String ulogDir = core.getCoreDescriptor().getUlogDir();
       if (ulogDir != null) {
         dataDir = ulogDir;
@@ -112,7 +112,7 @@ public abstract class UpdateHandler impl
       if (dataDir == null || dataDir.length()==0) {
         dataDir = core.getDataDir();
       }
-           
+
       if (dataDir != null && dataDir.startsWith("hdfs:/")) {
         DirectoryFactory dirFactory = core.getDirectoryFactory();
         if (dirFactory instanceof HdfsDirectoryFactory) {
@@ -120,17 +120,18 @@ public abstract class UpdateHandler impl
         } else {
           ulog = new HdfsUpdateLog();
         }
-        
+
       } else {
-        ulog = new UpdateLog();
+        String className = ulogPluginInfo.className == null ? UpdateLog.class.getName() : ulogPluginInfo.className;
+        ulog = core.getResourceLoader().newInstance(className, UpdateLog.class);
       }
-      
+
       if (!core.isReloaded() && !core.getDirectoryFactory().isPersistent()) {
         ulog.clearLog(core, ulogPluginInfo);
       }
-      
+
       log.info("Using UpdateLog implementation: " + ulog.getClass().getName());
-      
+
       ulog.init(ulogPluginInfo);
 
       ulog.init(this, core);
@@ -144,9 +145,9 @@ public abstract class UpdateHandler impl
   /**
    * Called when the Writer should be opened again - eg when replication replaces
    * all of the index files.
-   * 
+   *
    * @param rollback IndexWriter if true else close
-   * 
+   *
    * @throws IOException If there is a low-level I/O error.
    */
   public abstract void newIndexWriter(boolean rollback) throws IOException;
@@ -173,7 +174,7 @@ public abstract class UpdateHandler impl
   {
     commitCallbacks.add( listener );
   }
-  
+
   /**
    * NOTE: this function is not thread safe.  However, it is safe to call within the
    * <code>inform( SolrCore core )</code> function for <code>SolrCoreAware</code> classes.

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Fri May 22 18:58:29 2015
@@ -17,9 +17,6 @@
 
 package org.apache.solr.update;
 
-import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
-import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FilenameFilter;
@@ -66,6 +63,9 @@ import org.apache.solr.util.plugin.Plugi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
 
 /** @lucene.experimental */
 public class UpdateLog implements PluginInfoInitialized {
@@ -138,7 +138,7 @@ public class UpdateLog implements Plugin
   protected Map<BytesRef,LogPtr> prevMap;  // used while committing/reopening is happening
   protected Map<BytesRef,LogPtr> prevMap2;  // used while committing/reopening is happening
   protected TransactionLog prevMapLog;  // the transaction log used to look up entries found in prevMap
-  protected TransactionLog prevMapLog2;  // the transaction log used to look up entries found in prevMap
+  protected TransactionLog prevMapLog2;  // the transaction log used to look up entries found in prevMap2
 
   protected final int numDeletesToKeep = 1000;
   protected final int numDeletesByQueryToKeep = 100;
@@ -281,12 +281,12 @@ public class UpdateLog implements Plugin
     if (debug) {
       log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
     }
-    
+
     TransactionLog oldLog = null;
     for (String oldLogName : tlogFiles) {
       File f = new File(tlogDir, oldLogName);
       try {
-        oldLog = new TransactionLog( f, null, true );
+        oldLog = newTransactionLog(f, null, true);
         addOldLog(oldLog, false);  // don't remove old logs on startup since more than one may be uncapped.
       } catch (Exception e) {
         SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
@@ -335,11 +335,19 @@ public class UpdateLog implements Plugin
     }
 
   }
-  
+
+  /**
+   * Returns a new {@link org.apache.solr.update.TransactionLog}. Sub-classes can override this method to
+   * change the implementation of the transaction log.
+   */
+  public TransactionLog newTransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
+    return new TransactionLog(tlogFile, globalStrings, openExisting);
+  }
+
   public String getLogDir() {
     return tlogDir.getAbsolutePath();
   }
-  
+
   public List<Long> getStartingVersions() {
     return startingVersions;
   }
@@ -381,7 +389,6 @@ public class UpdateLog implements Plugin
     logs.addFirst(oldLog);
   }
 
-
   public String[] getLogList(File directory) {
     final String prefix = TLOG_NAME+'.';
     String[] names = directory.list(new FilenameFilter() {
@@ -397,20 +404,17 @@ public class UpdateLog implements Plugin
     return names;
   }
 
-
   public long getLastLogId() {
     if (id != -1) return id;
     if (tlogFiles.length == 0) return -1;
     String last = tlogFiles[tlogFiles.length-1];
-    return Long.parseLong(last.substring(TLOG_NAME.length()+1));
+    return Long.parseLong(last.substring(TLOG_NAME.length() + 1));
   }
 
-
   public void add(AddUpdateCommand cmd) {
     add(cmd, false);
   }
 
-
   public void add(AddUpdateCommand cmd, boolean clearCaches) {
     // don't log if we are replaying from another log
     // TODO: we currently need to log to maintain correct versioning, rtg, etc
@@ -621,7 +625,7 @@ public class UpdateLog implements Plugin
   public boolean hasUncommittedChanges() {
     return tlog != null;
   }
-  
+
   public void preCommit(CommitUpdateCommand cmd) {
     synchronized (this) {
       if (debug) {
@@ -718,13 +722,13 @@ public class UpdateLog implements Plugin
       // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       if (entry == null && prevMap != null) {
         entry = prevMap.get(indexedId);
-        // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog)
+        // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
         lookupLog = prevMapLog;
         // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       }
       if (entry == null && prevMap2 != null) {
         entry = prevMap2.get(indexedId);
-        // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog)
+        // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
         lookupLog = prevMapLog2;
         // SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       }
@@ -758,13 +762,13 @@ public class UpdateLog implements Plugin
       // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       if (entry == null && prevMap != null) {
         entry = prevMap.get(indexedId);
-        // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog)
+        // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
         lookupLog = prevMapLog;
         // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       }
       if (entry == null && prevMap2 != null) {
         entry = prevMap2.get(indexedId);
-        // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog)
+        // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
         lookupLog = prevMapLog2;
         // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
       }
@@ -860,7 +864,7 @@ public class UpdateLog implements Plugin
   protected void ensureLog() {
     if (tlog == null) {
       String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id);
-      tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
+      tlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false);
     }
   }
 
@@ -879,11 +883,11 @@ public class UpdateLog implements Plugin
       theLog.forceClose();
     }
   }
-  
+
   public void close(boolean committed) {
     close(committed, false);
   }
-  
+
   public void close(boolean committed, boolean deleteOnClose) {
     synchronized (this) {
       recoveryExecutor.shutdown(); // no new tasks
@@ -924,7 +928,7 @@ public class UpdateLog implements Plugin
       this.id = id;
     }
   }
-  
+
   public class RecentUpdates {
     Deque<TransactionLog> logList;    // newest first
     List<List<Update>> updateList;
@@ -935,17 +939,17 @@ public class UpdateLog implements Plugin
 
     public List<Long> getVersions(int n) {
       List<Long> ret = new ArrayList(n);
-      
+
       for (List<Update> singleList : updateList) {
         for (Update ptr : singleList) {
           ret.add(ptr.version);
           if (--n <= 0) return ret;
         }
       }
-      
+
       return ret;
     }
-    
+
     public Object lookup(long version) {
       Update update = updates.get(version);
       if (update == null) return null;
@@ -989,7 +993,7 @@ public class UpdateLog implements Plugin
             try {
               o = reader.next();
               if (o==null) break;
-              
+
               // should currently be a List<Oper,Ver,Doc/Id>
               List entry = (List)o;
 
@@ -1012,13 +1016,13 @@ public class UpdateLog implements Plugin
 
                   updatesForLog.add(update);
                   updates.put(version, update);
-                  
+
                   if (oper == UpdateLog.DELETE_BY_QUERY) {
                     deleteByQueryList.add(update);
                   } else if (oper == UpdateLog.DELETE) {
                     deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
                   }
-                  
+
                   break;
 
                 case UpdateLog.COMMIT:
@@ -1048,7 +1052,7 @@ public class UpdateLog implements Plugin
       }
 
     }
-    
+
     public void close() {
       for (TransactionLog log : logList) {
         log.decref();
@@ -1295,7 +1299,7 @@ public class UpdateLog implements Plugin
 
     public void doReplay(TransactionLog translog) {
       try {
-        loglog.warn("Starting log replay " + translog + " active="+activeLog + " starting pos=" + recoveryInfo.positionOfStart);
+        loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart);
         long lastStatusTime = System.nanoTime();
         tlogReader = translog.getReader(recoveryInfo.positionOfStart);
 
@@ -1309,7 +1313,7 @@ public class UpdateLog implements Plugin
         int operationAndFlags = 0;
         long nextCount = 0;
 
-        for(;;) {
+        for (; ; ) {
           Object o = null;
           if (cancelApplyBufferUpdate) break;
           try {
@@ -1321,13 +1325,13 @@ public class UpdateLog implements Plugin
                 long cpos = tlogReader.currentPos();
                 long csize = tlogReader.currentSize();
                 loglog.info(
-                        "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
-                        translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
-                        Math.round(cpos / (double) csize * 100.));
-                
+                    "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
+                    translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
+                    Math.round(cpos / (double) csize * 100.));
+
               }
             }
-            
+
             o = null;
             o = tlogReader.next();
             if (o == null && activeLog) {
@@ -1352,7 +1356,7 @@ public class UpdateLog implements Plugin
               }
             }
           } catch (Exception e) {
-            SolrException.log(log,e);
+            SolrException.log(log, e);
           }
 
           if (o == null) break;
@@ -1360,62 +1364,58 @@ public class UpdateLog implements Plugin
           try {
 
             // should currently be a List<Oper,Ver,Doc/Id>
-            List entry = (List)o;
+            List entry = (List) o;
 
-            operationAndFlags = (Integer)entry.get(0);
+            operationAndFlags = (Integer) entry.get(0);
             int oper = operationAndFlags & OPERATION_MASK;
             long version = (Long) entry.get(1);
 
             switch (oper) {
-              case UpdateLog.ADD:
-              {
+              case UpdateLog.ADD: {
                 recoveryInfo.adds++;
                 // byte[] idBytes = (byte[]) entry.get(2);
-                SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+                SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
                 AddUpdateCommand cmd = new AddUpdateCommand(req);
                 // cmd.setIndexedId(new BytesRef(idBytes));
                 cmd.solrDoc = sdoc;
                 cmd.setVersion(version);
                 cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
-                if (debug) log.debug("add " +  cmd);
+                if (debug) log.debug("add " + cmd);
 
                 proc.processAdd(cmd);
                 break;
               }
-              case UpdateLog.DELETE:
-              {
+              case UpdateLog.DELETE: {
                 recoveryInfo.deletes++;
                 byte[] idBytes = (byte[]) entry.get(2);
                 DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
                 cmd.setIndexedId(new BytesRef(idBytes));
                 cmd.setVersion(version);
                 cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
-                if (debug) log.debug("delete " +  cmd);
+                if (debug) log.debug("delete " + cmd);
                 proc.processDelete(cmd);
                 break;
               }
 
-              case UpdateLog.DELETE_BY_QUERY:
-              {
+              case UpdateLog.DELETE_BY_QUERY: {
                 recoveryInfo.deleteByQuery++;
-                String query = (String)entry.get(2);
+                String query = (String) entry.get(2);
                 DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
                 cmd.query = query;
                 cmd.setVersion(version);
                 cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
-                if (debug) log.debug("deleteByQuery " +  cmd);
+                if (debug) log.debug("deleteByQuery " + cmd);
                 proc.processDelete(cmd);
                 break;
               }
 
-              case UpdateLog.COMMIT:
-              {
+              case UpdateLog.COMMIT: {
                 commitVersion = version;
                 break;
               }
 
               default:
-                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,  "Unknown Operation! " + oper);
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
             }
 
             if (rsp.getException() != null) {
@@ -1430,7 +1430,7 @@ public class UpdateLog implements Plugin
             recoveryInfo.errors++;
             loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log.  Entry=" + o, cl);
             // would be caused by a corrupt transaction log
-          }  catch (SolrException ex) {
+          } catch (SolrException ex) {
             if (ex.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
               throw ex;
             }
@@ -1450,7 +1450,7 @@ public class UpdateLog implements Plugin
         cmd.waitSearcher = true;
         cmd.setFlags(UpdateCommand.REPLAY);
         try {
-          if (debug) log.debug("commit " +  cmd);
+          if (debug) log.debug("commit " + cmd);
           uhandler.commit(cmd);          // this should cause a commit to be added to the incomplete log and avoid it being replayed again after a restart.
         } catch (IOException ex) {
           recoveryInfo.errors++;
@@ -1506,25 +1506,25 @@ public class UpdateLog implements Plugin
       }
     }
   }
-  
+
   protected String getTlogDir(SolrCore core, PluginInfo info) {
     String dataDir = (String) info.initArgs.get("dir");
-    
+
     String ulogDir = core.getCoreDescriptor().getUlogDir();
     if (ulogDir != null) {
       dataDir = ulogDir;
     }
-    
+
     if (dataDir == null || dataDir.length() == 0) {
       dataDir = core.getDataDir();
     }
 
     return dataDir + "/" + TLOG_NAME;
   }
-  
+
   /**
    * Clears the logs on the file system. Only call before init.
-   * 
+   *
    * @param core the SolrCore
    * @param ulogPluginInfo the init info for the UpdateHandler
    */

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java Fri May 22 18:58:29 2015
@@ -0,0 +1,125 @@
+package org.apache.solr.update.processor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.UpdateCommand;
+
+/**
+ * <p>
+ * Extends {@link org.apache.solr.update.processor.DistributedUpdateProcessor} to force peer sync logic
+ * for every update carrying the {@link #CDCR_UPDATE} request parameter. This ensures that the version
+ * parameter sent by the source cluster is kept by the target cluster.
+ * </p>
+ */
+public class CdcrUpdateProcessor extends DistributedUpdateProcessor {
+
+  public static final String CDCR_UPDATE = "cdcr.update";
+
+  public CdcrUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+    super(req, rsp, next);
+  }
+
+  @Override
+  protected boolean versionAdd(AddUpdateCommand cmd) throws IOException {
+    /*
+    temporarily set the PEER_SYNC flag so that DistributedUpdateProcessor.versionAdd doesn't execute leader logic
+    but the else part of that if. That way version remains preserved.
+
+    we cannot set the flag for the whole processAdd method because DistributedUpdateProcessor.setupRequest() would set
+    isLeader to false which wouldn't work
+     */
+    if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+      cmd.setFlags(cmd.getFlags() | UpdateCommand.PEER_SYNC); // we need super.versionAdd() to set leaderLogic to false
+    }
+
+    boolean result = super.versionAdd(cmd);
+
+    // unset the flag to avoid unintended consequences down the chain (NOTE(review): not reached if super throws)
+    if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+      cmd.setFlags(cmd.getFlags() & ~UpdateCommand.PEER_SYNC);
+    }
+
+    return result;
+  }
+
+  @Override
+  protected boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
+    /*
+    temporarily set the PEER_SYNC flag so that DistributedUpdateProcessor.versionDelete doesn't execute leader logic
+    but the else part of that if. That way version remains preserved.
+
+    we cannot set the flag for the whole processDelete method because DistributedUpdateProcessor.setupRequest() would set
+    isLeader to false which wouldn't work
+     */
+    if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+      cmd.setFlags(cmd.getFlags() | UpdateCommand.PEER_SYNC); // we need super.versionDelete() to set leaderLogic to false
+    }
+
+    boolean result = super.versionDelete(cmd);
+
+    // unset the flag to avoid unintended consequences down the chain (NOTE(review): not reached if super throws)
+    if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+      cmd.setFlags(cmd.getFlags() & ~UpdateCommand.PEER_SYNC);
+    }
+
+    return result;
+  }
+
+  protected ModifiableSolrParams filterParams(SolrParams params) {
+    ModifiableSolrParams result = super.filterParams(params);
+    if (params.get(CDCR_UPDATE) != null) {
+      result.set(CDCR_UPDATE, "");
+      if (params.get(DistributedUpdateProcessor.VERSION_FIELD) == null) {
+        log.warn("+++ cdcr.update but no version field, params are: " + params);
+      } else {
+        log.info("+++ cdcr.update version present, params are: " + params);
+      }
+      result.set(DistributedUpdateProcessor.VERSION_FIELD, params.get(DistributedUpdateProcessor.VERSION_FIELD));
+    }
+
+    return result;
+  }
+
+  @Override
+  protected void versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+    /*
+    temporarily set the PEER_SYNC flag so that DistributedUpdateProcessor.versionDeleteByQuery doesn't execute leader logic
+    That way version remains preserved.
+
+     */
+    if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+      cmd.setFlags(cmd.getFlags() | UpdateCommand.PEER_SYNC); // we need super.versionDeleteByQuery() to set leaderLogic to false
+    }
+
+    super.versionDeleteByQuery(cmd);
+
+    // unset the flag to avoid unintended consequences down the chain (NOTE(review): not reached if super throws)
+    if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+      cmd.setFlags(cmd.getFlags() & ~UpdateCommand.PEER_SYNC);
+    }
+  }
+}
+

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java Fri May 22 18:58:29 2015
@@ -0,0 +1,46 @@
+package org.apache.solr.update.processor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+
+/**
+ * Factory for {@link org.apache.solr.update.processor.CdcrUpdateProcessor}.
+ *
+ * @see org.apache.solr.update.processor.CdcrUpdateProcessor
+ */
+public class CdcrUpdateProcessorFactory
+    extends UpdateRequestProcessorFactory
+    implements DistributingUpdateProcessorFactory {
+
+  @Override
+  public void init(NamedList args) {
+    // intentionally empty: this factory does not read any init arguments
+  }
+
+  @Override
+  public CdcrUpdateProcessor getInstance(SolrQueryRequest req,
+                                         SolrQueryResponse rsp, UpdateRequestProcessor next) {
+
+    return new CdcrUpdateProcessor(req, rsp, next);
+  }
+
+}
+

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Fri May 22 18:58:29 2015
@@ -964,7 +964,7 @@ public class DistributedUpdateProcessor
    * @return whether or not to drop this cmd
    * @throws IOException If there is a low-level I/O error.
    */
-  private boolean versionAdd(AddUpdateCommand cmd) throws IOException {
+  protected boolean versionAdd(AddUpdateCommand cmd) throws IOException {
     BytesRef idBytes = cmd.getIndexedId();
 
     if (idBytes == null) {
@@ -1226,7 +1226,7 @@ public class DistributedUpdateProcessor
     }
   }
 
-  private ModifiableSolrParams filterParams(SolrParams params) {
+  protected ModifiableSolrParams filterParams(SolrParams params) {
     ModifiableSolrParams fparams = new ModifiableSolrParams();
     passParam(params, fparams, UpdateParams.UPDATE_CHAIN);
     passParam(params, fparams, TEST_DISTRIB_SKIP_SERVERS);
@@ -1333,53 +1333,10 @@ public class DistributedUpdateProcessor
     // at this point, there is an update we need to try and apply.
     // we may or may not be the leader.
 
-    // Find the version
-    long versionOnUpdate = cmd.getVersion();
-    if (versionOnUpdate == 0) {
-      String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
-      versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
-    }
-    versionOnUpdate = Math.abs(versionOnUpdate);  // normalize to positive version
-
     boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
     boolean leaderLogic = isLeader && !isReplayOrPeersync;
 
-    if (!leaderLogic && versionOnUpdate==0) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
-    }
-
-    vinfo.blockUpdates();
-    try {
-
-      if (versionsStored) {
-        if (leaderLogic) {
-          long version = vinfo.getNewClock();
-          cmd.setVersion(-version);
-          // TODO update versions in all buckets
-
-          doLocalDelete(cmd);
-
-        } else {
-          cmd.setVersion(-versionOnUpdate);
-
-          if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
-            // we're not in an active state, and this update isn't from a replay, so buffer it.
-            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
-            ulog.deleteByQuery(cmd);
-            return;
-          }
-
-          doLocalDelete(cmd);
-        }
-      }
-
-      // since we don't know which documents were deleted, the easiest thing to do is to invalidate
-      // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
-      // (so cache misses will see up-to-date data)
-
-    } finally {
-      vinfo.unblockUpdates();
-    }
+    versionDeleteByQuery(cmd);
 
     if (zkEnabled)  {
       // forward to all replicas
@@ -1449,6 +1406,56 @@ public class DistributedUpdateProcessor
     }
   }
 
+  protected void versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+    // Find the version
+    long versionOnUpdate = cmd.getVersion();
+    if (versionOnUpdate == 0) {
+      String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
+      versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
+    }
+    versionOnUpdate = Math.abs(versionOnUpdate);  // normalize to positive version
+
+    boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
+    boolean leaderLogic = isLeader && !isReplayOrPeersync;
+
+    if (!leaderLogic && versionOnUpdate == 0) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
+    }
+
+    vinfo.blockUpdates();
+    try {
+
+      if (versionsStored) {
+        if (leaderLogic) {
+          long version = vinfo.getNewClock();
+          cmd.setVersion(-version);
+          // TODO update versions in all buckets
+
+          doLocalDelete(cmd);
+
+        } else {
+          cmd.setVersion(-versionOnUpdate);
+
+          if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            // we're not in an active state, and this update isn't from a replay, so buffer it.
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+            ulog.deleteByQuery(cmd);
+            return;
+          }
+
+          doLocalDelete(cmd);
+        }
+      }
+
+      // since we don't know which documents were deleted, the easiest thing to do is to invalidate
+      // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
+      // (so cache misses will see up-to-date data)
+
+    } finally {
+      vinfo.unblockUpdates();
+    }
+  }
+
   // internal helper method to tell if we are the leader for an add or deleteById update
   boolean isLeader(UpdateCommand cmd) {
     updateCommand = cmd;
@@ -1482,7 +1489,7 @@ public class DistributedUpdateProcessor
     throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled.");
   }
 
-  private boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
+  protected boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
 
     BytesRef idBytes = cmd.getIndexedId();
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrCLI.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrCLI.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrCLI.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrCLI.java Fri May 22 18:58:29 2015
@@ -144,7 +144,7 @@ public class SolrCLI {
     }
     
     /**
-     * Runs a SolrCloud tool with CloudSolrServer initialized
+     * Runs a SolrCloud tool with CloudSolrClient initialized
      */
     protected abstract int runCloudTool(CloudSolrClient cloudSolrClient, CommandLine cli)
         throws Exception;
@@ -1217,10 +1217,10 @@ public class SolrCLI {
 
       int toolExitStatus = 0;
 
-      try (CloudSolrClient cloudSolrServer = new CloudSolrClient(zkHost)) {
+      try (CloudSolrClient cloudSolrClient = new CloudSolrClient(zkHost)) {
         System.out.println("Connecting to ZooKeeper at " + zkHost);
-        cloudSolrServer.connect();
-        toolExitStatus = runCloudTool(cloudSolrServer, cli);
+        cloudSolrClient.connect();
+        toolExitStatus = runCloudTool(cloudSolrClient, cli);
       } catch (Exception exc) {
         // since this is a CLI, spare the user the stacktrace
         String excMsg = exc.getMessage();

Added: lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml (added)
+++ lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml Fri May 22 18:58:29 2015
@@ -0,0 +1,85 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+  <jmx/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}">
+    <!-- used to keep RAM requirements down for HdfsDirectoryFactory -->
+    <bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool>
+    <int name="solr.hdfs.blockcache.blocksperbank">${solr.hdfs.blockcache.blocksperbank:1024}</int>
+    <str name="solr.hdfs.home">${solr.hdfs.home:}</str>
+    <str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str>
+    <str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
+  </directoryFactory>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+
+  <requestHandler name="standard" class="solr.StandardRequestHandler">
+  </requestHandler>
+
+  <requestHandler name="/get" class="solr.RealTimeGetHandler">
+    <lst name="defaults">
+      <str name="omitHeader">true</str>
+    </lst>
+  </requestHandler>
+
+  <requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy"/>
+
+  <requestHandler name="/update" class="solr.UpdateRequestHandler">
+    <lst name="defaults">
+      <str name="update.chain">cdcr-processor-chain</str>
+    </lst>
+  </requestHandler>
+
+  <updateRequestProcessorChain name="cdcr-processor-chain">
+    <processor class="solr.CdcrUpdateProcessorFactory"/>
+    <processor class="solr.RunUpdateProcessorFactory"/>
+  </updateRequestProcessorChain>
+
+  <requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
+    <lst name="replica">
+      <str name="zkHost">${zkHost}</str>
+      <str name="source">source_collection</str>
+      <str name="target">target_collection</str>
+    </lst>
+    <lst name="replicator">
+      <str name="threadPoolSize">8</str>
+      <str name="schedule">1000</str>
+      <str name="batchSize">64</str>
+    </lst>
+    <lst name="updateLogSynchronizer">
+      <str name="schedule">1000</str>
+    </lst>
+  </requestHandler>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <updateLog class="solr.CdcrUpdateLog">
+      <str name="dir">${solr.ulog.dir:}</str>
+    </updateLog>
+  </updateHandler>
+
+  <requestHandler name="/admin/" class="org.apache.solr.handler.admin.AdminHandlers"/>
+
+</config>
+

Added: lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml (added)
+++ lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml Fri May 22 18:58:29 2015
@@ -0,0 +1,59 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+  <jmx/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}">
+    <!-- used to keep RAM requirements down for HdfsDirectoryFactory -->
+    <bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool>
+    <int name="solr.hdfs.blockcache.blocksperbank">${solr.hdfs.blockcache.blocksperbank:1024}</int>
+    <str name="solr.hdfs.home">${solr.hdfs.home:}</str>
+    <str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str>
+    <str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
+  </directoryFactory>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+
+  <requestHandler name="standard" class="solr.StandardRequestHandler">
+  </requestHandler>
+
+  <requestHandler name="/get" class="solr.RealTimeGetHandler">
+    <lst name="defaults">
+      <str name="omitHeader">true</str>
+    </lst>
+  </requestHandler>
+
+  <requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy"/>
+
+  <requestHandler name="/update" class="solr.UpdateRequestHandler"/>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <updateLog class="solr.CdcrUpdateLog">
+      <str name="dir">${solr.ulog.dir:}</str>
+    </updateLog>
+  </updateHandler>
+
+  <requestHandler name="/admin/" class="org.apache.solr.handler.admin.AdminHandlers"/>
+
+</config>