You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2015/05/22 20:58:29 UTC
svn commit: r1681186 [3/5] - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
core/src/java/org/apache/solr/util/ core/src/test-files/solr/collect...
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java Fri May 22 18:58:29 2015
@@ -0,0 +1,560 @@
+package org.apache.solr.update;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An extension of the {@link org.apache.solr.update.UpdateLog} for the CDCR scenario.<br>
+ * Compared to the original update log implementation, transaction logs are removed based on
+ * pointers instead of a fixed size limit. Pointers are created by the CDC replicators and
+ * correspond to replication checkpoints. If all pointers are ahead of a transaction log,
+ * this transaction log is removed.<br>
+ * Given that the number of transaction logs can become considerable if some pointers are
+ * lagging behind, the {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader} provides
+ * a {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader#seek(long)} method to
+ * efficiently lookup a particular transaction log file given a version number.
+ */
+public class CdcrUpdateLog extends UpdateLog {
+
+ protected final Map<CdcrLogReader, CdcrLogPointer> logPointers = new ConcurrentHashMap<>();
+
+ /**
+ * A reader that will be used as toggle to turn on/off the buffering of tlogs
+ */
+ private CdcrLogReader bufferToggle;
+
+ public static String LOG_FILENAME_PATTERN = "%s.%019d.%1d";
+
+ protected static Logger log = LoggerFactory.getLogger(CdcrUpdateLog.class);
+
+ @Override
+ public void init(UpdateHandler uhandler, SolrCore core) {
+ // remove dangling readers
+ for (CdcrLogReader reader : logPointers.keySet()) {
+ reader.close();
+ }
+ logPointers.clear();
+
+ // init
+ super.init(uhandler, core);
+ }
+
+ @Override
+ public TransactionLog newTransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
+ return new CdcrTransactionLog(tlogFile, globalStrings, openExisting);
+ }
+
+ @Override
+ protected void addOldLog(TransactionLog oldLog, boolean removeOld) {
+ if (oldLog == null) return;
+
+ numOldRecords += oldLog.numRecords();
+
+ int currRecords = numOldRecords;
+
+ if (oldLog != tlog && tlog != null) {
+ currRecords += tlog.numRecords();
+ }
+
+ while (removeOld && logs.size() > 0) {
+ TransactionLog log = logs.peekLast();
+ int nrec = log.numRecords();
+
+ // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
+ // we already have the limit of 10 log files.
+ if (currRecords - nrec >= numRecordsToKeep || logs.size() >= 10) {
+ // remove the oldest log if nobody points to it
+ if (!this.hasLogPointer(log)) {
+ currRecords -= nrec;
+ numOldRecords -= nrec;
+ TransactionLog last = logs.removeLast();
+ last.deleteOnClose = true;
+ last.close(); // it will be deleted if no longer in use
+ continue;
+ }
+ // we have one log with one pointer, we should stop removing logs
+ break;
+ }
+
+ break;
+ }
+
+ // Decref old log as we do not write to it anymore
+ // If the oldlog is uncapped, i.e., a write commit has to be performed
+ // during recovery, the output stream will be automatically re-open when
+ // TransaactionLog#incref will be called.
+ oldLog.deleteOnClose = false;
+ oldLog.decref();
+
+ // don't incref... we are taking ownership from the caller.
+ logs.addFirst(oldLog);
+ }
+
+ /**
+ * Checks if one of the log pointer is pointing to the given tlog.
+ */
+ private boolean hasLogPointer(TransactionLog tlog) {
+ for (CdcrLogPointer pointer : logPointers.values()) {
+ // if we have a pointer that is not initialised, then do not remove the old tlogs
+ // as we have a log reader that didn't pick them up yet.
+ if (!pointer.isInitialised()) {
+ return true;
+ }
+
+ if (pointer.tlogFile == tlog.tlogFile) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public long getLastLogId() {
+ if (id != -1) return id;
+ if (tlogFiles.length == 0) return -1;
+ String last = tlogFiles[tlogFiles.length - 1];
+ return Long.parseLong(last.substring(TLOG_NAME.length() + 1, last.lastIndexOf('.')));
+ }
+
+ @Override
+ public void add(AddUpdateCommand cmd, boolean clearCaches) {
+ // Ensure we create a new tlog file following our filename format,
+ // the variable tlog will be not null, and the ensureLog of the parent will be skipped
+ synchronized (this) {
+ if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ ensureLog(cmd.getVersion());
+ }
+ }
+ // Then delegate to parent method
+ super.add(cmd, clearCaches);
+ }
+
+ @Override
+ public void delete(DeleteUpdateCommand cmd) {
+ // Ensure we create a new tlog file following our filename format
+ // the variable tlog will be not null, and the ensureLog of the parent will be skipped
+ synchronized (this) {
+ if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ ensureLog(cmd.getVersion());
+ }
+ }
+ // Then delegate to parent method
+ super.delete(cmd);
+ }
+
+ @Override
+ public void deleteByQuery(DeleteUpdateCommand cmd) {
+ // Ensure we create a new tlog file following our filename format
+ // the variable tlog will be not null, and the ensureLog of the parent will be skipped
+ synchronized (this) {
+ if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ ensureLog(cmd.getVersion());
+ }
+ }
+ // Then delegate to parent method
+ super.deleteByQuery(cmd);
+ }
+
+ /**
+ * Creates a new {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}
+ * initialised with the current list of tlogs.
+ */
+ public CdcrLogReader newLogReader() {
+ return new CdcrLogReader(new ArrayList(logs), tlog);
+ }
+
+ /**
+ * Enable the buffering of the tlogs. When buffering is activated, the update logs will not remove any
+ * old transaction log files.
+ */
+ public void enableBuffer() {
+ if (bufferToggle == null) {
+ bufferToggle = this.newLogReader();
+ }
+ }
+
+ /**
+ * Disable the buffering of the tlogs.
+ */
+ public void disableBuffer() {
+ if (bufferToggle != null) {
+ bufferToggle.close();
+ bufferToggle = null;
+ }
+ }
+
+ public CdcrLogReader getBufferToggle() {
+ return bufferToggle;
+ }
+
+ /**
+ * Is the update log buffering the tlogs ?
+ */
+ public boolean isBuffering() {
+ return bufferToggle == null ? false : true;
+ }
+
+ protected void ensureLog(long startVersion) {
+ if (tlog == null) {
+ long absoluteVersion = Math.abs(startVersion); // version is negative for deletes
+ if (tlog == null) {
+ String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id, absoluteVersion);
+ tlog = new CdcrTransactionLog(new File(tlogDir, newLogName), globalStrings);
+ }
+
+ // push the new tlog to the opened readers
+ for (CdcrLogReader reader : logPointers.keySet()) {
+ reader.push(tlog);
+ }
+ }
+ }
+
+ @Override
+ public void close(boolean committed, boolean deleteOnClose) {
+ for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {
+ reader.close();
+ }
+ super.close(committed, deleteOnClose);
+ }
+
+ private static class CdcrLogPointer {
+
+ File tlogFile = null;
+
+ private CdcrLogPointer() {
+ }
+
+ private void set(File tlogFile) {
+ this.tlogFile = tlogFile;
+ }
+
+ private boolean isInitialised() {
+ return tlogFile == null ? false : true;
+ }
+
+ @Override
+ public String toString() {
+ return "CdcrLogPointer(" + tlogFile + ")";
+ }
+
+ }
+
+ public class CdcrLogReader {
+
+ private TransactionLog currentTlog;
+ private TransactionLog.LogReader tlogReader;
+
+ // we need to use a blocking deque because of #getNumberOfRemainingRecords
+ private final LinkedBlockingDeque<TransactionLog> tlogs;
+ private final CdcrLogPointer pointer;
+
+ /**
+ * Used to record the last position of the tlog
+ */
+ private long lastPositionInTLog = 0;
+
+ /**
+ * lastVersion is used to get nextToLastVersion
+ */
+ private long lastVersion = -1;
+
+ /**
+ * nextToLastVersion is communicated by leader to replicas so that they can remove no longer needed tlogs
+ * <p>
+ * nextToLastVersion is used because thanks to {@link #resetToLastPosition()} lastVersion can become the current version
+ */
+ private long nextToLastVersion = -1;
+
+ /**
+ * Used to record the number of records read in the current tlog
+ */
+ private long numRecordsReadInCurrentTlog = 0;
+
+ private CdcrLogReader(List<TransactionLog> tlogs, TransactionLog tlog) {
+ this.tlogs = new LinkedBlockingDeque<>();
+ this.tlogs.addAll(tlogs);
+ if (tlog != null) this.tlogs.push(tlog); // ensure that the tlog being written is pushed
+
+ // Register the pointer in the parent UpdateLog
+ pointer = new CdcrLogPointer();
+ logPointers.put(this, pointer);
+
+ // If the reader is initialised while the updates log is empty, do nothing
+ if ((currentTlog = this.tlogs.peekLast()) != null) {
+ tlogReader = currentTlog.getReader(0);
+ pointer.set(currentTlog.tlogFile);
+ numRecordsReadInCurrentTlog = 0;
+ log.debug("Init new tlog reader for {} - tlogReader = {}", currentTlog.tlogFile, tlogReader);
+ }
+ }
+
+ private void push(TransactionLog tlog) {
+ this.tlogs.push(tlog);
+
+ // The reader was initialised while the update logs was empty, or reader was exhausted previously,
+ // we have to update the current tlog and the associated tlog reader.
+ if (currentTlog == null && !tlogs.isEmpty()) {
+ currentTlog = tlogs.peekLast();
+ tlogReader = currentTlog.getReader(0);
+ pointer.set(currentTlog.tlogFile);
+ numRecordsReadInCurrentTlog = 0;
+ log.debug("Init new tlog reader for {} - tlogReader = {}", currentTlog.tlogFile, tlogReader);
+ }
+ }
+
+ /**
+ * Expert: Instantiate a sub-reader. A sub-reader is used for batch updates. It allows to iterates over the
+ * update logs entries without modifying the state of the parent log reader. If the batch update fails, the state
+ * of the sub-reader is discarded and the state of the parent reader is not modified. If the batch update
+ * is successful, the sub-reader is used to fast forward the parent reader with the method
+ * {@link #forwardSeek(org.apache.solr.update.CdcrUpdateLog.CdcrLogReader)}.
+ */
+ public CdcrLogReader getSubReader() {
+ // Add the last element of the queue to properly initialise the pointer and log reader
+ CdcrLogReader clone = new CdcrLogReader(new ArrayList<TransactionLog>(), this.tlogs.peekLast());
+ clone.tlogs.clear(); // clear queue before copy
+ clone.tlogs.addAll(tlogs); // perform a copy of the list
+ clone.lastPositionInTLog = this.lastPositionInTLog;
+ clone.numRecordsReadInCurrentTlog = this.numRecordsReadInCurrentTlog;
+ clone.lastVersion = this.lastVersion;
+ clone.nextToLastVersion = this.nextToLastVersion;
+
+ // If the update log is not empty, we need to initialise the tlog reader
+ // NB: the tlogReader is equal to null if the update log is empty
+ if (tlogReader != null) {
+ clone.tlogReader.close();
+ clone.tlogReader = currentTlog.getReader(this.tlogReader.currentPos());
+ }
+
+ return clone;
+ }
+
+ /**
+ * Expert: Fast forward this log reader with a log subreader. The subreader will be closed after calling this
+ * method. In order to avoid unexpected results, the log
+ * subreader must be created from this reader with the method {@link #getSubReader()}.
+ */
+ public void forwardSeek(CdcrLogReader subReader) {
+ // If a subreader has a null tlog reader, does nothing
+ // This can happend if a subreader is instantiated from a non-initialised parent reader, or if the subreader
+ // has been closed.
+ if (subReader.tlogReader == null) {
+ return;
+ }
+
+ tlogReader.close(); // close the existing reader, a new one will be created
+ while (this.tlogs.peekLast().id < subReader.tlogs.peekLast().id) {
+ tlogs.removeLast();
+ currentTlog = tlogs.peekLast();
+ }
+ assert this.tlogs.peekLast().id == subReader.tlogs.peekLast().id;
+ this.pointer.set(currentTlog.tlogFile);
+ this.lastPositionInTLog = subReader.lastPositionInTLog;
+ this.numRecordsReadInCurrentTlog = subReader.numRecordsReadInCurrentTlog;
+ this.lastVersion = subReader.lastVersion;
+ this.nextToLastVersion = subReader.nextToLastVersion;
+ this.tlogReader = currentTlog.getReader(subReader.tlogReader.currentPos());
+ }
+
+ /**
+ * Advances to the next log entry in the updates log and returns the log entry itself.
+ * Returns null if there are no more log entries in the updates log.<br>
+ * <p>
+ * <b>NOTE:</b> after the reader has exhausted, you can call again this method since the updates
+ * log might have been updated with new entries.
+ */
+ public Object next() throws IOException, InterruptedException {
+ while (!tlogs.isEmpty()) {
+ lastPositionInTLog = tlogReader.currentPos();
+ Object o = tlogReader.next();
+
+ if (o != null) {
+ pointer.set(currentTlog.tlogFile);
+ nextToLastVersion = lastVersion;
+ lastVersion = getVersion(o);
+ numRecordsReadInCurrentTlog++;
+ return o;
+ }
+
+ if (tlogs.size() > 1) { // if the current tlog is not the newest one, we can advance to the next one
+ tlogReader.close();
+ tlogs.removeLast();
+ currentTlog = tlogs.peekLast();
+ tlogReader = currentTlog.getReader(0);
+ pointer.set(currentTlog.tlogFile);
+ numRecordsReadInCurrentTlog = 0;
+ log.debug("Init new tlog reader for {} - tlogReader = {}", currentTlog.tlogFile, tlogReader);
+ } else {
+ // the only tlog left is the new tlog which is currently being written,
+ // we should not remove it as we have to try to read it again later.
+ return null;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Advances to the first beyond the current whose version number is greater
+ * than or equal to <i>targetVersion</i>.<br>
+ * Returns true if the reader has been advanced. If <i>targetVersion</i> is
+ * greater than the highest version number in the updates log, the reader
+ * has been advanced to the end of the current tlog, and a call to
+ * {@link #next()} will probably return null.<br>
+ * Returns false if <i>targetVersion</i> is lower than the oldest known entry.
+ * In this scenario, it probably means that there is a gap in the updates log.<br>
+ * <p>
+ * <b>NOTE:</b> This method must be called before the first call to {@link #next()}.
+ */
+ public boolean seek(long targetVersion) throws IOException, InterruptedException {
+ Object o;
+ // version is negative for deletes - ensure that we are manipulating absolute version numbers.
+ targetVersion = Math.abs(targetVersion);
+
+ if (tlogs.isEmpty() || !this.seekTLog(targetVersion)) {
+ return false;
+ }
+
+ // now that we might be on the right tlog, iterates over the entries to find the one we are looking for
+ while ((o = this.next()) != null) {
+ if (this.getVersion(o) >= targetVersion) {
+ this.resetToLastPosition();
+ return true;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Seeks the tlog associated to the target version by using the updates log index,
+ * and initialises the log reader to the start of the tlog. Returns true if it was able
+ * to seek the corresponding tlog, false if the <i>targetVersion</i> is lower than the
+ * oldest known entry (which probably indicates a gap).<br>
+ * <p>
+ * <b>NOTE:</b> This method might modify the tlog queue by removing tlogs that are older
+ * than the target version.
+ */
+ private boolean seekTLog(long targetVersion) {
+ // if the target version is lower than the oldest known entry, we have probably a gap.
+ if (targetVersion < ((CdcrTransactionLog) tlogs.peekLast()).startVersion) {
+ return false;
+ }
+
+ // closes existing reader before performing seek and possibly modifying the queue;
+ tlogReader.close();
+
+ // iterates over the queue and removes old tlogs
+ TransactionLog last = null;
+ while (tlogs.size() > 1) {
+ if (((CdcrTransactionLog) tlogs.peekLast()).startVersion >= targetVersion) {
+ break;
+ }
+ last = tlogs.pollLast();
+ }
+
+ // the last tlog removed is the one we look for, add it back to the queue
+ if (last != null) tlogs.addLast(last);
+
+ currentTlog = tlogs.peekLast();
+ tlogReader = currentTlog.getReader(0);
+ pointer.set(currentTlog.tlogFile);
+ numRecordsReadInCurrentTlog = 0;
+
+ return true;
+ }
+
+ /**
+ * Extracts the version number and converts it to its absolute form.
+ */
+ private long getVersion(Object o) {
+ List entry = (List) o;
+ // version is negative for delete, ensure that we are manipulating absolute version numbers
+ return Math.abs((Long) entry.get(1));
+ }
+
+ /**
+ * If called after {@link #next()}, it resets the reader to its last position.
+ */
+ public void resetToLastPosition() {
+ try {
+ if (tlogReader != null) {
+ tlogReader.fis.seek(lastPositionInTLog);
+ numRecordsReadInCurrentTlog--;
+ lastVersion = nextToLastVersion;
+ }
+ } catch (IOException e) {
+ log.error("Failed to seek last position in tlog", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to seek last position in tlog", e);
+ }
+ }
+
+ /**
+ * Returns the number of remaining records (including commit but excluding header) to be read in the logs.
+ */
+ public long getNumberOfRemainingRecords() {
+ long numRemainingRecords = 0;
+
+ synchronized (tlogs) {
+ for (TransactionLog tlog : tlogs) {
+ numRemainingRecords += tlog.numRecords() - 1; // minus 1 as the number of records returned by the tlog includes the header
+ }
+ }
+
+ return numRemainingRecords - numRecordsReadInCurrentTlog;
+ }
+
+ /**
+ * Closes streams and remove the associated {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogPointer} from the
+ * parent {@link org.apache.solr.update.CdcrUpdateLog}.
+ */
+ public void close() {
+ if (tlogReader != null) {
+ tlogReader.close();
+ tlogReader = null;
+ currentTlog = null;
+ }
+ tlogs.clear();
+ logPointers.remove(this);
+ }
+
+ /**
+ * Returns the absolute form of the version number of the last entry read. If the current version is equal
+ * to 0 (because of a commit), it will return the next to last version number.
+ */
+ public long getLastVersion() {
+ return lastVersion == 0 ? nextToLastVersion : lastVersion;
+ }
+ }
+
+}
+
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/TransactionLog.java Fri May 22 18:58:29 2015
@@ -75,7 +75,7 @@ public class TransactionLog {
OutputStream os;
FastOutputStream fos; // all accesses to this stream should be synchronized on "this" (The TransactionLog)
int numRecords;
-
+
protected volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
AtomicInteger refcount = new AtomicInteger(1);
@@ -84,7 +84,7 @@ public class TransactionLog {
long snapshot_size;
int snapshot_numRecords;
-
+
// write a BytesRef as a byte array
JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
@Override
@@ -133,10 +133,8 @@ public class TransactionLog {
}
}
-
}
-
TransactionLog(File tlogFile, Collection<String> globalStrings) {
this(tlogFile, globalStrings, false);
}
@@ -148,6 +146,10 @@ public class TransactionLog {
log.debug("New TransactionLog file=" + tlogFile + ", exists=" + tlogFile.exists() + ", size=" + tlogFile.length() + ", openExisting=" + openExisting);
}
+ // Parse tlog id from the filename
+ String filename = tlogFile.getName();
+ id = Long.parseLong(filename.substring(filename.indexOf('.') + 1, filename.indexOf('.') + 20));
+
this.tlogFile = tlogFile;
raf = new RandomAccessFile(this.tlogFile, "rw");
long start = raf.length();
@@ -214,7 +216,6 @@ public class TransactionLog {
size = fos.size();
}
-
// the end of the file should have the end message (added during a commit) plus a 4 byte size
byte[] buf = new byte[ END_MESSAGE.length() ];
long pos = size - END_MESSAGE.length() - 4;
@@ -234,9 +235,9 @@ public class TransactionLog {
snapshot_size = fos.size();
snapshot_numRecords = numRecords;
return snapshot_size;
- }
+ }
}
-
+
// This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
// This should only be used to roll back buffered updates, not actually applied updates.
public void rollback(long pos) throws IOException {
@@ -250,7 +251,6 @@ public class TransactionLog {
}
}
-
public long writeData(Object o) {
LogCodec codec = new LogCodec(resolver);
try {
@@ -323,7 +323,7 @@ public class TransactionLog {
private void checkWriteHeader(LogCodec codec, SolrInputDocument optional) throws IOException {
- // Unsynchronized access. We can get away with an unsynchronized access here
+ // Unsynchronized access. We can get away with an unsynchronized access here
// since we will never get a false non-zero when the position is in fact 0.
// rollback() is the only function that can reset to zero, and it blocks updates.
if (fos.size() != 0) return;
@@ -454,7 +454,7 @@ public class TransactionLog {
codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file
endRecord(pos);
-
+
fos.flush(); // flush since this will be the last record in a log fill
assert fos.size() == channel.size();
@@ -561,7 +561,7 @@ public class TransactionLog {
assert ObjectReleaseTracker.release(this);
}
}
-
+
public void forceClose() {
if (refcount.get() > 0) {
log.error("Error: Forcing close of " + this);
@@ -595,7 +595,7 @@ public class TransactionLog {
}
public class LogReader {
- private ChannelFastInputStream fis;
+ protected ChannelFastInputStream fis;
private LogCodec codec = new LogCodec(resolver);
public LogReader(long startingPos) {
@@ -614,7 +614,6 @@ public class TransactionLog {
public Object next() throws IOException, InterruptedException {
long pos = fis.position();
-
synchronized (TransactionLog.this) {
if (trace) {
log.trace("Reading log record. pos="+pos+" currentSize="+fos.size());
@@ -664,7 +663,7 @@ public class TransactionLog {
public long currentPos() {
return fis.position();
}
-
+
// returns best effort current size
// for info purposes
public long currentSize() throws IOException {
@@ -675,8 +674,6 @@ public class TransactionLog {
public abstract class ReverseReader {
-
-
/** Returns the next object from the log, or null if none available.
*
* @return The log record, or null if EOF
@@ -691,9 +688,8 @@ public class TransactionLog {
@Override
public abstract String toString() ;
-
}
-
+
public class FSReverseReader extends ReverseReader {
ChannelFastInputStream fis;
private LogCodec codec = new LogCodec(resolver) {
@@ -727,7 +723,6 @@ public class TransactionLog {
}
}
-
/** Returns the next object from the log, or null if none available.
*
* @return The log record, or null if EOF
@@ -835,7 +830,7 @@ class ChannelFastInputStream extends Fas
public void close() throws IOException {
ch.close();
}
-
+
@Override
public String toString() {
return "readFromStream="+readFromStream +" pos="+pos +" end="+end + " bufferPos="+getBufferPos() + " position="+position() ;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateHandler.java Fri May 22 18:58:29 2015
@@ -82,8 +82,8 @@ public abstract class UpdateHandler impl
for (SolrEventListener listener : softCommitCallbacks) {
listener.postSoftCommit();
}
- }
-
+ }
+
protected void callPostOptimizeCallbacks() {
for (SolrEventListener listener : optimizeCallbacks) {
listener.postCommit();
@@ -93,18 +93,18 @@ public abstract class UpdateHandler impl
public UpdateHandler(SolrCore core) {
this(core, null);
}
-
+
public UpdateHandler(SolrCore core, UpdateLog updateLog) {
this.core=core;
idField = core.getLatestSchema().getUniqueKeyField();
idFieldType = idField!=null ? idField.getType() : null;
parseEventListeners();
PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
-
+
if (updateLog == null && ulogPluginInfo != null && ulogPluginInfo.isEnabled()) {
String dataDir = (String)ulogPluginInfo.initArgs.get("dir");
-
+
String ulogDir = core.getCoreDescriptor().getUlogDir();
if (ulogDir != null) {
dataDir = ulogDir;
@@ -112,7 +112,7 @@ public abstract class UpdateHandler impl
if (dataDir == null || dataDir.length()==0) {
dataDir = core.getDataDir();
}
-
+
if (dataDir != null && dataDir.startsWith("hdfs:/")) {
DirectoryFactory dirFactory = core.getDirectoryFactory();
if (dirFactory instanceof HdfsDirectoryFactory) {
@@ -120,17 +120,18 @@ public abstract class UpdateHandler impl
} else {
ulog = new HdfsUpdateLog();
}
-
+
} else {
- ulog = new UpdateLog();
+ String className = ulogPluginInfo.className == null ? UpdateLog.class.getName() : ulogPluginInfo.className;
+ ulog = core.getResourceLoader().newInstance(className, UpdateLog.class);
}
-
+
if (!core.isReloaded() && !core.getDirectoryFactory().isPersistent()) {
ulog.clearLog(core, ulogPluginInfo);
}
-
+
log.info("Using UpdateLog implementation: " + ulog.getClass().getName());
-
+
ulog.init(ulogPluginInfo);
ulog.init(this, core);
@@ -144,9 +145,9 @@ public abstract class UpdateHandler impl
/**
* Called when the Writer should be opened again - eg when replication replaces
* all of the index files.
- *
+ *
* @param rollback IndexWriter if true else close
- *
+ *
* @throws IOException If there is a low-level I/O error.
*/
public abstract void newIndexWriter(boolean rollback) throws IOException;
@@ -173,7 +174,7 @@ public abstract class UpdateHandler impl
{
commitCallbacks.add( listener );
}
-
+
/**
* NOTE: this function is not thread safe. However, it is safe to call within the
* <code>inform( SolrCore core )</code> function for <code>SolrCoreAware</code> classes.
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/UpdateLog.java Fri May 22 18:58:29 2015
@@ -17,9 +17,6 @@
package org.apache.solr.update;
-import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
-import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
@@ -66,6 +63,9 @@ import org.apache.solr.util.plugin.Plugi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
/** @lucene.experimental */
public class UpdateLog implements PluginInfoInitialized {
@@ -138,7 +138,7 @@ public class UpdateLog implements Plugin
protected Map<BytesRef,LogPtr> prevMap; // used while committing/reopening is happening
protected Map<BytesRef,LogPtr> prevMap2; // used while committing/reopening is happening
protected TransactionLog prevMapLog; // the transaction log used to look up entries found in prevMap
- protected TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap
+ protected TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap2
protected final int numDeletesToKeep = 1000;
protected final int numDeletesByQueryToKeep = 100;
@@ -281,12 +281,12 @@ public class UpdateLog implements Plugin
if (debug) {
log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
}
-
+
TransactionLog oldLog = null;
for (String oldLogName : tlogFiles) {
File f = new File(tlogDir, oldLogName);
try {
- oldLog = new TransactionLog( f, null, true );
+ oldLog = newTransactionLog(f, null, true);
addOldLog(oldLog, false); // don't remove old logs on startup since more than one may be uncapped.
} catch (Exception e) {
SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
@@ -335,11 +335,19 @@ public class UpdateLog implements Plugin
}
}
-
+
+  /**
+   * Factory method for the transaction logs opened or created by this update log. Sub-classes can
+   * override it to plug in their own {@link org.apache.solr.update.TransactionLog} implementation.
+   *
+   * @param tlogFile the tlog file to open or create
+   * @param globalStrings passed through to the transaction log constructor (null when opening existing tlogs on init)
+   * @param openExisting true when an existing tlog file is being opened
+   * @return a new {@link org.apache.solr.update.TransactionLog} for the given file
+   */
+  public TransactionLog newTransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
+    final TransactionLog created = new TransactionLog(tlogFile, globalStrings, openExisting);
+    return created;
+  }
+
public String getLogDir() {
return tlogDir.getAbsolutePath();
}
-
+
public List<Long> getStartingVersions() {
return startingVersions;
}
@@ -381,7 +389,6 @@ public class UpdateLog implements Plugin
logs.addFirst(oldLog);
}
-
public String[] getLogList(File directory) {
final String prefix = TLOG_NAME+'.';
String[] names = directory.list(new FilenameFilter() {
@@ -397,20 +404,17 @@ public class UpdateLog implements Plugin
return names;
}
-
public long getLastLogId() {
if (id != -1) return id;
if (tlogFiles.length == 0) return -1;
String last = tlogFiles[tlogFiles.length-1];
- return Long.parseLong(last.substring(TLOG_NAME.length()+1));
+ return Long.parseLong(last.substring(TLOG_NAME.length() + 1));
}
-
public void add(AddUpdateCommand cmd) {
add(cmd, false);
}
-
public void add(AddUpdateCommand cmd, boolean clearCaches) {
// don't log if we are replaying from another log
// TODO: we currently need to log to maintain correct versioning, rtg, etc
@@ -621,7 +625,7 @@ public class UpdateLog implements Plugin
public boolean hasUncommittedChanges() {
return tlog != null;
}
-
+
public void preCommit(CommitUpdateCommand cmd) {
synchronized (this) {
if (debug) {
@@ -718,13 +722,13 @@ public class UpdateLog implements Plugin
// SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
if (entry == null && prevMap != null) {
entry = prevMap.get(indexedId);
- // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog)
+ // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
lookupLog = prevMapLog;
// SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
}
if (entry == null && prevMap2 != null) {
entry = prevMap2.get(indexedId);
- // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog)
+ // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
lookupLog = prevMapLog2;
// SolrCore.verbose("TLOG: lookup: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
}
@@ -758,13 +762,13 @@ public class UpdateLog implements Plugin
// SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
if (entry == null && prevMap != null) {
entry = prevMap.get(indexedId);
- // something found in prevMap will always be found in preMapLog (which could be tlog or prevTlog)
+ // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
lookupLog = prevMapLog;
// SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
}
if (entry == null && prevMap2 != null) {
entry = prevMap2.get(indexedId);
- // something found in prevMap2 will always be found in preMapLog2 (which could be tlog or prevTlog)
+ // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
lookupLog = prevMapLog2;
// SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
}
@@ -860,7 +864,7 @@ public class UpdateLog implements Plugin
protected void ensureLog() {
if (tlog == null) {
String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, TLOG_NAME, id);
- tlog = new TransactionLog(new File(tlogDir, newLogName), globalStrings);
+ tlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false);
}
}
@@ -879,11 +883,11 @@ public class UpdateLog implements Plugin
theLog.forceClose();
}
}
-
+
public void close(boolean committed) {
close(committed, false);
}
-
+
public void close(boolean committed, boolean deleteOnClose) {
synchronized (this) {
recoveryExecutor.shutdown(); // no new tasks
@@ -924,7 +928,7 @@ public class UpdateLog implements Plugin
this.id = id;
}
}
-
+
public class RecentUpdates {
Deque<TransactionLog> logList; // newest first
List<List<Update>> updateList;
@@ -935,17 +939,17 @@ public class UpdateLog implements Plugin
public List<Long> getVersions(int n) {
List<Long> ret = new ArrayList(n);
-
+
for (List<Update> singleList : updateList) {
for (Update ptr : singleList) {
ret.add(ptr.version);
if (--n <= 0) return ret;
}
}
-
+
return ret;
}
-
+
public Object lookup(long version) {
Update update = updates.get(version);
if (update == null) return null;
@@ -989,7 +993,7 @@ public class UpdateLog implements Plugin
try {
o = reader.next();
if (o==null) break;
-
+
// should currently be a List<Oper,Ver,Doc/Id>
List entry = (List)o;
@@ -1012,13 +1016,13 @@ public class UpdateLog implements Plugin
updatesForLog.add(update);
updates.put(version, update);
-
+
if (oper == UpdateLog.DELETE_BY_QUERY) {
deleteByQueryList.add(update);
} else if (oper == UpdateLog.DELETE) {
deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
}
-
+
break;
case UpdateLog.COMMIT:
@@ -1048,7 +1052,7 @@ public class UpdateLog implements Plugin
}
}
-
+
public void close() {
for (TransactionLog log : logList) {
log.decref();
@@ -1295,7 +1299,7 @@ public class UpdateLog implements Plugin
public void doReplay(TransactionLog translog) {
try {
- loglog.warn("Starting log replay " + translog + " active="+activeLog + " starting pos=" + recoveryInfo.positionOfStart);
+ loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart);
long lastStatusTime = System.nanoTime();
tlogReader = translog.getReader(recoveryInfo.positionOfStart);
@@ -1309,7 +1313,7 @@ public class UpdateLog implements Plugin
int operationAndFlags = 0;
long nextCount = 0;
- for(;;) {
+ for (; ; ) {
Object o = null;
if (cancelApplyBufferUpdate) break;
try {
@@ -1321,13 +1325,13 @@ public class UpdateLog implements Plugin
long cpos = tlogReader.currentPos();
long csize = tlogReader.currentSize();
loglog.info(
- "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
- translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
- Math.round(cpos / (double) csize * 100.));
-
+ "log replay status {} active={} starting pos={} current pos={} current size={} % read={}",
+ translog, activeLog, recoveryInfo.positionOfStart, cpos, csize,
+ Math.round(cpos / (double) csize * 100.));
+
}
}
-
+
o = null;
o = tlogReader.next();
if (o == null && activeLog) {
@@ -1352,7 +1356,7 @@ public class UpdateLog implements Plugin
}
}
} catch (Exception e) {
- SolrException.log(log,e);
+ SolrException.log(log, e);
}
if (o == null) break;
@@ -1360,62 +1364,58 @@ public class UpdateLog implements Plugin
try {
// should currently be a List<Oper,Ver,Doc/Id>
- List entry = (List)o;
+ List entry = (List) o;
- operationAndFlags = (Integer)entry.get(0);
+ operationAndFlags = (Integer) entry.get(0);
int oper = operationAndFlags & OPERATION_MASK;
long version = (Long) entry.get(1);
switch (oper) {
- case UpdateLog.ADD:
- {
+ case UpdateLog.ADD: {
recoveryInfo.adds++;
// byte[] idBytes = (byte[]) entry.get(2);
- SolrInputDocument sdoc = (SolrInputDocument)entry.get(entry.size()-1);
+ SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
AddUpdateCommand cmd = new AddUpdateCommand(req);
// cmd.setIndexedId(new BytesRef(idBytes));
cmd.solrDoc = sdoc;
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- if (debug) log.debug("add " + cmd);
+ if (debug) log.debug("add " + cmd);
proc.processAdd(cmd);
break;
}
- case UpdateLog.DELETE:
- {
+ case UpdateLog.DELETE: {
recoveryInfo.deletes++;
byte[] idBytes = (byte[]) entry.get(2);
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.setIndexedId(new BytesRef(idBytes));
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- if (debug) log.debug("delete " + cmd);
+ if (debug) log.debug("delete " + cmd);
proc.processDelete(cmd);
break;
}
- case UpdateLog.DELETE_BY_QUERY:
- {
+ case UpdateLog.DELETE_BY_QUERY: {
recoveryInfo.deleteByQuery++;
- String query = (String)entry.get(2);
+ String query = (String) entry.get(2);
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.query = query;
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
- if (debug) log.debug("deleteByQuery " + cmd);
+ if (debug) log.debug("deleteByQuery " + cmd);
proc.processDelete(cmd);
break;
}
- case UpdateLog.COMMIT:
- {
+ case UpdateLog.COMMIT: {
commitVersion = version;
break;
}
default:
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
}
if (rsp.getException() != null) {
@@ -1430,7 +1430,7 @@ public class UpdateLog implements Plugin
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: Unexpected log entry or corrupt log. Entry=" + o, cl);
// would be caused by a corrupt transaction log
- } catch (SolrException ex) {
+ } catch (SolrException ex) {
if (ex.code() == ErrorCode.SERVICE_UNAVAILABLE.code) {
throw ex;
}
@@ -1450,7 +1450,7 @@ public class UpdateLog implements Plugin
cmd.waitSearcher = true;
cmd.setFlags(UpdateCommand.REPLAY);
try {
- if (debug) log.debug("commit " + cmd);
+ if (debug) log.debug("commit " + cmd);
uhandler.commit(cmd); // this should cause a commit to be added to the incomplete log and avoid it being replayed again after a restart.
} catch (IOException ex) {
recoveryInfo.errors++;
@@ -1506,25 +1506,25 @@ public class UpdateLog implements Plugin
}
}
}
-
+
protected String getTlogDir(SolrCore core, PluginInfo info) {
String dataDir = (String) info.initArgs.get("dir");
-
+
String ulogDir = core.getCoreDescriptor().getUlogDir();
if (ulogDir != null) {
dataDir = ulogDir;
}
-
+
if (dataDir == null || dataDir.length() == 0) {
dataDir = core.getDataDir();
}
return dataDir + "/" + TLOG_NAME;
}
-
+
/**
* Clears the logs on the file system. Only call before init.
- *
+ *
* @param core the SolrCore
* @param ulogPluginInfo the init info for the UpdateHandler
*/
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessor.java Fri May 22 18:58:29 2015
@@ -0,0 +1,125 @@
+package org.apache.solr.update.processor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.UpdateCommand;
+
+/**
+ * <p>
+ * Extends {@link org.apache.solr.update.processor.DistributedUpdateProcessor} to force peer sync logic
+ * for every updates. This ensures that the version parameter sent by the source cluster is kept
+ * by the target cluster.
+ * </p>
+ */
+public class CdcrUpdateProcessor extends DistributedUpdateProcessor {
+
+ public static final String CDCR_UPDATE = "cdcr.update";
+
+ public CdcrUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+ super(req, rsp, next);
+ }
+
+ @Override
+ protected boolean versionAdd(AddUpdateCommand cmd) throws IOException {
+ /*
+ temporarily set the PEER_SYNC flag so that DistributedUpdateProcessor.versionAdd doesn't execute leader logic
+ but the else part of that if. That way version remains preserved.
+
+ we cannot set the flag for the whole processAdd method because DistributedUpdateProcessor.setupRequest() would set
+ isLeader to false which wouldn't work
+ */
+ if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.PEER_SYNC); // we need super.versionAdd() to set leaderLogic to false
+ }
+
+ boolean result = super.versionAdd(cmd);
+
+ // unset the flag to avoid unintended consequences down the chain
+ if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+ cmd.setFlags(cmd.getFlags() & ~UpdateCommand.PEER_SYNC);
+ }
+
+ return result;
+ }
+
+ @Override
+ protected boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
+ /*
+ temporarily set the PEER_SYNC flag so that DistributedUpdateProcessor.deleteAdd doesn't execute leader logic
+ but the else part of that if. That way version remains preserved.
+
+ we cannot set the flag for the whole processDelete method because DistributedUpdateProcessor.setupRequest() would set
+ isLeader to false which wouldn't work
+ */
+ if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.PEER_SYNC); // we need super.versionAdd() to set leaderLogic to false
+ }
+
+ boolean result = super.versionDelete(cmd);
+
+ // unset the flag to avoid unintended consequences down the chain
+ if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+ cmd.setFlags(cmd.getFlags() & ~UpdateCommand.PEER_SYNC);
+ }
+
+ return result;
+ }
+
+ protected ModifiableSolrParams filterParams(SolrParams params) {
+ ModifiableSolrParams result = super.filterParams(params);
+ if (params.get(CDCR_UPDATE) != null) {
+ result.set(CDCR_UPDATE, "");
+ if (params.get(DistributedUpdateProcessor.VERSION_FIELD) == null) {
+ log.warn("+++ cdcr.update but no version field, params are: " + params);
+ } else {
+ log.info("+++ cdcr.update version present, params are: " + params);
+ }
+ result.set(DistributedUpdateProcessor.VERSION_FIELD, params.get(DistributedUpdateProcessor.VERSION_FIELD));
+ }
+
+ return result;
+ }
+
+ @Override
+ protected void versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+ /*
+ temporarily set the PEER_SYNC flag so that DistributedUpdateProcessor.versionDeleteByQuery doesn't execute leader logic
+ That way version remains preserved.
+
+ */
+ if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.PEER_SYNC); // we need super.versionDeleteByQuery() to set leaderLogic to false
+ }
+
+ super.versionDeleteByQuery(cmd);
+
+ // unset the flag to avoid unintended consequences down the chain
+ if (cmd.getReq().getParams().get(CDCR_UPDATE) != null) {
+ cmd.setFlags(cmd.getFlags() & ~UpdateCommand.PEER_SYNC);
+ }
+ }
+}
+
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/CdcrUpdateProcessorFactory.java Fri May 22 18:58:29 2015
@@ -0,0 +1,46 @@
+package org.apache.solr.update.processor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+
+/**
+ * Factory for {@link org.apache.solr.update.processor.CdcrUpdateProcessor}.
+ *
+ * @see org.apache.solr.update.processor.CdcrUpdateProcessor
+ */
+public class CdcrUpdateProcessorFactory
+ extends UpdateRequestProcessorFactory
+ implements DistributingUpdateProcessorFactory {
+
+ @Override
+ public void init(NamedList args) {
+
+ }
+
+ @Override
+ public CdcrUpdateProcessor getInstance(SolrQueryRequest req,
+ SolrQueryResponse rsp, UpdateRequestProcessor next) {
+
+ return new CdcrUpdateProcessor(req, rsp, next);
+ }
+
+}
+
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Fri May 22 18:58:29 2015
@@ -964,7 +964,7 @@ public class DistributedUpdateProcessor
* @return whether or not to drop this cmd
* @throws IOException If there is a low-level I/O error.
*/
- private boolean versionAdd(AddUpdateCommand cmd) throws IOException {
+ protected boolean versionAdd(AddUpdateCommand cmd) throws IOException {
BytesRef idBytes = cmd.getIndexedId();
if (idBytes == null) {
@@ -1226,7 +1226,7 @@ public class DistributedUpdateProcessor
}
}
- private ModifiableSolrParams filterParams(SolrParams params) {
+ protected ModifiableSolrParams filterParams(SolrParams params) {
ModifiableSolrParams fparams = new ModifiableSolrParams();
passParam(params, fparams, UpdateParams.UPDATE_CHAIN);
passParam(params, fparams, TEST_DISTRIB_SKIP_SERVERS);
@@ -1333,53 +1333,10 @@ public class DistributedUpdateProcessor
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.
- // Find the version
- long versionOnUpdate = cmd.getVersion();
- if (versionOnUpdate == 0) {
- String versionOnUpdateS = req.getParams().get(VERSION_FIELD);
- versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
- }
- versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
-
boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
boolean leaderLogic = isLeader && !isReplayOrPeersync;
- if (!leaderLogic && versionOnUpdate==0) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
- }
-
- vinfo.blockUpdates();
- try {
-
- if (versionsStored) {
- if (leaderLogic) {
- long version = vinfo.getNewClock();
- cmd.setVersion(-version);
- // TODO update versions in all buckets
-
- doLocalDelete(cmd);
-
- } else {
- cmd.setVersion(-versionOnUpdate);
-
- if (ulog.getState() != UpdateLog.State.ACTIVE && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
- // we're not in an active state, and this update isn't from a replay, so buffer it.
- cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
- ulog.deleteByQuery(cmd);
- return;
- }
-
- doLocalDelete(cmd);
- }
- }
-
- // since we don't know which documents were deleted, the easiest thing to do is to invalidate
- // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
- // (so cache misses will see up-to-date data)
-
- } finally {
- vinfo.unblockUpdates();
- }
+ versionDeleteByQuery(cmd);
if (zkEnabled) {
// forward to all replicas
@@ -1449,6 +1406,56 @@ public class DistributedUpdateProcessor
}
}
+  /**
+   * Determines the version of a delete-by-query and applies it locally.
+   * <p>
+   * With leader logic (this node is the leader and the command is neither a replay nor a peer sync),
+   * a new version is taken from the version clock. Otherwise the version comes from the command, or
+   * from the {@code _version_} request parameter if the command has none. A non-replay update that
+   * arrives while the update log is not {@code ACTIVE} is buffered in the update log instead of being
+   * applied. Versions are stored negated for deletes. Nothing is applied here when versions are not
+   * stored.
+   *
+   * @throws SolrException with {@code BAD_REQUEST} if leader logic does not apply and no version is present
+   */
+  protected void versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+    // Find the version: command first, then the request parameter; normalized to a positive value
+    long versionOnUpdate = cmd.getVersion();
+    if (versionOnUpdate == 0) {
+      String versionParam = req.getParams().get(VERSION_FIELD);
+      if (versionParam != null) {
+        versionOnUpdate = Long.parseLong(versionParam);
+      }
+    }
+    versionOnUpdate = Math.abs(versionOnUpdate);
+
+    int replayOrPeerSyncMask = UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC;
+    boolean leaderLogic = isLeader && (cmd.getFlags() & replayOrPeerSyncMask) == 0;
+
+    if (versionOnUpdate == 0 && !leaderLogic) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "missing _version_ on update from leader");
+    }
+
+    vinfo.blockUpdates();
+    try {
+      if (!versionsStored) {
+        return;
+      }
+
+      if (leaderLogic) {
+        // TODO update versions in all buckets
+        cmd.setVersion(-vinfo.getNewClock());
+      } else {
+        cmd.setVersion(-versionOnUpdate);
+        boolean isReplay = (cmd.getFlags() & UpdateCommand.REPLAY) != 0;
+        if (ulog.getState() != UpdateLog.State.ACTIVE && !isReplay) {
+          // we're not in an active state, and this update isn't from a replay, so buffer it.
+          cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
+          ulog.deleteByQuery(cmd);
+          return;
+        }
+      }
+
+      // since we don't know which documents were deleted, the easiest thing to do is to invalidate
+      // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
+      // (so cache misses will see up-to-date data)
+      // NOTE(review): no invalidation happens in this method itself; presumably it is done downstream
+      // of doLocalDelete -- confirm.
+      doLocalDelete(cmd);
+    } finally {
+      vinfo.unblockUpdates();
+    }
+  }
+
// internal helper method to tell if we are the leader for an add or deleteById update
boolean isLeader(UpdateCommand cmd) {
updateCommand = cmd;
@@ -1482,7 +1489,7 @@ public class DistributedUpdateProcessor
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled.");
}
- private boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
+ protected boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
BytesRef idBytes = cmd.getIndexedId();
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrCLI.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrCLI.java?rev=1681186&r1=1681185&r2=1681186&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrCLI.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/util/SolrCLI.java Fri May 22 18:58:29 2015
@@ -144,7 +144,7 @@ public class SolrCLI {
}
/**
- * Runs a SolrCloud tool with CloudSolrServer initialized
+ * Runs a SolrCloud tool with CloudSolrClient initialized
*/
protected abstract int runCloudTool(CloudSolrClient cloudSolrClient, CommandLine cli)
throws Exception;
@@ -1217,10 +1217,10 @@ public class SolrCLI {
int toolExitStatus = 0;
- try (CloudSolrClient cloudSolrServer = new CloudSolrClient(zkHost)) {
+ try (CloudSolrClient cloudSolrClient = new CloudSolrClient(zkHost)) {
System.out.println("Connecting to ZooKeeper at " + zkHost);
- cloudSolrServer.connect();
- toolExitStatus = runCloudTool(cloudSolrServer, cli);
+ cloudSolrClient.connect();
+ toolExitStatus = runCloudTool(cloudSolrClient, cli);
} catch (Exception exc) {
// since this is a CLI, spare the user the stacktrace
String excMsg = exc.getMessage();
Added: lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml (added)
+++ lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcr.xml Fri May 22 18:58:29 2015
@@ -0,0 +1,85 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+ <jmx/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}">
+ <!-- used to keep RAM reqs down for HdfsDirectoryFactory -->
+ <bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool>
+ <int name="solr.hdfs.blockcache.blocksperbank">${solr.hdfs.blockcache.blocksperbank:1024}</int>
+ <str name="solr.hdfs.home">${solr.hdfs.home:}</str>
+ <str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str>
+ <str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
+ </directoryFactory>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+
+ <requestHandler name="standard" class="solr.StandardRequestHandler">
+ </requestHandler>
+
+ <requestHandler name="/get" class="solr.RealTimeGetHandler">
+ <lst name="defaults">
+ <str name="omitHeader">true</str>
+ </lst>
+ </requestHandler>
+
+ <requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy"/>
+
+ <requestHandler name="/update" class="solr.UpdateRequestHandler">
+ <lst name="defaults">
+ <str name="update.chain">cdcr-processor-chain</str>
+ </lst>
+ </requestHandler>
+
+ <updateRequestProcessorChain name="cdcr-processor-chain">
+ <processor class="solr.CdcrUpdateProcessorFactory"/>
+ <processor class="solr.RunUpdateProcessorFactory"/>
+ </updateRequestProcessorChain>
+
+ <requestHandler name="/cdcr" class="solr.CdcrRequestHandler">
+ <lst name="replica">
+ <str name="zkHost">${zkHost}</str>
+ <str name="source">source_collection</str>
+ <str name="target">target_collection</str>
+ </lst>
+ <lst name="replicator">
+ <str name="threadPoolSize">8</str>
+ <str name="schedule">1000</str>
+ <str name="batchSize">64</str>
+ </lst>
+ <lst name="updateLogSynchronizer">
+ <str name="schedule">1000</str>
+ </lst>
+ </requestHandler>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <updateLog class="solr.CdcrUpdateLog">
+ <str name="dir">${solr.ulog.dir:}</str>
+ </updateLog>
+ </updateHandler>
+
+ <requestHandler name="/admin/" class="org.apache.solr.handler.admin.AdminHandlers"/>
+
+</config>
+
Added: lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml?rev=1681186&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml (added)
+++ lucene/dev/trunk/solr/core/src/test-files/solr/collection1/conf/solrconfig-cdcrupdatelog.xml Fri May 22 18:58:29 2015
@@ -0,0 +1,59 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<config>
+ <jmx/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}">
+ <!-- used to keep RAM reqs down for HdfsDirectoryFactory -->
+ <bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool>
+ <int name="solr.hdfs.blockcache.blocksperbank">${solr.hdfs.blockcache.blocksperbank:1024}</int>
+ <str name="solr.hdfs.home">${solr.hdfs.home:}</str>
+ <str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str>
+ <str name="solr.hdfs.blockcache.global">${solr.hdfs.blockcache.global:false}</str>
+ </directoryFactory>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
+
+ <requestHandler name="standard" class="solr.StandardRequestHandler">
+ </requestHandler>
+
+ <requestHandler name="/get" class="solr.RealTimeGetHandler">
+ <lst name="defaults">
+ <str name="omitHeader">true</str>
+ </lst>
+ </requestHandler>
+
+ <requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy"/>
+
+ <requestHandler name="/update" class="solr.UpdateRequestHandler"/>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <updateLog class="solr.CdcrUpdateLog">
+ <str name="dir">${solr.ulog.dir:}</str>
+ </updateLog>
+ </updateHandler>
+
+ <requestHandler name="/admin/" class="org.apache.solr.handler.admin.AdminHandlers"/>
+
+</config>