You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2012/10/29 15:55:07 UTC
svn commit: r1403336 [3/5] - in /lucene/dev/branches/lucene3846: ./
dev-tools/ dev-tools/eclipse/ dev-tools/idea/.idea/libraries/
dev-tools/idea/lucene/classification/ dev-tools/maven/
dev-tools/maven/lucene/classification/ dev-tools/maven/lucene/core/...
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Mon Oct 29 14:55:01 2012
@@ -38,11 +38,13 @@ import java.util.zip.Adler32;
import java.util.zip.Checksum;
import java.util.zip.DeflaterOutputStream;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.CommonParams;
@@ -53,6 +55,7 @@ import org.apache.solr.common.util.Named
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrDeletionPolicy;
@@ -204,9 +207,6 @@ public class ReplicationHandler extends
rsp.add(STATUS,ERR_STATUS);
rsp.add("message","No slave configured");
}
- } else if (command.equals(CMD_FILE_CHECKSUM)) {
- // this command is not used by anyone
- getFileChecksum(solrParams, rsp);
} else if (command.equals(CMD_SHOW_COMMITS)) {
rsp.add(CMD_SHOW_COMMITS, getCommits());
} else if (command.equals(CMD_DETAILS)) {
@@ -239,30 +239,6 @@ public class ReplicationHandler extends
return l;
}
- /**
- * Gets the checksum of a file
- */
- private void getFileChecksum(SolrParams solrParams, SolrQueryResponse rsp) {
- Checksum checksum = new Adler32();
- File dir = new File(core.getIndexDir());
- rsp.add(CHECKSUM, getCheckSums(solrParams.getParams(FILE), dir, checksum));
- dir = new File(core.getResourceLoader().getConfigDir());
- rsp.add(CONF_CHECKSUM, getCheckSums(solrParams.getParams(CONF_FILE_SHORT), dir, checksum));
- }
-
- private Map<String, Long> getCheckSums(String[] files, File dir, Checksum checksum) {
- Map<String, Long> checksumMap = new HashMap<String, Long>();
- if (files == null || files.length == 0)
- return checksumMap;
- for (String file : files) {
- File f = new File(dir, file);
- Long checkSumVal = getCheckSum(checksum, f);
- if (checkSumVal != null)
- checksumMap.put(file, checkSumVal);
- }
- return checksumMap;
- }
-
static Long getCheckSum(Checksum checksum, File f) {
FileInputStream fis = null;
checksum.reset();
@@ -343,15 +319,22 @@ public class ReplicationHandler extends
}
/**
- * This method adds an Object of FileStream to the resposnse . The FileStream implements a custom protocol which is
+ * This method adds an Object of FileStream to the response . The FileStream implements a custom protocol which is
* understood by SnapPuller.FileFetcher
*
- * @see org.apache.solr.handler.SnapPuller.FileFetcher
+ * @see org.apache.solr.handler.SnapPuller.LocalFsFileFetcher
+ * @see org.apache.solr.handler.SnapPuller.DirectoryFileFetcher
*/
private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) {
ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams);
rawParams.set(CommonParams.WT, FILE_STREAM);
- rsp.add(FILE_STREAM, new FileStream(solrParams));
+
+ String cfileName = solrParams.get(CONF_FILE_SHORT);
+ if (cfileName != null) {
+ rsp.add(FILE_STREAM, new LocalFsFileStream(solrParams));
+ } else {
+ rsp.add(FILE_STREAM, new DirectoryFileStream(solrParams));
+ }
}
@SuppressWarnings("unchecked")
@@ -372,21 +355,29 @@ public class ReplicationHandler extends
// reserve the indexcommit for sometime
core.getDeletionPolicy().setReserveDuration(gen, reserveCommitDuration);
List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
+ Directory dir = null;
try {
- //get all the files in the commit
- //use a set to workaround possible Lucene bug which returns same file name multiple times
+ // get all the files in the commit
+ // use a set to workaround possible Lucene bug which returns same file
+ // name multiple times
Collection<String> files = new HashSet<String>(commit.getFileNames());
- for (String fileName : files) {
- if(fileName.endsWith(".lock")) continue;
- File file = new File(core.getIndexDir(), fileName);
- Map<String, Object> fileMeta = getFileInfo(file);
- result.add(fileMeta);
+ dir = core.getDirectoryFactory().get(core.getNewIndexDir(), null);
+ try {
+
+ for (String fileName : files) {
+ if (fileName.endsWith(".lock")) continue;
+ Map<String,Object> fileMeta = new HashMap<String,Object>();
+ fileMeta.put(NAME, fileName);
+ fileMeta.put(SIZE, dir.fileLength(fileName));
+ result.add(fileMeta);
+ }
+ } finally {
+ core.getDirectoryFactory().release(dir);
}
} catch (IOException e) {
rsp.add("status", "unable to get file names for given index generation");
rsp.add("exception", e);
- LOG.warn("Unable to get file names for indexCommit generation: "
- + gen, e);
+ LOG.error("Unable to get file names for indexCommit generation: " + gen, e);
}
rsp.add(CMD_GET_FILE_LIST, result);
if (confFileNameAlias.size() < 1 || core.getCoreDescriptor().getCoreContainer().isZooKeeperAware())
@@ -444,7 +435,6 @@ public class ReplicationHandler extends
Map<String, Object> map = new HashMap<String, Object>();
map.put(NAME, name);
map.put(SIZE, size);
- map.put(LAST_MODIFIED, lastmodified);
map.put(CHECKSUM, checksum);
return map;
}
@@ -474,18 +464,19 @@ public class ReplicationHandler extends
}
long getIndexSize() {
- return FileUtils.sizeOfDirectory(new File(core.getIndexDir()));
- }
-
- /**
- * Collects the details such as name, size ,lastModified of a file
- */
- private Map<String, Object> getFileInfo(File file) {
- Map<String, Object> fileMeta = new HashMap<String, Object>();
- fileMeta.put(NAME, file.getName());
- fileMeta.put(SIZE, file.length());
- fileMeta.put(LAST_MODIFIED, file.lastModified());
- return fileMeta;
+ Directory dir;
+ long size = 0;
+ try {
+ dir = core.getDirectoryFactory().get(core.getIndexDir(), null);
+ try {
+ size = DirectoryFactory.sizeOfDirectory(dir);
+ } finally {
+ core.getDirectoryFactory().release(dir);
+ }
+ } catch (IOException e) {
+ SolrException.log(LOG, "IO error while trying to get the size of the Directory", e);
+ }
+ return size;
}
@Override
@@ -885,7 +876,8 @@ public class ReplicationHandler extends
}
// reboot the writer on the new index
- core.getUpdateHandler().newIndexWriter(true);
+ // TODO: perhaps this is no longer necessary then?
+ // core.getUpdateHandler().newIndexWriter(true);
} catch (IOException e) {
LOG.warn("Unable to get IndexCommit on startup", e);
@@ -936,7 +928,7 @@ public class ReplicationHandler extends
private void registerFileStreamResponseWriter() {
core.registerResponseWriter(FILE_STREAM, new BinaryQueryResponseWriter() {
public void write(OutputStream out, SolrQueryRequest request, SolrQueryResponse resp) throws IOException {
- FileStream stream = (FileStream) resp.getValues().get(FILE_STREAM);
+ DirectoryFileStream stream = (DirectoryFileStream) resp.getValues().get(FILE_STREAM);
stream.write(out);
}
@@ -1009,15 +1001,15 @@ public class ReplicationHandler extends
};
}
- private class FileStream {
- private SolrParams params;
+ private class DirectoryFileStream {
+ protected SolrParams params;
- private FastOutputStream fos;
+ protected FastOutputStream fos;
- private Long indexGen;
- private IndexDeletionPolicyWrapper delPolicy;
+ protected Long indexGen;
+ protected IndexDeletionPolicyWrapper delPolicy;
- public FileStream(SolrParams solrParams) {
+ public DirectoryFileStream(SolrParams solrParams) {
params = solrParams;
delPolicy = core.getDeletionPolicy();
}
@@ -1036,6 +1028,100 @@ public class ReplicationHandler extends
} else {
fos = new FastOutputStream(out);
}
+
+ int packetsWritten = 0;
+ IndexInput in = null;
+ try {
+ long offset = -1;
+ int len = -1;
+ // check if checksum is requested
+ boolean useChecksum = Boolean.parseBoolean(sChecksum);
+ if (sOffset != null) offset = Long.parseLong(sOffset);
+ if (sLen != null) len = Integer.parseInt(sLen);
+ if (fileName == null && cfileName == null) {
+ // no filename do nothing
+ writeNothing();
+ }
+
+ RefCounted<SolrIndexSearcher> sref = core.getSearcher();
+ Directory dir;
+ try {
+ SolrIndexSearcher searcher = sref.get();
+ dir = searcher.getIndexReader().directory();
+ } finally {
+ sref.decref();
+ }
+ in = dir.openInput(fileName, IOContext.READONCE);
+ // if offset is mentioned move the pointer to that point
+ if (offset != -1) in.seek(offset);
+ byte[] buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
+ Checksum checksum = null;
+ if (useChecksum) checksum = new Adler32();
+
+ long filelen = dir.fileLength(fileName);
+ while (true) {
+ offset = offset == -1 ? 0 : offset;
+ int read = (int) Math.min(buf.length, filelen - offset);
+ in.readBytes(buf, offset == -1 ? 0 : (int) offset, read);
+
+ fos.writeInt((int) read);
+ if (useChecksum) {
+ checksum.reset();
+ checksum.update(buf, 0, read);
+ fos.writeLong(checksum.getValue());
+ }
+ fos.write(buf, 0, read);
+ fos.flush();
+ if (indexGen != null && (packetsWritten % 5 == 0)) {
+ // after every 5 packets reserve the commitpoint for some time
+ delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
+ }
+ packetsWritten++;
+ if (read != buf.length) {
+ writeNothing();
+ fos.close();
+ break;
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception while writing response for params: " + params, e);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+ }
+
+
+ /**
+ * Used to write a marker for EOF
+ */
+ protected void writeNothing() throws IOException {
+ fos.writeInt(0);
+ fos.flush();
+ }
+ }
+
+ private class LocalFsFileStream extends DirectoryFileStream {
+
+ public LocalFsFileStream(SolrParams solrParams) {
+ super(solrParams);
+ }
+
+ public void write(OutputStream out) throws IOException {
+ String fileName = params.get(FILE);
+ String cfileName = params.get(CONF_FILE_SHORT);
+ String sOffset = params.get(OFFSET);
+ String sLen = params.get(LEN);
+ String compress = params.get(COMPRESSION);
+ String sChecksum = params.get(CHECKSUM);
+ String sGen = params.get(GENERATION);
+ if (sGen != null) indexGen = Long.parseLong(sGen);
+ if (Boolean.parseBoolean(compress)) {
+ fos = new FastOutputStream(new DeflaterOutputStream(out));
+ } else {
+ fos = new FastOutputStream(out);
+ }
FileInputStream inputStream = null;
int packetsWritten = 0;
try {
@@ -1053,13 +1139,10 @@ public class ReplicationHandler extends
}
File file = null;
- if (cfileName != null) {
- //if if is a conf file read from config diectory
- file = new File(core.getResourceLoader().getConfigDir(), cfileName);
- } else {
- //else read from the indexdirectory
- file = new File(core.getIndexDir(), fileName);
- }
+
+ //if if is a conf file read from config diectory
+ file = new File(core.getResourceLoader().getConfigDir(), cfileName);
+
if (file.exists() && file.canRead()) {
inputStream = new FileInputStream(file);
FileChannel channel = inputStream.getChannel();
@@ -1103,17 +1186,8 @@ public class ReplicationHandler extends
IOUtils.closeQuietly(inputStream);
}
}
-
-
- /**
- * Used to write a marker for EOF
- */
- private void writeNothing() throws IOException {
- fos.writeInt(0);
- fos.flush();
- }
- }
-
+ }
+
public static final String MASTER_URL = "masterUrl";
public static final String STATUS = "status";
@@ -1132,8 +1206,6 @@ public class ReplicationHandler extends
public static final String CMD_GET_FILE = "filecontent";
- public static final String CMD_FILE_CHECKSUM = "filechecksum";
-
public static final String CMD_DISABLE_POLL = "disablepoll";
public static final String CMD_DISABLE_REPL = "disablereplication";
@@ -1158,8 +1230,6 @@ public class ReplicationHandler extends
public static final String SIZE = "size";
- public static final String LAST_MODIFIED = "lastmodified";
-
public static final String CONF_FILE_SHORT = "cf";
public static final String CHECKSUM = "checksum";
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Mon Oct 29 14:55:01 2012
@@ -16,11 +16,67 @@
*/
package org.apache.solr.handler;
+import static org.apache.solr.handler.ReplicationHandler.ALIAS;
+import static org.apache.solr.handler.ReplicationHandler.CHECKSUM;
+import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
+import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE;
+import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE_LIST;
+import static org.apache.solr.handler.ReplicationHandler.CMD_INDEX_VERSION;
+import static org.apache.solr.handler.ReplicationHandler.COMMAND;
+import static org.apache.solr.handler.ReplicationHandler.COMPRESSION;
+import static org.apache.solr.handler.ReplicationHandler.CONF_FILES;
+import static org.apache.solr.handler.ReplicationHandler.CONF_FILE_SHORT;
+import static org.apache.solr.handler.ReplicationHandler.EXTERNAL;
+import static org.apache.solr.handler.ReplicationHandler.FILE;
+import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
+import static org.apache.solr.handler.ReplicationHandler.GENERATION;
+import static org.apache.solr.handler.ReplicationHandler.INTERNAL;
+import static org.apache.solr.handler.ReplicationHandler.MASTER_URL;
+import static org.apache.solr.handler.ReplicationHandler.NAME;
+import static org.apache.solr.handler.ReplicationHandler.OFFSET;
+import static org.apache.solr.handler.ReplicationHandler.SIZE;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+import java.util.zip.InflaterInputStream;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.client.HttpClient;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
@@ -31,35 +87,22 @@ import org.apache.solr.common.params.Com
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.FastInputStream;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.apache.solr.util.FileUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CachingDirectoryFactory.CloseListener;
-import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
-import static org.apache.solr.handler.ReplicationHandler.*;
-
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler.FileInfo;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.FileUtils;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.Adler32;
-import java.util.zip.Checksum;
-import java.util.zip.InflaterInputStream;
-
/**
* <p/> Provides functionality of downloading changed index files as well as config files and a timer for scheduling fetches from the
* master. </p>
@@ -96,7 +139,9 @@ public class SnapPuller {
private volatile Map<String, Object> currentFile;
- private volatile FileFetcher fileFetcher;
+ private volatile DirectoryFileFetcher dirFileFetcher;
+
+ private volatile LocalFsFileFetcher localFileFetcher;
private volatile ExecutorService fsyncService;
@@ -247,9 +292,12 @@ public class SnapPuller {
* @return true on success, false if slave is already in sync
* @throws IOException if an exception occurs
*/
- boolean fetchLatestIndex(SolrCore core, boolean forceReplication) throws IOException, InterruptedException {
+ boolean fetchLatestIndex(final SolrCore core, boolean forceReplication) throws IOException, InterruptedException {
successfulInstall = false;
replicationStartTime = System.currentTimeMillis();
+ Directory tmpIndexDir = null;
+ Directory indexDir = null;
+ boolean deleteTmpIdxDir = true;
try {
//get the current 'replicateable' index version in the master
NamedList response = null;
@@ -318,28 +366,34 @@ public class SnapPuller {
// if the generateion of master is older than that of the slave , it means they are not compatible to be copied
// then a new index direcory to be created and all the files need to be copied
boolean isFullCopyNeeded = IndexDeletionPolicyWrapper.getCommitTimestamp(commit) >= latestVersion || forceReplication;
- File tmpIndexDir = createTempindexDir(core);
- if (isIndexStale()) {
- isFullCopyNeeded = true;
- }
- LOG.info("Starting download to " + tmpIndexDir + " fullCopy=" + isFullCopyNeeded);
- successfulInstall = false;
- boolean deleteTmpIdxDir = true;
+
+ String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
+ String tmpIndex = createTempindexDir(core, tmpIdxDirName);
+ tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, null);
+
// make sure it's the newest known index dir...
- final File indexDir = new File(core.getNewIndexDir());
+ indexDir = core.getDirectoryFactory().get(core.getNewIndexDir(), null);
Directory oldDirectory = null;
+
try {
+
+ if (isIndexStale(indexDir)) {
+ isFullCopyNeeded = true;
+ }
+ LOG.info("Starting download to " + tmpIndexDir + " fullCopy=" + isFullCopyNeeded);
+ successfulInstall = false;
+
downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestGeneration);
LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
if (!modifiedConfFiles.isEmpty()) {
downloadConfFiles(confFilesToDownload, latestGeneration);
if (isFullCopyNeeded) {
- successfulInstall = modifyIndexProps(tmpIndexDir.getName());
- deleteTmpIdxDir = false;
+ successfulInstall = modifyIndexProps(tmpIdxDirName);
+ deleteTmpIdxDir = false;
} else {
- successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);
+ successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
LOG.info("Configuration files are modified, core will be reloaded");
@@ -349,7 +403,7 @@ public class SnapPuller {
} else {
terminateAndWaitFsyncService();
if (isFullCopyNeeded) {
- successfulInstall = modifyIndexProps(tmpIndexDir.getName());
+ successfulInstall = modifyIndexProps(tmpIdxDirName);
deleteTmpIdxDir = false;
RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
try {
@@ -358,7 +412,7 @@ public class SnapPuller {
iw.decref();
}
} else {
- successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);
+ successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
@@ -367,17 +421,28 @@ public class SnapPuller {
if (isFullCopyNeeded) {
// we have to do this before commit
+ final Directory freezeIndexDir = indexDir;
core.getDirectoryFactory().addCloseListener(oldDirectory, new CloseListener(){
@Override
- public void onClose() {
- LOG.info("removing old index directory " + indexDir);
- delTree(indexDir);
+ public void preClose() {
+ LOG.info("removing old index files " + freezeIndexDir);
+ DirectoryFactory.empty(freezeIndexDir);
+ }
+
+ @Override
+ public void postClose() {
+ LOG.info("removing old index directory " + freezeIndexDir);
+ try {
+ core.getDirectoryFactory().remove(freezeIndexDir);
+ } catch (IOException e) {
+ SolrException.log(LOG, "Error removing directory " + freezeIndexDir, e);
+ }
}
});
}
-
+
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
@@ -400,21 +465,39 @@ public class SnapPuller {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
} finally {
if (deleteTmpIdxDir) {
- LOG.info("removing temporary index download directory " + tmpIndexDir);
- delTree(tmpIndexDir);
+ LOG.info("removing temporary index download directory files " + tmpIndexDir);
+ DirectoryFactory.empty(tmpIndexDir);
}
}
} finally {
- if (!successfulInstall) {
- logReplicationTimeAndConfFiles(null, successfulInstall);
+ try {
+ if (!successfulInstall) {
+ logReplicationTimeAndConfFiles(null, successfulInstall);
+ }
+ filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
+ replicationStartTime = 0;
+ dirFileFetcher = null;
+ localFileFetcher = null;
+ if (fsyncService != null && !fsyncService.isShutdown()) fsyncService
+ .shutdownNow();
+ fsyncService = null;
+ stop = false;
+ fsyncException = null;
+ } finally {
+ if (tmpIndexDir != null) {
+ core.getDirectoryFactory().release(tmpIndexDir);
+ }
+ if (deleteTmpIdxDir && tmpIndexDir != null) {
+ try {
+ core.getDirectoryFactory().remove(tmpIndexDir);
+ } catch (IOException e) {
+ SolrException.log(LOG, "Error removing directory " + tmpIndexDir, e);
+ }
+ }
+ if (indexDir != null) {
+ core.getDirectoryFactory().release(indexDir);
+ }
}
- filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
- replicationStartTime = 0;
- fileFetcher = null;
- if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdownNow();
- fsyncService = null;
- stop = false;
- fsyncException = null;
}
}
@@ -535,7 +618,7 @@ public class SnapPuller {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
new ModifiableSolrParams());
// reboot the writer on the new index and get a new searcher
- solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
+ solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded, false);
try {
// first try to open an NRT searcher so that the new
@@ -567,11 +650,9 @@ public class SnapPuller {
/**
* All the files are copied to a temp dir first
*/
- private File createTempindexDir(SolrCore core) {
- String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
+ private String createTempindexDir(SolrCore core, String tmpIdxDirName) {
File tmpIdxDir = new File(core.getDataDir(), tmpIdxDirName);
- tmpIdxDir.mkdirs();
- return tmpIdxDir;
+ return tmpIdxDir.toString();
}
private void reloadCore() {
@@ -599,9 +680,9 @@ public class SnapPuller {
}
for (Map<String, Object> file : confFilesToDownload) {
String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
- fileFetcher = new FileFetcher(tmpconfDir, file, saveAs, true, latestGeneration);
+ localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, true, latestGeneration);
currentFile = file;
- fileFetcher.fetchFile();
+ localFileFetcher.fetchFile();
confFilesDownloaded.add(new HashMap<String, Object>(file));
}
// this is called before copying the files to the original conf dir
@@ -617,21 +698,29 @@ public class SnapPuller {
* Download the index files. If a new index is needed, download all the files.
*
* @param downloadCompleteIndex is it a fresh index copy
- * @param tmpIdxDir the directory to which files need to be downloadeed to
+ * @param tmpIndexDir the directory to which files need to be downloadeed to
* @param latestGeneration the version number
*/
- private void downloadIndexFiles(boolean downloadCompleteIndex, File tmpIdxDir, long latestGeneration) throws Exception {
+ private void downloadIndexFiles(boolean downloadCompleteIndex,
+ Directory tmpIndexDir, long latestGeneration) throws Exception {
String indexDir = solrCore.getIndexDir();
- for (Map<String, Object> file : filesToDownload) {
- File localIndexFile = new File(indexDir, (String) file.get(NAME));
- if (!localIndexFile.exists() || downloadCompleteIndex) {
- fileFetcher = new FileFetcher(tmpIdxDir, file, (String) file.get(NAME), false, latestGeneration);
- currentFile = file;
- fileFetcher.fetchFile();
- filesDownloaded.add(new HashMap<String, Object>(file));
- } else {
- LOG.info("Skipping download for " + localIndexFile);
+
+ // it's okay to use null for lock factory since we know this dir will exist
+ Directory dir = solrCore.getDirectoryFactory().get(indexDir, null);
+ try {
+ for (Map<String,Object> file : filesToDownload) {
+ if (!dir.fileExists((String) file.get(NAME)) || downloadCompleteIndex) {
+ dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
+ (String) file.get(NAME), false, latestGeneration);
+ currentFile = file;
+ dirFileFetcher.fetchFile();
+ filesDownloaded.add(new HashMap<String,Object>(file));
+ } else {
+ LOG.info("Skipping download for " + file.get(NAME));
+ }
}
+ } finally {
+ solrCore.getDirectoryFactory().release(dir);
}
}
@@ -640,13 +729,12 @@ public class SnapPuller {
* not compatible (stale).
*
* @return true if the index stale and we need to download a fresh copy, false otherwise.
+ * @throws IOException if low level io error
*/
- private boolean isIndexStale() {
+ private boolean isIndexStale(Directory dir) throws IOException {
for (Map<String, Object> file : filesToDownload) {
- File localIndexFile = new File(solrCore.getIndexDir(), (String) file
- .get(NAME));
- if (localIndexFile.exists()
- && localIndexFile.length() != (Long) file.get(SIZE)) {
+ if (dir.fileExists((String) file.get(NAME))
+ && dir.fileLength((String) file.get(NAME)) != (Long) file.get(SIZE)) {
// file exists and size is different, therefore we must assume
// corrupted index
return true;
@@ -659,52 +747,31 @@ public class SnapPuller {
* Copy a file by the File#renameTo() method. If it fails, it is considered a failure
* <p/>
*/
- private boolean copyAFile(File tmpIdxDir, File indexDir, String fname, List<String> copiedfiles) {
- File indexFileInTmpDir = new File(tmpIdxDir, fname);
- File indexFileInIndex = new File(indexDir, fname);
- boolean success = indexFileInTmpDir.renameTo(indexFileInIndex);
- if(!success){
- try {
- LOG.error("Unable to move index file from: " + indexFileInTmpDir
- + " to: " + indexFileInIndex + " Trying to do a copy");
- FileUtils.copyFile(indexFileInTmpDir,indexFileInIndex);
- success = true;
- } catch (FileNotFoundException e) {
- if (!indexDir.exists()) {
- File parent = indexDir.getParentFile();
- String[] children = null;
- if (parent != null) {
- children = parent.list();
- }
- LOG.error("The index directory does not exist: " + indexDir.getAbsolutePath()
- + " dirs found: " + (children == null ? "none could be found" : Arrays.asList(children)));
- }
- LOG.error("Unable to copy index file from: " + indexFileInTmpDir
- + " to: " + indexFileInIndex , e);
- } catch (IOException e) {
- LOG.error("Unable to copy index file from: " + indexFileInTmpDir
- + " to: " + indexFileInIndex , e);
- }
- }
-
- if (!success) {
- for (String f : copiedfiles) {
- File indexFile = new File(indexDir, f);
- if (indexFile.exists())
- indexFile.delete();
+ private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname, List<String> copiedfiles) {
+ boolean success = false;
+ try {
+ if (indexDir.fileExists(fname)) {
+ return true;
}
- delTree(tmpIdxDir);
+ } catch (IOException e) {
+ SolrException.log(LOG, "could not check if a file exists", e);
return false;
}
- return true;
+ try {
+ solrCore.getDirectoryFactory().move(tmpIdxDir, indexDir, fname);
+ success = true;
+ } catch (IOException e) {
+ SolrException.log(LOG, "Could not move file", e);
+ }
+ return success;
}
/**
* Copy all index files from the temp index dir to the actual index. The segments_N file is copied last.
*/
- private boolean copyIndexFiles(File tmpIdxDir, File indexDir) {
+ private boolean moveIndexFiles(Directory tmpIdxDir, Directory indexDir) {
String segmentsFile = null;
- List<String> copiedfiles = new ArrayList<String>();
+ List<String> movedfiles = new ArrayList<String>();
for (Map<String, Object> f : filesDownloaded) {
String fname = (String) f.get(NAME);
// the segments file must be copied last
@@ -716,12 +783,12 @@ public class SnapPuller {
segmentsFile = fname;
continue;
}
- if (!copyAFile(tmpIdxDir, indexDir, fname, copiedfiles)) return false;
- copiedfiles.add(fname);
+ if (!moveAFile(tmpIdxDir, indexDir, fname, movedfiles)) return false;
+ movedfiles.add(fname);
}
//copy the segments file last
if (segmentsFile != null) {
- if (!copyAFile(tmpIdxDir, indexDir, segmentsFile, copiedfiles)) return false;
+ if (!moveAFile(tmpIdxDir, indexDir, segmentsFile, movedfiles)) return false;
}
return true;
}
@@ -759,31 +826,84 @@ public class SnapPuller {
*/
private boolean modifyIndexProps(String tmpIdxDirName) {
LOG.info("New index installed. Updating index properties... index="+tmpIdxDirName);
- File idxprops = new File(solrCore.getDataDir() + "index.properties");
Properties p = new Properties();
- if (idxprops.exists()) {
- InputStream is = null;
+ Directory dir = null;
+ try {
+ dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), null);
+ if (dir.fileExists("index.properties")){
+ final IndexInput input = dir.openInput("index.properties", IOContext.DEFAULT);
+
+ final InputStream is = new InputStream() {
+
+ @Override
+ public int read() throws IOException {
+ byte next;
+ try {
+ next = input.readByte();
+ } catch (EOFException e) {
+ return -1;
+ }
+ return next;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ input.close();
+ }
+ };
+
+ try {
+ p.load(is);
+ } catch (Exception e) {
+ LOG.error("Unable to load index.properties", e);
+ } finally {
+ IOUtils.closeQuietly(is);
+ }
+ }
+ try {
+ dir.deleteFile("index.properties");
+ } catch (IOException e) {
+ // no problem
+ }
+ final IndexOutput out = dir.createOutput("index.properties", IOContext.DEFAULT);
+ p.put("index", tmpIdxDirName);
+ OutputStream os = null;
try {
- is = new FileInputStream(idxprops);
- p.load(is);
+ os = new OutputStream() {
+
+ @Override
+ public void write(int b) throws IOException {
+ out.writeByte((byte) b);
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ out.close();
+ }
+ };
+ p.store(os, "index properties");
} catch (Exception e) {
- LOG.error("Unable to load index.properties");
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unable to write index.properties", e);
} finally {
- IOUtils.closeQuietly(is);
+ IOUtils.closeQuietly(os);
}
- }
- p.put("index", tmpIdxDirName);
- FileOutputStream os = null;
- try {
- os = new FileOutputStream(idxprops);
- p.store(os, "index properties");
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unable to write index.properties", e);
+ return true;
+
+ } catch (IOException e1) {
+ throw new RuntimeException(e1);
} finally {
- IOUtils.closeQuietly(os);
+ if (dir != null) {
+ try {
+ solrCore.getDirectoryFactory().release(dir);
+ } catch (IOException e) {
+ SolrException.log(LOG, "", e);
+ }
+ }
}
- return true;
+
}
private final Map<String, FileInfo> confFileInfoCache = new HashMap<String, FileInfo>();
@@ -820,13 +940,8 @@ public class SnapPuller {
}
return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values();
}
-
- /**
- * Delete the directory tree recursively
- */
+
static boolean delTree(File dir) {
- if (dir == null || !dir.exists())
- return false;
boolean isSuccess = true;
File contents[] = dir.listFiles();
if (contents != null) {
@@ -902,9 +1017,10 @@ public class SnapPuller {
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
}
+ // TODO: currently does not reflect conf files
Map<String, Object> getCurrentFile() {
Map<String, Object> tmp = currentFile;
- FileFetcher tmpFileFetcher = fileFetcher;
+ DirectoryFileFetcher tmpFileFetcher = dirFileFetcher;
if (tmp == null)
return null;
tmp = new HashMap<String, Object>(tmp);
@@ -933,9 +1049,255 @@ public class SnapPuller {
/**
* The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream
*
- * @see org.apache.solr.handler.ReplicationHandler.FileStream
+ * @see org.apache.solr.handler.ReplicationHandler.DirectoryFileStream
+ */
+ private class DirectoryFileFetcher {
+ boolean includeChecksum = true;
+
+ Directory copy2Dir;
+
+ String fileName;
+
+ String saveAs;
+
+ long size;
+
+ long bytesDownloaded = 0;
+
+ byte[] buf = new byte[1024 * 1024];
+
+ Checksum checksum;
+
+ int errorCount = 0;
+
+ private boolean isConf;
+
+ private boolean aborted = false;
+
+ private Long indexGen;
+
+ private IndexOutput outStream;
+
+ DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
+ boolean isConf, long latestGen) throws IOException {
+ this.copy2Dir = tmpIndexDir;
+ this.fileName = (String) fileDetails.get(NAME);
+ this.size = (Long) fileDetails.get(SIZE);
+ this.isConf = isConf;
+ this.saveAs = saveAs;
+
+ indexGen = latestGen;
+
+ outStream = copy2Dir.createOutput(saveAs, IOContext.DEFAULT);
+
+ if (includeChecksum)
+ checksum = new Adler32();
+ }
+
+ /**
+ * The main method which downloads file
+ */
+ void fetchFile() throws Exception {
+ try {
+ while (true) {
+ final FastInputStream is = getStream();
+ int result;
+ try {
+ //fetch packets one by one in a single request
+ result = fetchPackets(is);
+ if (result == 0 || result == NO_CONTENT) {
+
+ return;
+ }
+ //if there is an error continue. But continue from the point where it got broken
+ } finally {
+ IOUtils.closeQuietly(is);
+ }
+ }
+ } finally {
+ cleanup();
+ //if cleanup suceeds . The file is downloaded fully. do an fsync
+ fsyncService.submit(new Runnable(){
+ public void run() {
+ try {
+ copy2Dir.sync(Collections.singleton(saveAs));
+ } catch (IOException e) {
+ fsyncException = e;
+ }
+ }
+ });
+ }
+ }
+
+ private int fetchPackets(FastInputStream fis) throws Exception {
+ byte[] intbytes = new byte[4];
+ byte[] longbytes = new byte[8];
+ try {
+ while (true) {
+ if (stop) {
+ stop = false;
+ aborted = true;
+ throw new ReplicationHandlerException("User aborted replication");
+ }
+ long checkSumServer = -1;
+ fis.readFully(intbytes);
+ //read the size of the packet
+ int packetSize = readInt(intbytes);
+ if (packetSize <= 0) {
+ LOG.warn("No content recieved for file: " + currentFile);
+ return NO_CONTENT;
+ }
+ if (buf.length < packetSize)
+ buf = new byte[packetSize];
+ if (checksum != null) {
+ //read the checksum
+ fis.readFully(longbytes);
+ checkSumServer = readLong(longbytes);
+ }
+ //then read the packet of bytes
+ fis.readFully(buf, 0, packetSize);
+ //compare the checksum as sent from the master
+ if (includeChecksum) {
+ checksum.reset();
+ checksum.update(buf, 0, packetSize);
+ long checkSumClient = checksum.getValue();
+ if (checkSumClient != checkSumServer) {
+ LOG.error("Checksum not matched between client and server for: " + currentFile);
+ //if checksum is wrong it is a problem return for retry
+ return 1;
+ }
+ }
+ //if everything is fine, write down the packet to the file
+ writeBytes(packetSize);
+ bytesDownloaded += packetSize;
+ if (bytesDownloaded >= size)
+ return 0;
+ //errorcount is always set to zero after a successful packet
+ errorCount = 0;
+ }
+ } catch (ReplicationHandlerException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.warn("Error in fetching packets ", e);
+ //for any failure , increment the error count
+ errorCount++;
+ //if it fails for the same pacaket for MAX_RETRIES fail and come out
+ if (errorCount > MAX_RETRIES) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Fetch failed for file:" + fileName, e);
+ }
+ return ERR;
+ }
+ }
+
+ protected void writeBytes(int packetSize) throws IOException {
+ outStream.writeBytes(buf, 0, packetSize);
+ }
+
+ /**
+ * The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully()
+ * other wise it fails. So read everything as bytes and then extract an integer out of it
+ */
+ private int readInt(byte[] b) {
+ return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
+ | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
+
+ }
+
+ /**
+ * Same as above but to read longs from a byte array
+ */
+ private long readLong(byte[] b) {
+ return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
+ | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
+ | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
+ | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
+
+ }
+
+ /**
+ * cleanup everything
+ */
+ private void cleanup() {
+ try {
+ outStream.close();
+ } catch (Exception e) {/* noop */
+ LOG.error("Error closing the file stream: "+ this.saveAs ,e);
+ }
+ if (bytesDownloaded != size) {
+ //if the download is not complete then
+ //delete the file being downloaded
+ try {
+ copy2Dir.deleteFile(saveAs);
+ } catch (Exception e) {
+ LOG.error("Error deleting file in cleanup" + e.getMessage());
+ }
+ //if the failure is due to a user abort it is returned nomally else an exception is thrown
+ if (!aborted)
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unable to download " + fileName + " completely. Downloaded "
+ + bytesDownloaded + "!=" + size);
+ }
+ }
+
+ /**
+ * Open a new stream using HttpClient
+ */
+ FastInputStream getStream() throws IOException {
+ SolrServer s = new HttpSolrServer(masterUrl, myHttpClient, null); //XXX use shardhandler
+ ModifiableSolrParams params = new ModifiableSolrParams();
+
+// //the method is command=filecontent
+ params.set(COMMAND, CMD_GET_FILE);
+ params.set(GENERATION, Long.toString(indexGen));
+ params.set(CommonParams.QT, "/replication");
+ //add the version to download. This is used to reserve the download
+ if (isConf) {
+ //set cf instead of file for config file
+ params.set(CONF_FILE_SHORT, fileName);
+ } else {
+ params.set(FILE, fileName);
+ }
+ if (useInternal) {
+ params.set(COMPRESSION, "true");
+ }
+ //use checksum
+ if (this.includeChecksum) {
+ params.set(CHECKSUM, true);
+ }
+ //wt=filestream this is a custom protocol
+ params.set(CommonParams.WT, FILE_STREAM);
+ // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
+ // the server starts from the offset
+ if (bytesDownloaded > 0) {
+ params.set(OFFSET, Long.toString(bytesDownloaded));
+ }
+
+
+ NamedList response;
+ InputStream is = null;
+ try {
+ QueryRequest req = new QueryRequest(params);
+ response = s.request(req);
+ is = (InputStream) response.get("stream");
+ if(useInternal) {
+ is = new InflaterInputStream(is);
+ }
+ return new FastInputStream(is);
+ } catch (Throwable t) {
+ //close stream on error
+ IOUtils.closeQuietly(is);
+ throw new IOException("Could not download file '" + fileName + "'", t);
+ }
+ }
+ }
+
+ /**
+ * The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream
+ *
+ * @see org.apache.solr.handler.ReplicationHandler.LocalFsFileStream
*/
- private class FileFetcher {
+ private class LocalFsFileFetcher {
boolean includeChecksum = true;
private File copy2Dir;
@@ -944,7 +1306,7 @@ public class SnapPuller {
String saveAs;
- long size, lastmodified;
+ long size;
long bytesDownloaded = 0;
@@ -966,16 +1328,15 @@ public class SnapPuller {
private Long indexGen;
- FileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
+ // TODO: could do more code sharing with DirectoryFileFetcher
+ LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
boolean isConf, long latestGen) throws IOException {
this.copy2Dir = dir;
this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE);
this.isConf = isConf;
this.saveAs = saveAs;
- if(fileDetails.get(LAST_MODIFIED) != null){
- lastmodified = (Long)fileDetails.get(LAST_MODIFIED);
- }
+
indexGen = latestGen;
this.file = new File(copy2Dir, saveAs);
@@ -1007,10 +1368,6 @@ public class SnapPuller {
//fetch packets one by one in a single request
result = fetchPackets(is);
if (result == 0 || result == NO_CONTENT) {
- // if the file is downloaded properly set the
- // timestamp same as that in the server
- if (file.exists() && lastmodified > 0)
- file.setLastModified(lastmodified);
return;
}
//if there is an error continue. But continue from the point where it got broken
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/SnapShooter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/SnapShooter.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/SnapShooter.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/SnapShooter.java Mon Oct 29 14:55:01 2012
@@ -17,9 +17,6 @@
package org.apache.solr.handler;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -31,12 +28,13 @@ import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.commons.io.IOUtils;
import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +55,7 @@ public class SnapShooter {
solrCore = core;
if (location == null) snapDir = core.getDataDir();
else {
- File base = new File(core.getCoreDescriptor().getInstanceDir());
+ File base = new File(core.getCoreDescriptor().getRawInstanceDir());
snapDir = org.apache.solr.util.FileUtils.resolvePath(base, location).getAbsolutePath();
File dir = new File(snapDir);
if (!dir.exists()) dir.mkdirs();
@@ -101,8 +99,14 @@ public class SnapShooter {
return;
}
Collection<String> files = indexCommit.getFileNames();
- FileCopier fileCopier = new FileCopier(solrCore.getDeletionPolicy(), indexCommit);
- fileCopier.copyFiles(files, snapShotDir);
+ FileCopier fileCopier = new FileCopier();
+
+ Directory dir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), null);
+ try {
+ fileCopier.copyFiles(dir, files, snapShotDir);
+ } finally {
+ solrCore.getDirectoryFactory().release(dir);
+ }
details.add("fileCount", files.size());
details.add("status", "success");
@@ -169,36 +173,26 @@ public class SnapShooter {
private class FileCopier {
- private static final int DEFAULT_BUFFER_SIZE = 32768;
- private byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
- private IndexCommit indexCommit;
- private IndexDeletionPolicyWrapper delPolicy;
-
- public FileCopier(IndexDeletionPolicyWrapper delPolicy, IndexCommit commit) {
- this.delPolicy = delPolicy;
- this.indexCommit = commit;
- }
- public void copyFiles(Collection<String> files, File destDir) throws IOException {
- for (String indexFile : files) {
- File source = new File(solrCore.getIndexDir(), indexFile);
- copyFile(source, new File(destDir, source.getName()), true);
+ public void copyFiles(Directory sourceDir, Collection<String> files,
+ File destDir) throws IOException {
+ // does destinations directory exist ?
+ if (destDir != null && !destDir.exists()) {
+ destDir.mkdirs();
+ }
+
+ FSDirectory dir = FSDirectory.open(destDir);
+ try {
+ for (String indexFile : files) {
+ copyFile(sourceDir, indexFile, new File(destDir, indexFile), dir);
+ }
+ } finally {
+ dir.close();
}
}
- public void copyFile(File source, File destination, boolean preserveFileDate)
+ public void copyFile(Directory sourceDir, String indexFile, File destination, Directory destDir)
throws IOException {
- // check source exists
- if (!source.exists()) {
- String message = "File " + source + " does not exist";
- throw new FileNotFoundException(message);
- }
-
- // does destinations directory exist ?
- if (destination.getParentFile() != null
- && !destination.getParentFile().exists()) {
- destination.getParentFile().mkdirs();
- }
// make sure we can write to destination
if (destination.exists() && !destination.canWrite()) {
@@ -206,45 +200,7 @@ public class SnapShooter {
throw new IOException(message);
}
- FileInputStream input = null;
- FileOutputStream output = null;
- try {
- input = new FileInputStream(source);
- output = new FileOutputStream(destination);
-
- int count = 0;
- int n = 0;
- int rcnt = 0;
- while (-1 != (n = input.read(buffer))) {
- output.write(buffer, 0, n);
- count += n;
- rcnt++;
- /***
- // reserve every 4.6875 MB
- if (rcnt == 150) {
- rcnt = 0;
- delPolicy.setReserveDuration(indexCommit.getVersion(), reserveTime);
- }
- ***/
- }
- } finally {
- try {
- IOUtils.closeQuietly(input);
- } finally {
- IOUtils.closeQuietly(output);
- }
- }
-
- if (source.length() != destination.length()) {
- String message = "Failed to copy full contents from " + source + " to "
- + destination;
- throw new IOException(message);
- }
-
- if (preserveFileDate) {
- // file copy should preserve file date
- destination.setLastModified(source.lastModified());
- }
+ sourceDir.copy(destDir, indexFile, indexFile, IOContext.DEFAULT);
}
}
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Mon Oct 29 14:55:01 2012
@@ -610,20 +610,26 @@ public class CoreAdminHandler extends Re
@Override
public void postClose(SolrCore core) {
- File dataDir = new File(core.getIndexDir());
- File[] files = dataDir.listFiles();
- if (files != null) {
- for (File file : files) {
- if (!file.delete()) {
- log.error(file.getAbsolutePath()
- + " could not be deleted on core unload");
+ Directory dir = null;
+ try {
+ dir = core.getDirectoryFactory().get(core.getIndexDir(), null);
+ core.getDirectoryFactory().remove(dir);
+ core.getDirectoryFactory().doneWithDirectory(dir);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (dir != null) {
+ try {
+ core.getDirectoryFactory().release(dir);
+ } catch (IOException e) {
+ log.error("IOException trying to release directory", e);
}
}
- if (!dataDir.delete()) log.error(dataDir.getAbsolutePath()
- + " could not be deleted on core unload");
- } else {
- log.error(dataDir.getAbsolutePath()
- + " could not be deleted on core unload");
+ }
+ try {
+ core.getDirectoryFactory().remove(dir);
+ } catch (IOException e) {
+ log.error("IOException trying to remove directory", e);
}
}
});
@@ -668,7 +674,16 @@ public class CoreAdminHandler extends Re
});
}
} finally {
- if (core != null) core.close();
+ // it's important that we try and cancel recovery
+ // before we close here - else we might close the
+ // core *in* recovery and end up locked in recovery
+ // waiting to for recovery to be cancelled
+ if (core != null) {
+ if (coreContainer.getZkController() != null) {
+ core.getSolrCoreState().cancelRecovery();
+ }
+ core.close();
+ }
}
return coreContainer.isPersistent();
@@ -996,7 +1011,19 @@ public class CoreAdminHandler extends Re
}
private long getIndexSize(SolrCore core) {
- return FileUtils.sizeOfDirectory(new File(core.getIndexDir()));
+ Directory dir;
+ long size = 0;
+ try {
+ dir = core.getDirectoryFactory().get(core.getIndexDir(), null);
+ try {
+ size = DirectoryFactory.sizeOfDirectory(dir);
+ } finally {
+ core.getDirectoryFactory().release(dir);
+ }
+ } catch (IOException e) {
+ SolrException.log(log, "IO error while trying to get the size of the Directory", e);
+ }
+ return size;
}
protected static String normalizePath(String path) {
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/admin/SystemInfoHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/admin/SystemInfoHandler.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/admin/SystemInfoHandler.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/admin/SystemInfoHandler.java Mon Oct 29 14:55:01 2012
@@ -19,6 +19,7 @@ package org.apache.solr.handler.admin;
import java.io.DataInputStream;
import java.io.File;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
@@ -104,7 +105,13 @@ public class SystemInfoHandler extends R
dirs.add( "cwd" , new File( System.getProperty("user.dir")).getAbsolutePath() );
dirs.add( "instance", new File( core.getResourceLoader().getInstanceDir() ).getAbsolutePath() );
dirs.add( "data", new File( core.getDataDir() ).getAbsolutePath() );
- dirs.add( "index", new File( core.getIndexDir() ).getAbsolutePath() );
+ dirs.add( "dirimpl", core.getDirectoryFactory().getClass().getName());
+ try {
+ dirs.add( "index", core.getDirectoryFactory().normalize(core.getIndexDir()) );
+ } catch (IOException e) {
+ log.warn("Problem getting the normalized index directory path", e);
+ dirs.add( "index", "N/A" );
+ }
info.add( "directory", dirs );
return info;
}
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java Mon Oct 29 14:55:01 2012
@@ -525,7 +525,7 @@ public class RealTimeGetComponent extend
boolean cantReachIsSuccess = rb.req.getParams().getBool("cantReachIsSuccess", false);
- PeerSync peerSync = new PeerSync(rb.req.getCore(), replicas, nVersions, cantReachIsSuccess);
+ PeerSync peerSync = new PeerSync(rb.req.getCore(), replicas, nVersions, cantReachIsSuccess, true);
boolean success = peerSync.sync();
// TODO: more complex response?
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/schema/ExternalFileField.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/schema/ExternalFileField.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/schema/ExternalFileField.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/schema/ExternalFileField.java Mon Oct 29 14:55:01 2012
@@ -16,18 +16,16 @@
*/
package org.apache.solr.schema;
+import org.apache.lucene.index.StorableField;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.search.SortField;
-import org.apache.lucene.index.GeneralField;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.index.StorableField;
-import org.apache.solr.search.function.FileFloatSource;
-import org.apache.solr.search.QParser;
-import org.apache.solr.response.TextResponseWriter;
import org.apache.solr.common.SolrException;
+import org.apache.solr.response.TextResponseWriter;
+import org.apache.solr.search.QParser;
+import org.apache.solr.search.function.FileFloatSource;
-import java.util.Map;
import java.io.IOException;
+import java.util.Map;
/** Get values from an external file instead of the index.
*
@@ -55,7 +53,7 @@ import java.io.IOException;
* <p/>The external file may be sorted or unsorted by the key field, but it will be substantially slower (untested) if it isn't sorted.
* <p/>Fields of this type may currently only be used as a ValueSource in a FunctionQuery.
*
- *
+ * @see ExternalFileFieldReloader
*/
public class ExternalFileField extends FieldType {
private FieldType ftype;
@@ -94,10 +92,26 @@ public class ExternalFileField extends F
@Override
public ValueSource getValueSource(SchemaField field, QParser parser) {
- // default key field to unique key
- SchemaField keyField = keyFieldName==null ? schema.getUniqueKeyField() : schema.getField(keyFieldName);
- return new FileFloatSource(field, keyField, defVal, parser);
+ return getFileFloatSource(field, parser.getReq().getCore().getDataDir());
}
+ /**
+ * Get a FileFloatSource for the given field, looking in datadir for the relevant file
+ * @param field the field to get a source for
+ * @param datadir the data directory in which to look for the external file
+ * @return a FileFloatSource
+ */
+ public FileFloatSource getFileFloatSource(SchemaField field, String datadir) {
+ // Because the float source uses a static cache, all source objects will
+ // refer to the same data.
+ return new FileFloatSource(field, getKeyField(), defVal, datadir);
+ }
+
+ // If no key field is defined, we use the unique key field
+ private SchemaField getKeyField() {
+ return keyFieldName == null ?
+ schema.getUniqueKeyField() :
+ schema.getField(keyFieldName);
+ }
}
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java Mon Oct 29 14:55:01 2012
@@ -41,6 +41,7 @@ import org.apache.lucene.store.NRTCachin
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.OpenBitSet;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@@ -77,7 +78,7 @@ public class SolrIndexSearcher extends I
private static Logger log = LoggerFactory.getLogger(SolrIndexSearcher.class);
private final SolrCore core;
private final IndexSchema schema;
- private String indexDir;
+
private boolean debug = log.isDebugEnabled();
private final String name;
@@ -148,8 +149,6 @@ public class SolrIndexSearcher extends I
directoryFactory.incRef(dir);
}
- this.indexDir = getIndexDir(dir);
-
this.closeReader = closeReader;
setSimilarity(schema.getSimilarity());
@@ -273,7 +272,11 @@ public class SolrIndexSearcher extends I
// super.close();
// can't use super.close() since it just calls reader.close() and that may only be called once
// per reader (even if incRef() was previously called).
- if (closeReader) reader.decRef();
+ try {
+ if (closeReader) reader.decRef();
+ } catch (Throwable t) {
+ SolrException.log(log, "Problem dec ref'ing reader", t);
+ }
for (SolrCache cache : cacheList) {
cache.close();
@@ -409,12 +412,6 @@ public class SolrIndexSearcher extends I
// }
// }
- /**
- * @return the indexDir on which this searcher is opened
- */
- public String getIndexDir() {
- return indexDir;
- }
/* ********************** Document retrieval *************************/
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/SurroundQParserPlugin.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/SurroundQParserPlugin.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/SurroundQParserPlugin.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/SurroundQParserPlugin.java Mon Oct 29 14:55:01 2012
@@ -34,8 +34,8 @@ import org.slf4j.LoggerFactory;
* Plugin for lucene/contrib Surround query parser, bringing SpanQuery support
* to Solr
*
- * <queryParser name="surround"
- * class="org.apache.solr.search.SurroundQParserPlugin" />
+ * <queryParser name="surround"
+ * class="org.apache.solr.search.SurroundQParserPlugin" />
*
* Examples of query syntax can be found in lucene/queryparser/docs/surround
*
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/function/FileFloatSource.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/function/FileFloatSource.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/function/FileFloatSource.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/search/function/FileFloatSource.java Mon Oct 29 14:55:01 2012
@@ -16,24 +16,7 @@
*/
package org.apache.solr.search.function;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.WeakHashMap;
-
-import org.apache.lucene.index.DocsEnum;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.MultiFields;
-import org.apache.lucene.index.IndexReaderContext;
-import org.apache.lucene.index.ReaderUtil;
-import org.apache.lucene.index.TermsEnum;
-import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.*;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.queries.function.docvalues.FloatDocValues;
@@ -47,29 +30,45 @@ import org.apache.solr.request.SolrQuery
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
-import org.apache.solr.search.QParser;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.util.VersionedFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.*;
+
/**
* Obtains float field values from an external file.
*
+ * @see org.apache.solr.schema.ExternalFileField
+ * @see org.apache.solr.schema.ExternalFileFieldReloader
*/
public class FileFloatSource extends ValueSource {
+
private SchemaField field;
private final SchemaField keyField;
private final float defVal;
-
private final String dataDir;
- public FileFloatSource(SchemaField field, SchemaField keyField, float defVal, QParser parser) {
+ private static final Logger log = LoggerFactory.getLogger(FileFloatSource.class);
+
+ /**
+ * Creates a new FileFloatSource
+ * @param field the source's SchemaField
+ * @param keyField the field to use as a key
+ * @param defVal the default value to use if a field has no entry in the external file
+ * @param datadir the directory in which to look for the external file
+ */
+ public FileFloatSource(SchemaField field, SchemaField keyField, float defVal, String datadir) {
this.field = field;
this.keyField = keyField;
this.defVal = defVal;
- this.dataDir = parser.getReq().getCore().getDataDir();
+ this.dataDir = datadir;
}
@Override
@@ -117,11 +116,27 @@ public class FileFloatSource extends Val
+ ",defVal="+defVal+",dataDir="+dataDir+")";
}
-
+
+ /**
+ * Remove all cached entries. Values are lazily loaded next time getValues() is
+ * called.
+ */
public static void resetCache(){
floatCache.resetCache();
}
+ /**
+ * Refresh the cache for an IndexReader. The new values are loaded in the background
+ * and then swapped in, so queries against the cache should not block while the reload
+ * is happening.
+ * @param reader the IndexReader whose cache needs refreshing
+ */
+ public void refreshCache(IndexReader reader) {
+ log.info("Refreshing FlaxFileFloatSource cache for field {}", this.field.getName());
+ floatCache.refresh(reader, new Entry(this));
+ log.info("FlaxFileFloatSource cache for field {} reloaded", this.field.getName());
+ }
+
private final float[] getCachedFloats(IndexReader reader) {
return (float[])floatCache.get(reader, new Entry(this));
}
@@ -139,6 +154,18 @@ public class FileFloatSource extends Val
protected abstract Object createValue(IndexReader reader, Object key);
+ public void refresh(IndexReader reader, Object key) {
+ Object refreshedValues = createValue(reader, key);
+ synchronized (readerCache) {
+ Map innerCache = (Map) readerCache.get(reader);
+ if (innerCache == null) {
+ innerCache = new HashMap();
+ readerCache.put(reader, innerCache);
+ }
+ innerCache.put(key, refreshedValues);
+ }
+ }
+
public Object get(IndexReader reader, Object key) {
Map innerCache;
Object value;
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Mon Oct 29 14:55:01 2012
@@ -493,20 +493,24 @@ public class SolrDispatchFilter implemen
* filter into a larger web application.
*
* For example, if web.xml specifies:
- *
+ * <pre class="prettyprint">
+ * {@code
* <filter-mapping>
* <filter-name>SolrRequestFilter</filter-name>
* <url-pattern>/xxx/*</url-pattern>
- * </filter-mapping>
+ * </filter-mapping>}
+ * </pre>
*
* Make sure to set the PathPrefix to "/xxx" either with this function
* or in web.xml.
*
+ * <pre class="prettyprint">
+ * {@code
* <init-param>
* <param-name>path-prefix</param-name>
* <param-value>/xxx</param-value>
- * </init-param>
- *
+ * </init-param>}
+ * </pre>
*/
public void setPathPrefix(String pathPrefix) {
this.pathPrefix = pathPrefix;
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java Mon Oct 29 14:55:01 2012
@@ -44,7 +44,7 @@ public final class DefaultSolrCoreState
private volatile boolean recoveryRunning;
private RecoveryStrategy recoveryStrat;
- private boolean closed = false;
+ private volatile boolean closed = false;
private RefCounted<IndexWriter> refCntWriter;
@@ -113,7 +113,7 @@ public final class DefaultSolrCoreState
}
@Override
- public synchronized void newIndexWriter(SolrCore core, boolean rollback) throws IOException {
+ public synchronized void newIndexWriter(SolrCore core, boolean rollback, boolean forceNewDir) throws IOException {
log.info("Creating new IndexWriter...");
String coreName = core.getName();
synchronized (writerPauseLock) {
@@ -148,7 +148,7 @@ public final class DefaultSolrCoreState
}
}
}
- indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2", true);
+ indexWriter = createMainIndexWriter(core, "DirectUpdateHandler2", forceNewDir);
log.info("New IndexWriter is ready to be used.");
// we need to null this so it picks up the new writer next get call
refCntWriter = null;
@@ -162,7 +162,7 @@ public final class DefaultSolrCoreState
@Override
public synchronized void rollbackIndexWriter(SolrCore core) throws IOException {
- newIndexWriter(core, true);
+ newIndexWriter(core, true, true);
}
protected SolrIndexWriter createMainIndexWriter(SolrCore core, String name, boolean forceNewDirectory) throws IOException {
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Mon Oct 29 14:55:01 2012
@@ -45,7 +45,6 @@ import org.apache.lucene.search.MatchAll
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@@ -447,7 +446,11 @@ public class DirectUpdateHandler2 extend
log.info("start "+cmd);
RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
try {
- iw.get().prepareCommit();
+ final Map<String,String> commitData = new HashMap<String,String>();
+ commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY,
+ String.valueOf(System.currentTimeMillis()));
+
+ iw.get().prepareCommit(commitData);
} finally {
iw.decref();
}
@@ -600,8 +603,8 @@ public class DirectUpdateHandler2 extend
}
@Override
- public void newIndexWriter(boolean rollback) throws IOException {
- solrCoreState.newIndexWriter(core, rollback);
+ public void newIndexWriter(boolean rollback, boolean forceNewDir) throws IOException {
+ solrCoreState.newIndexWriter(core, rollback, forceNewDir);
}
/**
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/DocumentBuilder.java Mon Oct 29 14:55:01 2012
@@ -238,8 +238,6 @@ public class DocumentBuilder {
SchemaField sfield = schema.getFieldOrNull(name);
boolean used = false;
- float boost = field.getBoost();
- boolean applyBoost = sfield != null && sfield.indexed() && !sfield.omitNorms();
// Make sure it has the correct number
if( sfield!=null && !sfield.multiValued() && field.getValueCount() > 1 ) {
@@ -248,17 +246,18 @@ public class DocumentBuilder {
sfield.getName() + ": " +field.getValue() );
}
- if (applyBoost == false && boost != 1.0F) {
+ float fieldBoost = field.getBoost();
+ boolean applyBoost = sfield != null && sfield.indexed() && !sfield.omitNorms();
+
+ if (applyBoost == false && fieldBoost != 1.0F) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
"ERROR: "+getID(doc, schema)+"cannot set an index-time boost, unindexed or norms are omitted for field " +
sfield.getName() + ": " +field.getValue() );
}
// Lucene no longer has a native docBoost, so we have to multiply
- // it ourselves (do this after the applyBoost error check so we don't
- // give an error on fields that don't support boost just because of a
- // docBoost)
- boost *= docBoost;
+ // it ourselves
+ float compoundBoost = fieldBoost * docBoost;
// load each field value
boolean hasField = false;
@@ -270,16 +269,20 @@ public class DocumentBuilder {
hasField = true;
if (sfield != null) {
used = true;
- addField(out, sfield, v, applyBoost ? boost : 1f);
+ addField(out, sfield, v, applyBoost ? compoundBoost : 1f);
}
- // Check if we should copy this field to any other fields.
+ // Check if we should copy this field value to any other fields.
// This could happen whether it is explicit or not.
List<CopyField> copyFields = schema.getCopyFieldsList(name);
for (CopyField cf : copyFields) {
SchemaField destinationField = cf.getDestination();
+
+ final boolean destHasValues =
+ (null != out.getField(destinationField.getName()));
+
// check if the copy field is a multivalued or not
- if (!destinationField.multiValued() && out.getField(destinationField.getName()) != null) {
+ if (!destinationField.multiValued() && destHasValues) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"ERROR: "+getID(doc, schema)+"multiple values encountered for non multiValued copy field " +
destinationField.getName() + ": " + v);
@@ -292,14 +295,23 @@ public class DocumentBuilder {
if( val instanceof String && cf.getMaxChars() > 0 ) {
val = cf.getLimitedValue((String)val);
}
- addField(out, destinationField, val, destinationField.indexed() && !destinationField.omitNorms() ? boost : 1F);
+
+ // we can't copy any boost unless the dest field is
+ // indexed & !omitNorms, but which boost we copy depends
+ // on wether the dest field already contains values (we
+ // don't want to apply the compounded docBoost more then once)
+ final float destBoost =
+ (destinationField.indexed() && !destinationField.omitNorms()) ?
+ (destHasValues ? fieldBoost : compoundBoost) : 1.0F;
+
+ addField(out, destinationField, val, destBoost);
}
- // The boost for a given field is the product of the
+ // The final boost for a given field named is the product of the
// *all* boosts on values of that field.
// For multi-valued fields, we only want to set the boost on the
// first field.
- boost = 1.0f;
+ fieldBoost = compoundBoost = 1.0f;
}
}
catch( SolrException ex ) {
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/PeerSync.java Mon Oct 29 14:55:01 2012
@@ -19,6 +19,7 @@ package org.apache.solr.update;
import java.io.IOException;
import java.net.ConnectException;
+import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -79,6 +80,7 @@ public class PeerSync {
private long ourLowThreshold; // 20th percentile
private long ourHighThreshold; // 80th percentile
private boolean cantReachIsSuccess;
+ private boolean getNoVersionsIsSuccess;
private static final HttpClient client;
static {
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -129,14 +131,15 @@ public class PeerSync {
}
public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
- this(core, replicas, nUpdates, false);
+ this(core, replicas, nUpdates, false, true);
}
- public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess) {
+ public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess, boolean getNoVersionsIsSuccess) {
this.replicas = replicas;
this.nUpdates = nUpdates;
this.maxUpdates = nUpdates;
this.cantReachIsSuccess = cantReachIsSuccess;
+ this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
uhandler = core.getUpdateHandler();
@@ -301,7 +304,7 @@ public class PeerSync {
Throwable solrException = ((SolrServerException) srsp.getException())
.getRootCause();
if (solrException instanceof ConnectException || solrException instanceof ConnectTimeoutException
- || solrException instanceof NoHttpResponseException) {
+ || solrException instanceof NoHttpResponseException || solrException instanceof SocketException) {
log.warn(msg() + " couldn't connect to " + srsp.getShardAddress() + ", counting as success");
return true;
@@ -343,7 +346,7 @@ public class PeerSync {
log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] );
if (otherVersions.size() == 0) {
- return true;
+ return getNoVersionsIsSuccess;
}
boolean completeList = otherVersions.size() < nUpdates; // do we have their complete list of updates?
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Mon Oct 29 14:55:01 2012
@@ -101,7 +101,6 @@ public class SolrCmdDistributor {
public void finish() {
- // piggyback on any outstanding adds or deletes if possible.
flushAdds(1);
flushDeletes(1);
@@ -150,6 +149,12 @@ public class SolrCmdDistributor {
public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
ModifiableSolrParams params) throws IOException {
+
+ // make sure we are ordered
+ flushAdds(1);
+ flushDeletes(1);
+
+
// Wait for all outstanding responses to make sure that a commit
// can't sneak in ahead of adds or deletes we already sent.
// We could do this on a per-server basis, but it's more complex
@@ -163,7 +168,7 @@ public class SolrCmdDistributor {
addCommit(ureq, cmd);
- log.info("Distrib commit to:" + nodes);
+ log.info("Distrib commit to:" + nodes + " params:" + params);
for (Node node : nodes) {
submit(ureq, node);
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/SolrCoreState.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/SolrCoreState.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/SolrCoreState.java Mon Oct 29 14:55:01 2012
@@ -44,7 +44,7 @@ public abstract class SolrCoreState {
* @param rollback close IndexWriter if false, else rollback
* @throws IOException If there is a low-level I/O error.
*/
- public abstract void newIndexWriter(SolrCore core, boolean rollback) throws IOException;
+ public abstract void newIndexWriter(SolrCore core, boolean rollback, boolean forceNewDir) throws IOException;
/**
* Get the current IndexWriter. If a new IndexWriter must be created, use the
Modified: lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/UpdateHandler.java?rev=1403336&r1=1403335&r2=1403336&view=diff
==============================================================================
--- lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/UpdateHandler.java (original)
+++ lucene/dev/branches/lucene3846/solr/core/src/java/org/apache/solr/update/UpdateHandler.java Mon Oct 29 14:55:01 2012
@@ -116,10 +116,11 @@ public abstract class UpdateHandler impl
* all of the index files.
*
* @param rollback IndexWriter if true else close
+ * @param forceNewDir Force a new Directory instance
*
* @throws IOException If there is a low-level I/O error.
*/
- public abstract void newIndexWriter(boolean rollback) throws IOException;
+ public abstract void newIndexWriter(boolean rollback, boolean forceNewDir) throws IOException;
public abstract SolrCoreState getSolrCoreState();