You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2015/01/27 00:23:25 UTC
svn commit: r1654910 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/core/
solr/core/src/java/org/apache/solr/handler/
Author: markrmiller
Date: Mon Jan 26 23:23:25 2015
New Revision: 1654910
URL: http://svn.apache.org/r1654910
Log:
SOLR-6500: Refactor FileFetcher in SnapPuller, add debug logging.
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1654910&r1=1654909&r2=1654910&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Mon Jan 26 23:23:25 2015
@@ -10,7 +10,7 @@ servlet container such as Jetty.
See http://lucene.apache.org/solr for more information.
-* SOLR-6902Use JUnit rules instead of inheritance with distributed Solr
+* SOLR-6902: Use JUnit rules instead of inheritance with distributed Solr
tests to allow for multiple tests without the same class.
(Ramkumar Aiyengar, Erick Erickson, Mike McCandless)
@@ -43,8 +43,12 @@ New Features
Other Changes
----------------------
+
* SOLR-7014: Collapse identical catch branches in try-catch statements. (shalin)
+* SOLR-6500: Refactor FileFetcher in SnapPuller, add debug logging.
+ (Ramkumar Aiyengar via Mark Miller)
+
================== 5.0.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java?rev=1654910&r1=1654909&r2=1654910&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java Mon Jan 26 23:23:25 2015
@@ -21,6 +21,8 @@ import org.apache.lucene.index.IndexDele
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.solr.update.SolrIndexWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
@@ -43,6 +45,8 @@ import java.util.concurrent.atomic.Atomi
* @see org.apache.lucene.index.IndexDeletionPolicy
*/
public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {
+ private static final Logger LOG = LoggerFactory.getLogger(IndexDeletionPolicyWrapper.class.getName());
+
private final IndexDeletionPolicy deletionPolicy;
private volatile Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<>();
private final Map<Long, Long> reserves = new ConcurrentHashMap<>();
@@ -82,7 +86,11 @@ public final class IndexDeletionPolicyWr
// this is the common success case: the older time didn't exist, or
// came before the new time.
- if (previousTime == null || previousTime <= timeToSet) break;
+ if (previousTime == null || previousTime <= timeToSet) {
+ LOG.debug("Commit point reservation for generation {} set to {} (requested reserve time of {})",
+ indexGen, timeToSet, reserveTime);
+ break;
+ }
// At this point, we overwrote a longer reservation, so we want to restore the older one.
// the problem is that an even longer reservation may come in concurrently
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1654910&r1=1654909&r2=1654910&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Mon Jan 26 23:23:25 2015
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
* @since solr 1.4
*/
public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
-
+
private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class.getName());
SolrCore core;
@@ -212,7 +212,7 @@ public class ReplicationHandler extends
doSnapShoot(new ModifiableSolrParams(solrParams), rsp, req);
rsp.add(STATUS, OK_STATUS);
} else if (command.equalsIgnoreCase(CMD_DELETE_BACKUP)) {
- deleteSnapshot(new ModifiableSolrParams(solrParams), rsp, req);
+ deleteSnapshot(new ModifiableSolrParams(solrParams));
rsp.add(STATUS, OK_STATUS);
} else if (command.equalsIgnoreCase(CMD_FETCH_INDEX)) {
String masterUrl = solrParams.get(MASTER_URL);
@@ -272,7 +272,7 @@ public class ReplicationHandler extends
}
}
- private void deleteSnapshot(ModifiableSolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) {
+ private void deleteSnapshot(ModifiableSolrParams params) {
String name = params.get("name");
if(name == null) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Missing mandatory param: name");
@@ -793,8 +793,8 @@ public class ReplicationHandler extends
} else if (clzz == List.class) {
String ss[] = s.split(",");
List<String> l = new ArrayList<>();
- for (int i = 0; i < ss.length; i++) {
- l.add(new Date(Long.valueOf(ss[i])).toString());
+ for (String s1 : ss) {
+ l.add(new Date(Long.valueOf(s1)).toString());
}
nl.add(key, l);
} else {
@@ -1182,6 +1182,7 @@ public class ReplicationHandler extends
offset = offset == -1 ? 0 : offset;
int read = (int) Math.min(buf.length, filelen - offset);
in.readBytes(buf, 0, read);
+
fos.writeInt(read);
if (useChecksum) {
checksum.reset();
@@ -1190,6 +1191,7 @@ public class ReplicationHandler extends
}
fos.write(buf, 0, read);
fos.flush();
+ LOG.debug("Wrote {} bytes for file {}", offset + read, fileName);
//Pause if necessary
maxBytesBeforePause += read;
@@ -1239,8 +1241,8 @@ public class ReplicationHandler extends
FileInputStream inputStream = null;
try {
initWrite();
-
- //if if is a conf file read from config diectory
+
+ //if if is a conf file read from config directory
File file = new File(core.getResourceLoader().getConfigDir(), cfileName);
if (file.exists() && file.canRead()) {
@@ -1364,7 +1366,7 @@ public class ReplicationHandler extends
* Boolean param for tests that can be specified when using
* {@link #CMD_FETCH_INDEX} to force the current request to block until
* the fetch is complete. <b>NOTE:</b> This param is not advised for
- * non-test code, since the the durration of the fetch for non-trivial
+ * non-test code, since the the duration of the fetch for non-trivial
* indexes will likeley cause the request to time out.
*
* @lucene.internal
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1654910&r1=1654909&r2=1654910&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Mon Jan 26 23:23:25 2015
@@ -179,9 +179,7 @@ public class SnapPuller {
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
- HttpClient httpClient = HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
-
- return httpClient;
+ return HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
}
public SnapPuller(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
@@ -520,7 +518,7 @@ public class SnapPuller {
solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
}
- openNewSearcherAndUpdateCommitPoint(isFullCopyNeeded);
+ openNewSearcherAndUpdateCommitPoint();
}
replicationStartTime = 0;
@@ -699,9 +697,7 @@ public class SnapPuller {
List<String> l = new ArrayList<>();
if (str != null && str.length() != 0) {
String[] ss = str.split(",");
- for (int i = 0; i < ss.length; i++) {
- l.add(ss[i]);
- }
+ Collections.addAll(l, ss);
}
sb.append(replicationTime);
if (!l.isEmpty()) {
@@ -714,7 +710,7 @@ public class SnapPuller {
return sb;
}
- private void openNewSearcherAndUpdateCommitPoint(boolean isFullCopyNeeded) throws IOException {
+ private void openNewSearcherAndUpdateCommitPoint() throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
new ModifiableSolrParams());
@@ -749,8 +745,7 @@ public class SnapPuller {
private String createTempindexDir(SolrCore core, String tmpIdxDirName) {
// TODO: there should probably be a DirectoryFactory#concatPath(parent, name)
// or something
- String tmpIdxDir = core.getDataDir() + tmpIdxDirName;
- return tmpIdxDir;
+ return core.getDataDir() + tmpIdxDirName;
}
private void reloadCore() {
@@ -868,7 +863,7 @@ public class SnapPuller {
* Copy a file by the File#renameTo() method. If it fails, it is considered a failure
* <p/>
*/
- private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname, List<String> copiedfiles) {
+ private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname) {
LOG.debug("Moving file: {}", fname);
boolean success = false;
try {
@@ -902,7 +897,6 @@ public class SnapPuller {
}
}
String segmentsFile = null;
- List<String> movedfiles = new ArrayList<>();
for (Map<String, Object> f : filesDownloaded) {
String fname = (String) f.get(NAME);
// the segments file must be copied last
@@ -914,12 +908,11 @@ public class SnapPuller {
segmentsFile = fname;
continue;
}
- if (!moveAFile(tmpIdxDir, indexDir, fname, movedfiles)) return false;
- movedfiles.add(fname);
+ if (!moveAFile(tmpIdxDir, indexDir, fname)) return false;
}
//copy the segments file last
if (segmentsFile != null) {
- if (!moveAFile(tmpIdxDir, indexDir, segmentsFile, movedfiles)) return false;
+ if (!moveAFile(tmpIdxDir, indexDir, segmentsFile)) return false;
}
return true;
}
@@ -1157,7 +1150,7 @@ public class SnapPuller {
return null;
tmp = new HashMap<>(tmp);
if (tmpFileFetcher != null)
- tmp.put("bytesDownloaded", tmpFileFetcher.bytesDownloaded);
+ tmp.put("bytesDownloaded", tmpFileFetcher.getBytesDownloaded());
return tmp;
}
@@ -1178,58 +1171,53 @@ public class SnapPuller {
}
}
+ private interface FileInterface {
+ public void sync() throws IOException;
+ public void write(byte[] buf, int packetSize) throws IOException;
+ public void close() throws Exception;
+ public void delete() throws Exception;
+ }
+
/**
* The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream
*
* @see org.apache.solr.handler.ReplicationHandler.DirectoryFileStream
*/
- private class DirectoryFileFetcher {
- boolean includeChecksum = true;
-
- Directory copy2Dir;
-
- String fileName;
-
- String saveAs;
-
- long size;
-
- long bytesDownloaded = 0;
-
- byte[] buf = new byte[1024 * 1024];
-
- Checksum checksum;
-
- int errorCount = 0;
-
+ private class FileFetcher {
+ private final FileInterface file;
+ private boolean includeChecksum = true;
+ private String fileName;
+ private String saveAs;
private boolean isConf;
-
- private boolean aborted = false;
-
private Long indexGen;
- private IndexOutput outStream;
+ private long size;
+ private long bytesDownloaded = 0;
+ private byte[] buf = new byte[1024 * 1024];
+ private Checksum checksum;
+ private int errorCount = 0;
+ private boolean aborted = false;
- DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
+ FileFetcher(FileInterface file, Map<String, Object> fileDetails, String saveAs,
boolean isConf, long latestGen) throws IOException {
- this.copy2Dir = tmpIndexDir;
+ this.file = file;
this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE);
this.isConf = isConf;
this.saveAs = saveAs;
-
indexGen = latestGen;
-
- outStream = copy2Dir.createOutput(saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE);
-
if (includeChecksum)
checksum = new Adler32();
}
+ public long getBytesDownloaded() {
+ return bytesDownloaded;
+ }
+
/**
* The main method which downloads file
*/
- void fetchFile() throws Exception {
+ public void fetchFile() throws Exception {
try {
while (true) {
final FastInputStream is = getStream();
@@ -1248,12 +1236,12 @@ public class SnapPuller {
}
} finally {
cleanup();
- //if cleanup suceeds . The file is downloaded fully. do an fsync
+ //if cleanup succeeds . The file is downloaded fully. do an fsync
fsyncService.submit(new Runnable(){
@Override
public void run() {
try {
- copy2Dir.sync(Collections.singleton(saveAs));
+ file.sync();
} catch (IOException e) {
fsyncException = e;
}
@@ -1277,7 +1265,7 @@ public class SnapPuller {
//read the size of the packet
int packetSize = readInt(intbytes);
if (packetSize <= 0) {
- LOG.warn("No content received for file: " + currentFile);
+ LOG.warn("No content received for file: {}", fileName);
return NO_CONTENT;
}
if (buf.length < packetSize)
@@ -1295,45 +1283,45 @@ public class SnapPuller {
checksum.update(buf, 0, packetSize);
long checkSumClient = checksum.getValue();
if (checkSumClient != checkSumServer) {
- LOG.error("Checksum not matched between client and server for: " + currentFile);
+ LOG.error("Checksum not matched between client and server for file: {}", fileName);
//if checksum is wrong it is a problem return for retry
return 1;
}
}
//if everything is fine, write down the packet to the file
- writeBytes(packetSize);
+ file.write(buf, packetSize);
bytesDownloaded += packetSize;
+ LOG.debug("Fetched and wrote {} bytes of file: {}", bytesDownloaded, fileName);
if (bytesDownloaded >= size)
return 0;
- //errorcount is always set to zero after a successful packet
+ //errorCount is always set to zero after a successful packet
errorCount = 0;
}
} catch (ReplicationHandlerException e) {
throw e;
} catch (Exception e) {
- LOG.warn("Error in fetching packets ", e);
- //for any failure , increment the error count
+ LOG.warn("Error in fetching file: {} (downloaded {} of {} bytes)",
+ fileName, bytesDownloaded, size, e);
+ //for any failure, increment the error count
errorCount++;
- //if it fails for the same pacaket for MAX_RETRIES fail and come out
+ //if it fails for the same packet for MAX_RETRIES fail and come out
if (errorCount > MAX_RETRIES) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Fetch failed for file:" + fileName, e);
+ "Failed to fetch file: " + fileName +
+ " (downloaded " + bytesDownloaded + " of " + size + " bytes" +
+ ", error count: " + errorCount + " > " + MAX_RETRIES + ")", e);
}
return ERR;
}
}
- protected void writeBytes(int packetSize) throws IOException {
- outStream.writeBytes(buf, 0, packetSize);
- }
-
/**
* The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully()
* other wise it fails. So read everything as bytes and then extract an integer out of it
*/
private int readInt(byte[] b) {
return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
- | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
+ | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
}
@@ -1342,9 +1330,9 @@ public class SnapPuller {
*/
private long readLong(byte[] b) {
return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
- | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
- | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
- | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
+ | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
+ | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
+ | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
}
@@ -1353,30 +1341,30 @@ public class SnapPuller {
*/
private void cleanup() {
try {
- outStream.close();
- } catch (Exception e) {/* noop */
- LOG.error("Error closing the file stream: "+ this.saveAs ,e);
+ file.close();
+ } catch (Exception e) {/* no-op */
+ LOG.error("Error closing file: {}", this.saveAs, e);
}
if (bytesDownloaded != size) {
//if the download is not complete then
//delete the file being downloaded
try {
- copy2Dir.deleteFile(saveAs);
+ file.delete();
} catch (Exception e) {
- LOG.error("Error deleting file in cleanup" + e.getMessage());
+ LOG.error("Error deleting file: {}", this.saveAs, e);
}
- //if the failure is due to a user abort it is returned nomally else an exception is thrown
+ //if the failure is due to a user abort it is returned normally else an exception is thrown
if (!aborted)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unable to download " + fileName + " completely. Downloaded "
- + bytesDownloaded + "!=" + size);
+ "Unable to download " + fileName + " completely. Downloaded "
+ + bytesDownloaded + "!=" + size);
}
}
/**
* Open a new stream using HttpClient
*/
- FastInputStream getStream() throws IOException {
+ private FastInputStream getStream() throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -1392,7 +1380,7 @@ public class SnapPuller {
params.set(FILE, fileName);
}
if (useInternal) {
- params.set(COMPRESSION, "true");
+ params.set(COMPRESSION, "true");
}
//use checksum
if (this.includeChecksum) {
@@ -1400,16 +1388,16 @@ public class SnapPuller {
}
//wt=filestream this is a custom protocol
params.set(CommonParams.WT, FILE_STREAM);
- // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
- // the server starts from the offset
+ // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
+ // the server starts from the offset
if (bytesDownloaded > 0) {
params.set(OFFSET, Long.toString(bytesDownloaded));
}
-
+
NamedList response;
InputStream is = null;
-
+
HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null); //XXX use shardhandler
try {
client.setSoTimeout(60000);
@@ -1430,274 +1418,91 @@ public class SnapPuller {
}
}
}
-
- /**
- * The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream
- *
- * @see org.apache.solr.handler.ReplicationHandler.LocalFsFileStream
- */
- private class LocalFsFileFetcher {
- boolean includeChecksum = true;
- private File copy2Dir;
+ private class DirectoryFile implements FileInterface {
+ private final String saveAs;
+ private Directory copy2Dir;
+ private IndexOutput outStream;
- String fileName;
+ DirectoryFile(Directory tmpIndexDir, String saveAs) throws IOException {
+ this.saveAs = saveAs;
+ this.copy2Dir = tmpIndexDir;
+ outStream = copy2Dir.createOutput(this.saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE);
+ }
- String saveAs;
+ public void sync() throws IOException {
+ copy2Dir.sync(Collections.singleton(saveAs));
+ }
- long size;
+ public void write(byte[] buf, int packetSize) throws IOException {
+ outStream.writeBytes(buf, 0, packetSize);
+ }
- long bytesDownloaded = 0;
+ public void close() throws Exception {
+ outStream.close();
+ }
- FileChannel fileChannel;
-
- private FileOutputStream fileOutputStream;
+ public void delete() throws Exception {
+ copy2Dir.deleteFile(saveAs);
+ }
+ }
- byte[] buf = new byte[1024 * 1024];
+ private class DirectoryFileFetcher extends FileFetcher {
+ DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
+ boolean isConf, long latestGen) throws IOException {
+ super(new DirectoryFile(tmpIndexDir, saveAs), fileDetails, saveAs, isConf, latestGen);
+ }
+ }
- Checksum checksum;
+ private class LocalFsFile implements FileInterface {
+ private File copy2Dir;
+ FileChannel fileChannel;
+ private FileOutputStream fileOutputStream;
File file;
- int errorCount = 0;
-
- private boolean isConf;
-
- private boolean aborted = false;
-
- private Long indexGen;
-
- // TODO: could do more code sharing with DirectoryFileFetcher
- LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
- boolean isConf, long latestGen) throws IOException {
+ LocalFsFile(File dir, String saveAs) throws IOException {
this.copy2Dir = dir;
- this.fileName = (String) fileDetails.get(NAME);
- this.size = (Long) fileDetails.get(SIZE);
- this.isConf = isConf;
- this.saveAs = saveAs;
-
- indexGen = latestGen;
this.file = new File(copy2Dir, saveAs);
-
+
File parentDir = this.file.getParentFile();
if( ! parentDir.exists() ){
if ( ! parentDir.mkdirs() ) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Failed to create (sub)directory for file: " + saveAs);
+ "Failed to create (sub)directory for file: " + saveAs);
}
}
-
+
this.fileOutputStream = new FileOutputStream(file);
this.fileChannel = this.fileOutputStream.getChannel();
-
- if (includeChecksum)
- checksum = new Adler32();
}
- /**
- * The main method which downloads file
- */
- void fetchFile() throws Exception {
- try {
- while (true) {
- final FastInputStream is = getStream();
- int result;
- try {
- //fetch packets one by one in a single request
- result = fetchPackets(is);
- if (result == 0 || result == NO_CONTENT) {
- return;
- }
- //if there is an error continue. But continue from the point where it got broken
- } finally {
- IOUtils.closeQuietly(is);
- }
- }
- } finally {
- cleanup();
- //if cleanup suceeds . The file is downloaded fully. do an fsync
- fsyncService.submit(new Runnable(){
- @Override
- public void run() {
- try {
- FileUtils.sync(file);
- } catch (IOException e) {
- fsyncException = e;
- }
- }
- });
- }
+ public void sync() throws IOException {
+ FileUtils.sync(file);
}
- private int fetchPackets(FastInputStream fis) throws Exception {
- byte[] intbytes = new byte[4];
- byte[] longbytes = new byte[8];
- try {
- while (true) {
- if (stop) {
- stop = false;
- aborted = true;
- throw new ReplicationHandlerException("User aborted replication");
- }
- long checkSumServer = -1;
- fis.readFully(intbytes);
- //read the size of the packet
- int packetSize = readInt(intbytes);
- if (packetSize <= 0) {
- LOG.warn("No content received for file: " + currentFile);
- return NO_CONTENT;
- }
- if (buf.length < packetSize)
- buf = new byte[packetSize];
- if (checksum != null) {
- //read the checksum
- fis.readFully(longbytes);
- checkSumServer = readLong(longbytes);
- }
- //then read the packet of bytes
- fis.readFully(buf, 0, packetSize);
- //compare the checksum as sent from the master
- if (includeChecksum) {
- checksum.reset();
- checksum.update(buf, 0, packetSize);
- long checkSumClient = checksum.getValue();
- if (checkSumClient != checkSumServer) {
- LOG.error("Checksum not matched between client and server for: " + currentFile);
- //if checksum is wrong it is a problem return for retry
- return 1;
- }
- }
- //if everything is fine, write down the packet to the file
- fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
- bytesDownloaded += packetSize;
- if (bytesDownloaded >= size)
- return 0;
- //errorcount is always set to zero after a successful packet
- errorCount = 0;
- }
- } catch (ReplicationHandlerException e) {
- throw e;
- } catch (Exception e) {
- LOG.warn("Error in fetching packets ", e);
- //for any failure , increment the error count
- errorCount++;
- //if it fails for the same pacaket for MAX_RETRIES fail and come out
- if (errorCount > MAX_RETRIES) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Fetch failed for file:" + fileName, e);
- }
- return ERR;
- }
- }
-
- /**
- * The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully()
- * other wise it fails. So read everything as bytes and then extract an integer out of it
- */
- private int readInt(byte[] b) {
- return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
- | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
-
+ public void write(byte[] buf, int packetSize) throws IOException {
+ fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
}
- /**
- * Same as above but to read longs from a byte array
- */
- private long readLong(byte[] b) {
- return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
- | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
- | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
- | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
-
+ public void close() throws Exception {
+ //close the FileOutputStream (which also closes the Channel)
+ fileOutputStream.close();
}
- /**
- * cleanup everything
- */
- private void cleanup() {
- try {
- //close the FileOutputStream (which also closes the Channel)
- fileOutputStream.close();
- } catch (Exception e) {/* noop */
- LOG.error("Error closing the file stream: "+ this.saveAs ,e);
- }
- if (bytesDownloaded != size) {
- //if the download is not complete then
- //delete the file being downloaded
- try {
- Files.delete(file.toPath());
- } catch (SecurityException e) {
- LOG.error("Error deleting file in cleanup" + e.getMessage());
- } catch (Throwable other) {
- // TODO: should this class care if a file couldnt be deleted?
- // this just emulates previous behavior, where only SecurityException would be handled.
- }
- //if the failure is due to a user abort it is returned nomally else an exception is thrown
- if (!aborted)
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unable to download " + fileName + " completely. Downloaded "
- + bytesDownloaded + "!=" + size);
- }
+ public void delete() throws Exception {
+ Files.delete(file.toPath());
}
+ }
- /**
- * Open a new stream using HttpClient
- */
- FastInputStream getStream() throws IOException {
-
- ModifiableSolrParams params = new ModifiableSolrParams();
-
-// //the method is command=filecontent
- params.set(COMMAND, CMD_GET_FILE);
- params.set(GENERATION, Long.toString(indexGen));
- params.set(CommonParams.QT, "/replication");
- //add the version to download. This is used to reserve the download
- if (isConf) {
- //set cf instead of file for config file
- params.set(CONF_FILE_SHORT, fileName);
- } else {
- params.set(FILE, fileName);
- }
- if (useInternal) {
- params.set(COMPRESSION, "true");
- }
- //use checksum
- if (this.includeChecksum) {
- params.set(CHECKSUM, true);
- }
- //wt=filestream this is a custom protocol
- params.set(CommonParams.WT, FILE_STREAM);
- // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
- // the server starts from the offset
- if (bytesDownloaded > 0) {
- params.set(OFFSET, Long.toString(bytesDownloaded));
- }
-
-
- NamedList response;
- InputStream is = null;
- HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null); //XXX use shardhandler
- try {
- client.setSoTimeout(60000);
- client.setConnectionTimeout(15000);
- QueryRequest req = new QueryRequest(params);
- response = client.request(req);
- is = (InputStream) response.get("stream");
- if(useInternal) {
- is = new InflaterInputStream(is);
- }
- return new FastInputStream(is);
- } catch (Exception e) {
- //close stream on error
- IOUtils.closeQuietly(is);
- throw new IOException("Could not download file '" + fileName + "'", e);
- } finally {
- client.shutdown();
- }
+ private class LocalFsFileFetcher extends FileFetcher {
+ LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
+ boolean isConf, long latestGen) throws IOException {
+ super(new LocalFsFile(dir, saveAs), fileDetails, saveAs, isConf, latestGen);
}
}
-
+
NamedList getDetails() throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_DETAILS);
@@ -1720,31 +1525,27 @@ public class SnapPuller {
if (interval == null)
return null;
int result = 0;
- if (interval != null) {
- Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
- if (m.find()) {
- String hr = m.group(1);
- String min = m.group(2);
- String sec = m.group(3);
- result = 0;
- try {
- if (sec != null && sec.length() > 0)
- result += Integer.parseInt(sec);
- if (min != null && min.length() > 0)
- result += (60 * Integer.parseInt(min));
- if (hr != null && hr.length() > 0)
- result += (60 * 60 * Integer.parseInt(hr));
- result *= 1000;
- } catch (NumberFormatException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- INTERVAL_ERR_MSG);
- }
- } else {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- INTERVAL_ERR_MSG);
+ Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
+ if (m.find()) {
+ String hr = m.group(1);
+ String min = m.group(2);
+ String sec = m.group(3);
+ result = 0;
+ try {
+ if (sec != null && sec.length() > 0)
+ result += Integer.parseInt(sec);
+ if (min != null && min.length() > 0)
+ result += (60 * Integer.parseInt(min));
+ if (hr != null && hr.length() > 0)
+ result += (60 * 60 * Integer.parseInt(hr));
+ result *= 1000;
+ } catch (NumberFormatException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
}
-
+ } else {
+ throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
}
+
return result;
}