You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by sh...@apache.org on 2008/10/29 09:54:55 UTC
svn commit: r708837 - in /lucene/solr/trunk/src/java/org/apache/solr/handler:
ReplicationHandler.java SnapPuller.java
Author: shalin
Date: Wed Oct 29 01:54:54 2008
New Revision: 708837
URL: http://svn.apache.org/viewvc?rev=708837&view=rev
Log:
More comments, fixed typos.
Modified:
lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java
Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=708837&r1=708836&r2=708837&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java Wed Oct 29 01:54:54 2008
@@ -25,9 +25,9 @@
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener;
-import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.request.BinaryQueryResponseWriter;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryResponse;
@@ -106,15 +106,15 @@
rsp.add("status", "OK");
return;
}
- //This command does not give the current index version of the master
- // It gives the current replicateable index version
+ // This command does not give the current index version of the master
+ // It gives the current 'replicateable' index version
if (command.equals(CMD_INDEX_VERSION)) {
IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't change
if (commitPoint != null) {
rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
rsp.add(GENERATION, commitPoint.getGeneration());
} else {
- // This happens when replicateAfter does not have startup and no commit/optimize
+ // This happens when replication is not configured to happen after startup and no commit/optimize
// has happened yet.
rsp.add(CMD_INDEX_VERSION, 0L);
rsp.add(GENERATION, 0L);
@@ -164,7 +164,8 @@
return l;
}
- /**Gets the checksum of a file
+ /**
+ * Gets the checksum of a file
*/
private void getFileChecksum(SolrParams solrParams, SolrQueryResponse rsp) {
Checksum checksum = new Adler32();
@@ -231,8 +232,11 @@
}
}
- /**This method adds an Object of FileStream to the resposnse .
- * The FileStream implements a custom protocol which is also understoop by the SnapPuller
+ /**
+ * This method adds an Object of FileStream to the response.
+ * The FileStream implements a custom protocol which is understood by SnapPuller.FileFetcher
+ *
+ * @see org.apache.solr.handler.SnapPuller.FileFetcher
*/
private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) {
ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams);
@@ -279,10 +283,12 @@
rsp.add(CONF_FILES, confFiles);
}
- /** for configuration files checksum of the file also is included
- * because ,unlike index ,files they may have same content but different timestamps
+ /**
+ * For configuration files, checksum of the file is included
+ * because, unlike index files, they may have same content but different timestamps.
+ * <p/>
* The local conf files information is cached so that everytime it does not have to
- * read the file content. The cache is refreshed only if the lastmodified of the file changes
+ * compute the checksum. The cache is refreshed only if the lastModified of the file changes
*/
List<Map<String, Object>> getConfFileCache(Collection<String> filenames) {
List<Map<String, Object>> confFiles = new ArrayList<Map<String, Object>>();
@@ -291,7 +297,7 @@
Checksum checksum = null;
for (String cf : filenames) {
File f = new File(confDir, cf);
- if (!f.exists() || f.isDirectory()) continue;//must not happen
+ if (!f.exists() || f.isDirectory()) continue; //must not happen
FileInfo info = confFileInfoCache.get(cf);
if (info == null || info.lastmodified != f.lastModified() || info.size != f.length()) {
if (checksum == null) checksum = new Adler32();
@@ -305,7 +311,6 @@
}
private static class FileInfo {
-
long lastmodified;
String name;
long size;
@@ -365,7 +370,8 @@
return size;
}
- /**Collects the details such as name, size ,lasmodified of a file
+ /**
+ * Collects the details such as name, size, lastModified of a file
*/
private Map<String, Object> getFileInfo(File file) {
Map<String, Object> fileMeta = new HashMap<String, Object>();
@@ -376,19 +382,19 @@
}
public String getDescription() {
- return "";
+ return "ReplicationHandler provides replication of index and configuration files from Master to Slaves";
}
public String getSourceId() {
- return "";
+ return "$Id$";
}
public String getSource() {
- return "";
+ return "$URL$";
}
public String getVersion() {
- return "$Id$";
+ return "$Revision$";
}
String readableSize(long size) {
@@ -451,6 +457,9 @@
return list;
}
+ /**
+ * Used for showing statistics and progress information.
+ */
void getReplicationDetails(SolrQueryResponse resp) {
String timeLastReplicated = "", confFilesReplicated = "", confFilesReplicatedTime = "", timesIndexReplicated = "", timesConfigReplicated = "";
NamedList<Object> details = new SimpleOrderedMap<Object>();
@@ -645,10 +654,10 @@
LOG.info("Replication enabled for following config files: " + includeConfFiles);
}
List snapshot = master.getAll("snapshot");
- boolean snapshotOnCommit = snapshot.contains("commit");
+ boolean snapshotOnCommit = snapshot.contains("commit");
boolean snapshotOnOptimize = snapshot.contains("optimize");
- List replicateAfter = master.getAll(REPLICATE_AFTER);
- replicateOnCommit = replicateAfter.contains("commit");
+ List replicateAfter = master.getAll(REPLICATE_AFTER);
+ replicateOnCommit = replicateAfter.contains("commit");
replicateOnOptimize = replicateAfter.contains("optimize");
if (replicateOnOptimize || snapshotOnOptimize) {
@@ -663,7 +672,7 @@
try {
indexCommitPoint = s.get().getReader().getIndexCommit();
} catch (IOException e) {
- LOG.warn("Unable to get IndexCommit on startup",e);
+ LOG.warn("Unable to get IndexCommit on startup", e);
} finally {
s.decref();
}
@@ -677,7 +686,8 @@
}
}
- /**register a closehook
+ /**
+ * register a closehook
*/
private void registerCloseHook() {
core.addCloseHook(new CloseHook() {
@@ -689,7 +699,10 @@
});
}
- /**A responsewriter is registered automatically for wt=filestream
+ /**
+ * A ResponseWriter is registered automatically for wt=filestream
+ * This response writer is used to transfer index files in a block-by-block manner within
+ * the same HTTP response.
*/
private void registerFileStreamResponseWriter() {
core.registerResponseWriter(FILE_STREAM, new BinaryQueryResponseWriter() {
@@ -711,18 +724,22 @@
}
- /**Register a listener for postcommit/optimize
+ /**
+ * Register a listener for postcommit/optimize
+ *
* @param snapshoot do a snapshoot
* @param getCommit get a commitpoint also
* @return an instance of the eventlistener
*/
-
private SolrEventListener getEventListener(final boolean snapshoot, final boolean getCommit) {
return new SolrEventListener() {
public void init(NamedList args) {/*no op*/ }
+ /**
+ * This refreshes the latest replicateable index commit and optionally can create Snapshots as well
+ */
public void postCommit() {
- if(getCommit){
+ if (getCommit) {
indexCommitPoint = core.getDeletionPolicy().getLatestCommit();
}
if (snapshoot) {
@@ -767,7 +784,7 @@
String sLen = params.get(LEN);
String sChecksum = params.get(CHECKSUM);
String sindexVersion = params.get(CMD_INDEX_VERSION);
- if(sindexVersion != null) indexVersion = Long.parseLong(sindexVersion);
+ if (sindexVersion != null) indexVersion = Long.parseLong(sindexVersion);
FileInputStream inputStream = null;
int packetsWritten = 0;
try {
@@ -820,7 +837,7 @@
}
fos.write(buf, 0, (int) bytesRead);
fos.flush();
- if(indexVersion != null && (packetsWritten % 5 == 0)){
+ if (indexVersion != null && (packetsWritten % 5 == 0)) {
//after every 5 packets reserve the commitpoint for some time
delPolicy.setReserveDuration(indexVersion, reserveCommitDuration);
}
@@ -836,6 +853,9 @@
}
}
+ /**
+ * Used to write a marker for EOF
+ */
private void writeNothing() throws IOException {
fos.writeInt(0);
fos.flush();
Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java?rev=708837&r1=708836&r2=708837&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java Wed Oct 29 01:54:54 2008
@@ -131,10 +131,6 @@
/**
* Gets the latest commit version and generation from the master
- *
- * @param client
- * @return
- * @throws IOException
*/
@SuppressWarnings("unchecked")
NamedList getLatestVersion(HttpClient client) throws IOException {
@@ -151,8 +147,7 @@
return getNamedListResponse(client, post);
}
- private NamedList getNamedListResponse(HttpClient client, PostMethod method)
- throws IOException {
+ private NamedList getNamedListResponse(HttpClient client, PostMethod method) throws IOException {
try {
int status = client.executeMethod(method);
if (status != HttpStatus.SC_OK) {
@@ -169,12 +164,7 @@
}
/**
- * Fetches the list of files in a given snapshot
- *
- * @param version
- * @param client
- * @return
- * @throws IOException
+ * Fetches the list of files in a given index commit point
*/
void fetchFileList(long version, HttpClient client) throws IOException {
PostMethod post = new PostMethod(masterUrl);
@@ -192,8 +182,8 @@
/**
* This command downloads all the necessary files from master to install a
- * snapshot. Only changed files are downloaded. it also downloads the
- * conf files (if they are modified)
+ * index commit point. Only changed files are downloaded. It also downloads the
+ * conf files (if they are modified).
*
* @param core the SolrCore
* @return true on success, false if slave is already in sync
@@ -297,6 +287,10 @@
}
}
+ /**
+ * Helper method to record the last replication's details so that we can show them on the
+ * statistics page across restarts.
+ */
private void logReplicationTimeAndConfFiles(Collection<Map<String, Object>> modifiedConfFiles) {
FileOutputStream outFile = null;
FileInputStream inFile = null;
@@ -364,11 +358,11 @@
}
- /**All the files are copied to a temp dir first
+ /**
+ * All the files are copied to a temp dir first
*/
private File createTempindexDir(SolrCore core) {
- String snapName = "index."
- + new SimpleDateFormat(SnapShooter.DATE_FMT).format(new Date());
+ String snapName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT).format(new Date());
File snapDir = new File(core.getDataDir(), snapName);
snapDir.mkdirs();
return snapDir;
@@ -404,17 +398,19 @@
copyTmpConfFiles2Conf(tmpconfDir);
}
- /** download the index files. if snap needed download all the files .
- * @param snapNeeded is it a fresh index copy
- * @param snapDir the directory to which files need to be downloadeed to
- * @param client the httpclient instance
+ /**
+ * Download the index files. If a new index is needed, download all the files.
+ *
+ * @param downloadCompleteIndex is it a fresh index copy
+ * @param snapDir the directory to which files need to be downloaded to
+ * @param client the httpclient instance
* @param latestVersion the version number
*/
- private void downloadIndexFiles(boolean snapNeeded, File snapDir,
+ private void downloadIndexFiles(boolean downloadCompleteIndex, File snapDir,
HttpClient client, long latestVersion) throws Exception {
for (Map<String, Object> file : filesToDownload) {
File localIndexFile = new File(solrCore.getIndexDir(), (String) file.get(NAME));
- if (!localIndexFile.exists() || snapNeeded) {
+ if (!localIndexFile.exists() || downloadCompleteIndex) {
fileFetcher = new FileFetcher(snapDir, file, (String) file.get(NAME),
client, false, latestVersion);
currentFile = file;
@@ -426,8 +422,11 @@
}
}
- /**All the files which are common between master and slave must have
- * same timestamp and size else we assume they are not compatible (stale)
+ /**
+ * All the files which are common between master and slave must have
+ * same timestamp and size else we assume they are not compatible (stale).
+ *
+ * @return true if the index is stale and we need to download a fresh copy, false otherwise.
*/
private boolean isIndexStale() {
for (Map<String, Object> file : filesToDownload) {
@@ -443,9 +442,11 @@
return false;
}
- /**Copy a file by the File#renameTo() method. if it fails , it is considered
+ /**
+ * Copy a file by the File#renameTo() method. If it fails, it is considered
* a failure
- * todo may be we should try a simple copy if it fails
+ *
+ * TODO: maybe we should try a simple copy if it fails
*/
private boolean copyAFile(File snapDir, File indexDir, String fname, List<String> copiedfiles) {
File indexFileInSnap = new File(snapDir, fname);
@@ -465,9 +466,10 @@
return true;
}
- /**Copy all index files from the temp index dir to the actual index
+ /**
+ * Copy all index files from the temp index dir to the actual index.
+ * The segments_N file is copied last.
*/
-
private boolean copyIndexFiles(File snapDir, File indexDir) {
String segmentsFile = null;
List<String> copiedfiles = new ArrayList<String>();
@@ -492,7 +494,8 @@
return true;
}
- /**The conf files are copied to the tmp dir to the config dir
+ /**
+ * The conf files are copied from the tmp dir to the conf dir.
* A backup of the old file is maintained
*/
private void copyTmpConfFiles2Conf(File tmpconfDir) throws IOException {
@@ -523,8 +526,8 @@
return new SimpleDateFormat(SnapShooter.DATE_FMT).format(d);
}
- /**if the index is stale by any chance. use the new feature of solr to load index
- * from a different dir in the data dir.
+ /**
+ * If the index is stale by any chance, load index from a different dir in the data dir.
*/
private void modifyIndexProps(String snap) {
LOG.info("New index installed. Updating index properties...");
@@ -554,8 +557,11 @@
}
}
- /**The local conf files are compared with the conf files in the master. If they are
- * same (by checksum) do not copy
+ /**
+ * The local conf files are compared with the conf files in the master. If they are
+ * same (by checksum) do not copy.
+ *
+ * @return a list of configuration files which have changed on the master and need to be downloaded.
*/
private Collection<Map<String, Object>> getModifiedConfFiles(List<Map<String, Object>> confFilesToDownload) {
if (confFilesToDownload == null || confFilesToDownload.isEmpty())
@@ -576,7 +582,8 @@
return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values();
}
- /**delete the directree recursively
+ /**
+ * Delete the directory tree recursively
*/
static boolean delTree(File dir) {
if (dir == null || !dir.exists())
@@ -598,7 +605,8 @@
return dir.delete();
}
- /**periodic polling is disabled
+ /**
+ * Disable periodic polling
*/
void disablePoll() {
pollDisabled.set(true);
@@ -613,7 +621,8 @@
LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled);
}
- /** Stops the ongoing pull
+ /**
+ * Stops the ongoing pull
*/
void abortPull() {
stop = true;
@@ -623,8 +632,6 @@
return replicationStartTime;
}
- /**used by details page for display.
- */
List<Map<String, Object>> getConfFilesToDownload() {
//make a copy first because it can be null later
List<Map<String, Object>> tmp = confFilesToDownload;
@@ -673,16 +680,15 @@
}
private class ReplicationHandlerException extends InterruptedException {
-
public ReplicationHandlerException(String message) {
super(message);
}
-
}
- /**The class acts as a client for ReplicationHandler.FileStream.
- * It understands the protoolc well
- *
+ /**
+ * The class acts as a client for ReplicationHandler.FileStream.
+ * It understands the protocol of wt=filestream
+ * @see org.apache.solr.handler.ReplicationHandler.FileStream
*/
private class FileFetcher {
boolean includeChecksum = true;
@@ -733,8 +739,8 @@
checksum = new Adler32();
}
- /**The main method which downloads file
- * @throws Exception
+ /**
+ * The main method which downloads file
*/
void fetchFile() throws Exception {
try {
@@ -753,7 +759,7 @@
}
//if there is an error continue. But continue from the point where it got broken
} finally {
- //closing Inputstream and HTTP connection takes a long time,
+ // closing Inputstream and HTTP connection takes a long time,
// so replication status shows as 'replicating' even though it is aborted.
new Thread() {
public void run() {
@@ -806,7 +812,7 @@
long checkSumClient = checksum.getValue();
if (checkSumClient != checkSumServer) {
LOG.error("Checksum not matched between client and server for: " + currentFile);
- //if checksum is wrong it is a problem return for retry
+ //if checksum is wrong it is a problem return for retry
return 1;
}
}
@@ -836,7 +842,7 @@
/**
* The webcontainer flushes the data only after it fills the buffer size.
* So, all data has to be read as readFully() other wise it fails. So read
- * everything as bytes and then extract int out of it
+ * everything as bytes and then extract an integer out of it
*/
private int readInt(byte[] b) {
return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
@@ -845,7 +851,7 @@
}
/**
- * Same as above but to read long
+ * Same as above but to read longs from a byte array
*/
private long readLong(byte[] b) {
return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
@@ -855,7 +861,8 @@
}
- /**cleanup everything
+ /**
+ * cleanup everything
*/
private void cleanup() {
try {
@@ -864,7 +871,7 @@
} catch (Exception e) {/* noop */
}
if (bytesDownloaded != size) {
- //if the download is notcomplete then
+ //if the download is not complete then
//delete the file being downloaded
try {
file.delete();
@@ -879,7 +886,8 @@
}
}
- /**Open a new stream using HttpClient
+ /**
+ * Open a new stream using HttpClient
*/
FastInputStream getStream() throws IOException {
post = new PostMethod(masterUrl);
@@ -898,7 +906,7 @@
post.addParameter(CHECKSUM, "true");
//wt=filestream this is a custom protocol
post.addParameter("wt", FILE_STREAM);
- //This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
+ // This happens if there is a retry after a failure. The offset=<sizedownloaded> ensures that
// the server starts from the offset
if (bytesDownloaded > 0) {
post.addParameter(OFFSET, "" + bytesDownloaded);