You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2015/01/27 00:23:25 UTC

svn commit: r1654910 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/core/ solr/core/src/java/org/apache/solr/handler/

Author: markrmiller
Date: Mon Jan 26 23:23:25 2015
New Revision: 1654910

URL: http://svn.apache.org/r1654910
Log:
SOLR-6500: Refactor FileFetcher in SnapPuller, add debug logging.

Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SnapPuller.java

Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1654910&r1=1654909&r2=1654910&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Mon Jan 26 23:23:25 2015
@@ -10,7 +10,7 @@ servlet container such as Jetty.
 See http://lucene.apache.org/solr for more information.
 
 
-* SOLR-6902Use JUnit rules instead of inheritance with distributed Solr 
+* SOLR-6902: Use JUnit rules instead of inheritance with distributed Solr 
   tests to allow for multiple tests without the same class.
   (Ramkumar Aiyengar, Erick Erickson, Mike McCandless)
   
@@ -43,8 +43,12 @@ New Features
 
 Other Changes
 ----------------------
+
 * SOLR-7014: Collapse identical catch branches in try-catch statements. (shalin)
 
+* SOLR-6500: Refactor FileFetcher in SnapPuller, add debug logging. 
+  (Ramkumar Aiyengar via Mark Miller)
+
 ==================  5.0.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java?rev=1654910&r1=1654909&r2=1654910&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java Mon Jan 26 23:23:25 2015
@@ -21,6 +21,8 @@ import org.apache.lucene.index.IndexDele
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.store.Directory;
 import org.apache.solr.update.SolrIndexWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
@@ -43,6 +45,8 @@ import java.util.concurrent.atomic.Atomi
  * @see org.apache.lucene.index.IndexDeletionPolicy
  */
 public final class IndexDeletionPolicyWrapper extends IndexDeletionPolicy {
+  private static final Logger LOG = LoggerFactory.getLogger(IndexDeletionPolicyWrapper.class.getName());
+
   private final IndexDeletionPolicy deletionPolicy;
   private volatile Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<>();
   private final Map<Long, Long> reserves = new ConcurrentHashMap<>();
@@ -82,7 +86,11 @@ public final class IndexDeletionPolicyWr
 
       // this is the common success case: the older time didn't exist, or
       // came before the new time.
-      if (previousTime == null || previousTime <= timeToSet) break;
+      if (previousTime == null || previousTime <= timeToSet) {
+        LOG.debug("Commit point reservation for generation {} set to {} (requested reserve time of {})",
+            indexGen, timeToSet, reserveTime);
+        break;
+      }
 
       // At this point, we overwrote a longer reservation, so we want to restore the older one.
       // the problem is that an even longer reservation may come in concurrently

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=1654910&r1=1654909&r2=1654910&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java Mon Jan 26 23:23:25 2015
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
  * @since solr 1.4
  */
 public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
-  
+
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class.getName());
   SolrCore core;
 
@@ -212,7 +212,7 @@ public class ReplicationHandler extends
       doSnapShoot(new ModifiableSolrParams(solrParams), rsp, req);
       rsp.add(STATUS, OK_STATUS);
     } else if (command.equalsIgnoreCase(CMD_DELETE_BACKUP)) {
-      deleteSnapshot(new ModifiableSolrParams(solrParams), rsp, req);
+      deleteSnapshot(new ModifiableSolrParams(solrParams));
       rsp.add(STATUS, OK_STATUS);
     } else if (command.equalsIgnoreCase(CMD_FETCH_INDEX)) {
       String masterUrl = solrParams.get(MASTER_URL);
@@ -272,7 +272,7 @@ public class ReplicationHandler extends
     }
   }
 
-  private void deleteSnapshot(ModifiableSolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) {
+  private void deleteSnapshot(ModifiableSolrParams params) {
     String name = params.get("name");
     if(name == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Missing mandatory param: name");
@@ -793,8 +793,8 @@ public class ReplicationHandler extends
     } else if (clzz == List.class) {
       String ss[] = s.split(",");
       List<String> l = new ArrayList<>();
-      for (int i = 0; i < ss.length; i++) {
-        l.add(new Date(Long.valueOf(ss[i])).toString());
+      for (String s1 : ss) {
+        l.add(new Date(Long.valueOf(s1)).toString());
       }
       nl.add(key, l);
     } else {
@@ -1182,6 +1182,7 @@ public class ReplicationHandler extends
           offset = offset == -1 ? 0 : offset;
           int read = (int) Math.min(buf.length, filelen - offset);
           in.readBytes(buf, 0, read);
+
           fos.writeInt(read);
           if (useChecksum) {
             checksum.reset();
@@ -1190,6 +1191,7 @@ public class ReplicationHandler extends
           }
           fos.write(buf, 0, read);
           fos.flush();
+          LOG.debug("Wrote {} bytes for file {}", offset + read, fileName);
 
           //Pause if necessary
           maxBytesBeforePause += read;
@@ -1239,8 +1241,8 @@ public class ReplicationHandler extends
       FileInputStream inputStream = null;
       try {
         initWrite();
-  
-        //if if is a conf file read from config diectory
+
+        //if it is a conf file, read it from the config directory
         File file = new File(core.getResourceLoader().getConfigDir(), cfileName);
 
         if (file.exists() && file.canRead()) {
@@ -1364,7 +1366,7 @@ public class ReplicationHandler extends
    * Boolean param for tests that can be specified when using 
    * {@link #CMD_FETCH_INDEX} to force the current request to block until 
    * the fetch is complete.  <b>NOTE:</b> This param is not advised for 
-   * non-test code, since the the durration of the fetch for non-trivial
+   * non-test code, since the duration of the fetch for non-trivial
    * indexes will likeley cause the request to time out.
    *
    * @lucene.internal

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1654910&r1=1654909&r2=1654910&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Mon Jan 26 23:23:25 2015
@@ -179,9 +179,7 @@ public class SnapPuller {
     httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
     httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
 
-    HttpClient httpClient = HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
-
-    return httpClient;
+    return HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
   }
 
   public SnapPuller(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
@@ -520,7 +518,7 @@ public class SnapPuller {
             solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
           }
           
-          openNewSearcherAndUpdateCommitPoint(isFullCopyNeeded);
+          openNewSearcherAndUpdateCommitPoint();
         }
         
         replicationStartTime = 0;
@@ -699,9 +697,7 @@ public class SnapPuller {
     List<String> l = new ArrayList<>();
     if (str != null && str.length() != 0) {
       String[] ss = str.split(",");
-      for (int i = 0; i < ss.length; i++) {
-        l.add(ss[i]);
-      }
+      Collections.addAll(l, ss);
     }
     sb.append(replicationTime);
     if (!l.isEmpty()) {
@@ -714,7 +710,7 @@ public class SnapPuller {
     return sb;
   }
 
-  private void openNewSearcherAndUpdateCommitPoint(boolean isFullCopyNeeded) throws IOException {
+  private void openNewSearcherAndUpdateCommitPoint() throws IOException {
     SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
         new ModifiableSolrParams());
     
@@ -749,8 +745,7 @@ public class SnapPuller {
   private String createTempindexDir(SolrCore core, String tmpIdxDirName) {
     // TODO: there should probably be a DirectoryFactory#concatPath(parent, name)
     // or something
-    String tmpIdxDir = core.getDataDir() + tmpIdxDirName;
-    return tmpIdxDir;
+    return core.getDataDir() + tmpIdxDirName;
   }
 
   private void reloadCore() {
@@ -868,7 +863,7 @@ public class SnapPuller {
    * Copy a file by the File#renameTo() method. If it fails, it is considered a failure
    * <p/>
    */
-  private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname, List<String> copiedfiles) {
+  private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname) {
     LOG.debug("Moving file: {}", fname);
     boolean success = false;
     try {
@@ -902,7 +897,6 @@ public class SnapPuller {
       }
     }
     String segmentsFile = null;
-    List<String> movedfiles = new ArrayList<>();
     for (Map<String, Object> f : filesDownloaded) {
       String fname = (String) f.get(NAME);
       // the segments file must be copied last
@@ -914,12 +908,11 @@ public class SnapPuller {
         segmentsFile = fname;
         continue;
       }
-      if (!moveAFile(tmpIdxDir, indexDir, fname, movedfiles)) return false;
-      movedfiles.add(fname);
+      if (!moveAFile(tmpIdxDir, indexDir, fname)) return false;
     }
     //copy the segments file last
     if (segmentsFile != null) {
-      if (!moveAFile(tmpIdxDir, indexDir, segmentsFile, movedfiles)) return false;
+      if (!moveAFile(tmpIdxDir, indexDir, segmentsFile)) return false;
     }
     return true;
   }
@@ -1157,7 +1150,7 @@ public class SnapPuller {
       return null;
     tmp = new HashMap<>(tmp);
     if (tmpFileFetcher != null)
-      tmp.put("bytesDownloaded", tmpFileFetcher.bytesDownloaded);
+      tmp.put("bytesDownloaded", tmpFileFetcher.getBytesDownloaded());
     return tmp;
   }
 
@@ -1178,58 +1171,53 @@ public class SnapPuller {
     }
   }
 
+  private interface FileInterface {
+    public void sync() throws IOException;
+    public void write(byte[] buf, int packetSize) throws IOException;
+    public void close() throws Exception;
+    public void delete() throws Exception;
+  }
+
   /**
    * The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream
    *
    * @see org.apache.solr.handler.ReplicationHandler.DirectoryFileStream
    */
-  private class DirectoryFileFetcher {
-    boolean includeChecksum = true;
-
-    Directory copy2Dir;
-
-    String fileName;
-
-    String saveAs;
-
-    long size;
-
-    long bytesDownloaded = 0;
-
-    byte[] buf = new byte[1024 * 1024];
-
-    Checksum checksum;
-
-    int errorCount = 0;
-
+  private class FileFetcher {
+    private final FileInterface file;
+    private boolean includeChecksum = true;
+    private String fileName;
+    private String saveAs;
     private boolean isConf;
-
-    private boolean aborted = false;
-
     private Long indexGen;
 
-    private IndexOutput outStream;
+    private long size;
+    private long bytesDownloaded = 0;
+    private byte[] buf = new byte[1024 * 1024];
+    private Checksum checksum;
+    private int errorCount = 0;
+    private boolean aborted = false;
 
-    DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
+    FileFetcher(FileInterface file, Map<String, Object> fileDetails, String saveAs,
                 boolean isConf, long latestGen) throws IOException {
-      this.copy2Dir = tmpIndexDir;
+      this.file = file;
       this.fileName = (String) fileDetails.get(NAME);
       this.size = (Long) fileDetails.get(SIZE);
       this.isConf = isConf;
       this.saveAs = saveAs;
-
       indexGen = latestGen;
-      
-      outStream = copy2Dir.createOutput(saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE);
-
       if (includeChecksum)
         checksum = new Adler32();
     }
 
+    public long getBytesDownloaded() {
+      return bytesDownloaded;
+    }
+
     /**
      * The main method which downloads file
      */
-    void fetchFile() throws Exception {
+    public void fetchFile() throws Exception {
       try {
         while (true) {
           final FastInputStream is = getStream();
@@ -1248,12 +1236,12 @@ public class SnapPuller {
         }
       } finally {
         cleanup();
-        //if cleanup suceeds . The file is downloaded fully. do an fsync
+        //if cleanup succeeds, the file is downloaded fully; do an fsync
         fsyncService.submit(new Runnable(){
           @Override
           public void run() {
             try {
-              copy2Dir.sync(Collections.singleton(saveAs));
+              file.sync();
             } catch (IOException e) {
               fsyncException = e;
             }
@@ -1277,7 +1265,7 @@ public class SnapPuller {
           //read the size of the packet
           int packetSize = readInt(intbytes);
           if (packetSize <= 0) {
-            LOG.warn("No content received for file: " + currentFile);
+            LOG.warn("No content received for file: {}", fileName);
             return NO_CONTENT;
           }
           if (buf.length < packetSize)
@@ -1295,45 +1283,45 @@ public class SnapPuller {
             checksum.update(buf, 0, packetSize);
             long checkSumClient = checksum.getValue();
             if (checkSumClient != checkSumServer) {
-              LOG.error("Checksum not matched between client and server for: " + currentFile);
+              LOG.error("Checksum not matched between client and server for file: {}", fileName);
               //if checksum is wrong it is a problem return for retry
               return 1;
             }
           }
           //if everything is fine, write down the packet to the file
-          writeBytes(packetSize);
+          file.write(buf, packetSize);
           bytesDownloaded += packetSize;
+          LOG.debug("Fetched and wrote {} bytes of file: {}", bytesDownloaded, fileName);
           if (bytesDownloaded >= size)
             return 0;
-          //errorcount is always set to zero after a successful packet
+          //errorCount is always set to zero after a successful packet
           errorCount = 0;
         }
       } catch (ReplicationHandlerException e) {
         throw e;
       } catch (Exception e) {
-        LOG.warn("Error in fetching packets ", e);
-        //for any failure , increment the error count
+        LOG.warn("Error in fetching file: {} (downloaded {} of {} bytes)",
+            fileName, bytesDownloaded, size, e);
+        //for any failure, increment the error count
         errorCount++;
-        //if it fails for the same pacaket for   MAX_RETRIES fail and come out
+        //if it fails for the same packet more than MAX_RETRIES times, fail and come out
         if (errorCount > MAX_RETRIES) {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                  "Fetch failed for file:" + fileName, e);
+              "Failed to fetch file: " + fileName +
+                  " (downloaded " + bytesDownloaded + " of " + size + " bytes" +
+                  ", error count: " + errorCount + " > " + MAX_RETRIES + ")", e);
         }
         return ERR;
       }
     }
 
-    protected void writeBytes(int packetSize) throws IOException {
-      outStream.writeBytes(buf, 0, packetSize);
-    }
-
     /**
      * The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully()
      * other wise it fails. So read everything as bytes and then extract an integer out of it
      */
     private int readInt(byte[] b) {
       return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
-              | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
+          | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
 
     }
 
@@ -1342,9 +1330,9 @@ public class SnapPuller {
      */
     private long readLong(byte[] b) {
       return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
-              | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
-              | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
-              | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
+          | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
+          | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
+          | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
 
     }
 
@@ -1353,30 +1341,30 @@ public class SnapPuller {
      */
     private void cleanup() {
       try {
-        outStream.close();
-      } catch (Exception e) {/* noop */
-          LOG.error("Error closing the file stream: "+ this.saveAs ,e);
+        file.close();
+      } catch (Exception e) {/* no-op */
+        LOG.error("Error closing file: {}", this.saveAs, e);
       }
       if (bytesDownloaded != size) {
         //if the download is not complete then
         //delete the file being downloaded
         try {
-          copy2Dir.deleteFile(saveAs);
+          file.delete();
         } catch (Exception e) {
-          LOG.error("Error deleting file in cleanup" + e.getMessage());
+          LOG.error("Error deleting file: {}", this.saveAs, e);
         }
-        //if the failure is due to a user abort it is returned nomally else an exception is thrown
+        //if the failure is due to a user abort it is returned normally else an exception is thrown
         if (!aborted)
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                  "Unable to download " + fileName + " completely. Downloaded "
-                          + bytesDownloaded + "!=" + size);
+              "Unable to download " + fileName + " completely. Downloaded "
+                  + bytesDownloaded + "!=" + size);
       }
     }
 
     /**
      * Open a new stream using HttpClient
      */
-    FastInputStream getStream() throws IOException {
+    private FastInputStream getStream() throws IOException {
 
       ModifiableSolrParams params = new ModifiableSolrParams();
 
@@ -1392,7 +1380,7 @@ public class SnapPuller {
         params.set(FILE, fileName);
       }
       if (useInternal) {
-        params.set(COMPRESSION, "true"); 
+        params.set(COMPRESSION, "true");
       }
       //use checksum
       if (this.includeChecksum) {
@@ -1400,16 +1388,16 @@ public class SnapPuller {
       }
       //wt=filestream this is a custom protocol
       params.set(CommonParams.WT, FILE_STREAM);
-        // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
-        // the server starts from the offset
+      // This happens when a retry follows a failure. The offset=<sizedownloaded> ensures that
+      // the server resumes from that offset
       if (bytesDownloaded > 0) {
         params.set(OFFSET, Long.toString(bytesDownloaded));
       }
-      
+
 
       NamedList response;
       InputStream is = null;
-      
+
       HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null);  //XXX use shardhandler
       try {
         client.setSoTimeout(60000);
@@ -1430,274 +1418,91 @@ public class SnapPuller {
       }
     }
   }
-  
-  /**
-   * The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of wt=filestream
-   *
-   * @see org.apache.solr.handler.ReplicationHandler.LocalFsFileStream
-   */
-  private class LocalFsFileFetcher {
-    boolean includeChecksum = true;
 
-    private File copy2Dir;
+  private class DirectoryFile implements FileInterface {
+    private final String saveAs;
+    private Directory copy2Dir;
+    private IndexOutput outStream;
 
-    String fileName;
+    DirectoryFile(Directory tmpIndexDir, String saveAs) throws IOException {
+      this.saveAs = saveAs;
+      this.copy2Dir = tmpIndexDir;
+      outStream = copy2Dir.createOutput(this.saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE);
+    }
 
-    String saveAs;
+    public void sync() throws IOException {
+      copy2Dir.sync(Collections.singleton(saveAs));
+    }
 
-    long size;
+    public void write(byte[] buf, int packetSize) throws IOException {
+      outStream.writeBytes(buf, 0, packetSize);
+    }
 
-    long bytesDownloaded = 0;
+    public void close() throws Exception {
+      outStream.close();
+    }
 
-    FileChannel fileChannel;
-    
-    private FileOutputStream fileOutputStream;
+    public void delete() throws Exception {
+      copy2Dir.deleteFile(saveAs);
+    }
+  }
 
-    byte[] buf = new byte[1024 * 1024];
+  private class DirectoryFileFetcher extends FileFetcher {
+    DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
+                boolean isConf, long latestGen) throws IOException {
+      super(new DirectoryFile(tmpIndexDir, saveAs), fileDetails, saveAs, isConf, latestGen);
+    }
+  }
 
-    Checksum checksum;
+  private class LocalFsFile implements FileInterface {
+    private File copy2Dir;
 
+    FileChannel fileChannel;
+    private FileOutputStream fileOutputStream;
     File file;
 
-    int errorCount = 0;
-
-    private boolean isConf;
-
-    private boolean aborted = false;
-
-    private Long indexGen;
-
-    // TODO: could do more code sharing with DirectoryFileFetcher
-    LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
-                boolean isConf, long latestGen) throws IOException {
+    LocalFsFile(File dir, String saveAs) throws IOException {
       this.copy2Dir = dir;
-      this.fileName = (String) fileDetails.get(NAME);
-      this.size = (Long) fileDetails.get(SIZE);
-      this.isConf = isConf;
-      this.saveAs = saveAs;
-
-      indexGen = latestGen;
 
       this.file = new File(copy2Dir, saveAs);
-      
+
       File parentDir = this.file.getParentFile();
       if( ! parentDir.exists() ){
         if ( ! parentDir.mkdirs() ) {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                                  "Failed to create (sub)directory for file: " + saveAs);
+              "Failed to create (sub)directory for file: " + saveAs);
         }
       }
-      
+
       this.fileOutputStream = new FileOutputStream(file);
       this.fileChannel = this.fileOutputStream.getChannel();
-
-      if (includeChecksum)
-        checksum = new Adler32();
     }
 
-    /**
-     * The main method which downloads file
-     */
-    void fetchFile() throws Exception {
-      try {
-        while (true) {
-          final FastInputStream is = getStream();
-          int result;
-          try {
-            //fetch packets one by one in a single request
-            result = fetchPackets(is);
-            if (result == 0 || result == NO_CONTENT) {
-              return;
-            }
-            //if there is an error continue. But continue from the point where it got broken
-          } finally {
-            IOUtils.closeQuietly(is);
-          }
-        }
-      } finally {
-        cleanup();
-        //if cleanup suceeds . The file is downloaded fully. do an fsync
-        fsyncService.submit(new Runnable(){
-          @Override
-          public void run() {
-            try {
-              FileUtils.sync(file);
-            } catch (IOException e) {
-              fsyncException = e;
-            }
-          }
-        });
-      }
+    public void sync() throws IOException {
+      FileUtils.sync(file);
     }
 
-    private int fetchPackets(FastInputStream fis) throws Exception {
-      byte[] intbytes = new byte[4];
-      byte[] longbytes = new byte[8];
-      try {
-        while (true) {
-          if (stop) {
-            stop = false;
-            aborted = true;
-            throw new ReplicationHandlerException("User aborted replication");
-          }
-          long checkSumServer = -1;
-          fis.readFully(intbytes);
-          //read the size of the packet
-          int packetSize = readInt(intbytes);
-          if (packetSize <= 0) {
-            LOG.warn("No content received for file: " + currentFile);
-            return NO_CONTENT;
-          }
-          if (buf.length < packetSize)
-            buf = new byte[packetSize];
-          if (checksum != null) {
-            //read the checksum
-            fis.readFully(longbytes);
-            checkSumServer = readLong(longbytes);
-          }
-          //then read the packet of bytes
-          fis.readFully(buf, 0, packetSize);
-          //compare the checksum as sent from the master
-          if (includeChecksum) {
-            checksum.reset();
-            checksum.update(buf, 0, packetSize);
-            long checkSumClient = checksum.getValue();
-            if (checkSumClient != checkSumServer) {
-              LOG.error("Checksum not matched between client and server for: " + currentFile);
-              //if checksum is wrong it is a problem return for retry
-              return 1;
-            }
-          }
-          //if everything is fine, write down the packet to the file
-          fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
-          bytesDownloaded += packetSize;
-          if (bytesDownloaded >= size)
-            return 0;
-          //errorcount is always set to zero after a successful packet
-          errorCount = 0;
-        }
-      } catch (ReplicationHandlerException e) {
-        throw e;
-      } catch (Exception e) {
-        LOG.warn("Error in fetching packets ", e);
-        //for any failure , increment the error count
-        errorCount++;
-        //if it fails for the same pacaket for   MAX_RETRIES fail and come out
-        if (errorCount > MAX_RETRIES) {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                  "Fetch failed for file:" + fileName, e);
-        }
-        return ERR;
-      }
-    }
-
-    /**
-     * The webcontainer flushes the data only after it fills the buffer size. So, all data has to be read as readFully()
-     * other wise it fails. So read everything as bytes and then extract an integer out of it
-     */
-    private int readInt(byte[] b) {
-      return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
-              | ((b[2] & 0xff) << 8) | (b[3] & 0xff));
-
+    public void write(byte[] buf, int packetSize) throws IOException {
+      fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
     }
 
-    /**
-     * Same as above but to read longs from a byte array
-     */
-    private long readLong(byte[] b) {
-      return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
-              | (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
-              | (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
-              | ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
-
+    public void close() throws Exception {
+      //close the FileOutputStream (which also closes the Channel)
+      fileOutputStream.close();
     }
 
-    /**
-     * cleanup everything
-     */
-    private void cleanup() {
-      try {
-        //close the FileOutputStream (which also closes the Channel)
-        fileOutputStream.close();
-      } catch (Exception e) {/* noop */
-          LOG.error("Error closing the file stream: "+ this.saveAs ,e);
-      }
-      if (bytesDownloaded != size) {
-        //if the download is not complete then
-        //delete the file being downloaded
-        try {
-          Files.delete(file.toPath());
-        } catch (SecurityException e) {
-          LOG.error("Error deleting file in cleanup" + e.getMessage());
-        } catch (Throwable other) {
-          // TODO: should this class care if a file couldnt be deleted?
-          // this just emulates previous behavior, where only SecurityException would be handled.
-        }
-        //if the failure is due to a user abort it is returned nomally else an exception is thrown
-        if (!aborted)
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                  "Unable to download " + fileName + " completely. Downloaded "
-                          + bytesDownloaded + "!=" + size);
-      }
+    public void delete() throws Exception {
+      Files.delete(file.toPath());
     }
+  }
 
-    /**
-     * Open a new stream using HttpClient
-     */
-    FastInputStream getStream() throws IOException {
-
-      ModifiableSolrParams params = new ModifiableSolrParams();
-
-//    //the method is command=filecontent
-      params.set(COMMAND, CMD_GET_FILE);
-      params.set(GENERATION, Long.toString(indexGen));
-      params.set(CommonParams.QT, "/replication");
-      //add the version to download. This is used to reserve the download
-      if (isConf) {
-        //set cf instead of file for config file
-        params.set(CONF_FILE_SHORT, fileName);
-      } else {
-        params.set(FILE, fileName);
-      }
-      if (useInternal) {
-        params.set(COMPRESSION, "true"); 
-      }
-      //use checksum
-      if (this.includeChecksum) {
-        params.set(CHECKSUM, true);
-      }
-      //wt=filestream this is a custom protocol
-      params.set(CommonParams.WT, FILE_STREAM);
-        // This happen if there is a failure there is a retry. the offset=<sizedownloaded> ensures that
-        // the server starts from the offset
-      if (bytesDownloaded > 0) {
-        params.set(OFFSET, Long.toString(bytesDownloaded));
-      }
-      
-
-      NamedList response;
-      InputStream is = null;
-      HttpSolrClient client = new HttpSolrClient(masterUrl, myHttpClient, null);  //XXX use shardhandler
-      try {
-        client.setSoTimeout(60000);
-        client.setConnectionTimeout(15000);
-        QueryRequest req = new QueryRequest(params);
-        response = client.request(req);
-        is = (InputStream) response.get("stream");
-        if(useInternal) {
-          is = new InflaterInputStream(is);
-        }
-        return new FastInputStream(is);
-      } catch (Exception e) {
-        //close stream on error
-        IOUtils.closeQuietly(is);
-        throw new IOException("Could not download file '" + fileName + "'", e);
-      } finally {
-        client.shutdown();
-      }
+  private class LocalFsFileFetcher extends FileFetcher {
+    LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
+                boolean isConf, long latestGen) throws IOException {
+      super(new LocalFsFile(dir, saveAs), fileDetails, saveAs, isConf, latestGen);
     }
   }
-  
+
   NamedList getDetails() throws IOException, SolrServerException {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(COMMAND, CMD_DETAILS);
@@ -1720,31 +1525,27 @@ public class SnapPuller {
     if (interval == null)
       return null;
     int result = 0;
-    if (interval != null) {
-      Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
-      if (m.find()) {
-        String hr = m.group(1);
-        String min = m.group(2);
-        String sec = m.group(3);
-        result = 0;
-        try {
-          if (sec != null && sec.length() > 0)
-            result += Integer.parseInt(sec);
-          if (min != null && min.length() > 0)
-            result += (60 * Integer.parseInt(min));
-          if (hr != null && hr.length() > 0)
-            result += (60 * 60 * Integer.parseInt(hr));
-          result *= 1000;
-        } catch (NumberFormatException e) {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                  INTERVAL_ERR_MSG);
-        }
-      } else {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                INTERVAL_ERR_MSG);
+    Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
+    if (m.find()) {
+      String hr = m.group(1);
+      String min = m.group(2);
+      String sec = m.group(3);
+      result = 0;
+      try {
+        if (sec != null && sec.length() > 0)
+          result += Integer.parseInt(sec);
+        if (min != null && min.length() > 0)
+          result += (60 * Integer.parseInt(min));
+        if (hr != null && hr.length() > 0)
+          result += (60 * 60 * Integer.parseInt(hr));
+        result *= 1000;
+      } catch (NumberFormatException e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
       }
-
+    } else {
+      throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
     }
+
     return result;
   }