You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2010/07/30 07:34:01 UTC

svn commit: r980654 - in /lucene/dev/branches/branch_3x/lucene: ./ src/java/org/apache/lucene/index/ src/java/org/apache/lucene/store/

Author: shaie
Date: Fri Jul 30 05:34:00 2010
New Revision: 980654

URL: http://svn.apache.org/viewvc?rev=980654&view=rev
Log:
LUCENE-2574: Optimize copies between IndexInput and Output

Modified:
    lucene/dev/branches/branch_3x/lucene/CHANGES.txt
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/BufferedIndexInput.java
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/Directory.java
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/FSDirectory.java
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/IndexInput.java
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/RAMInputStream.java
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/RAMOutputStream.java
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/SimpleFSDirectory.java

Modified: lucene/dev/branches/branch_3x/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/CHANGES.txt?rev=980654&r1=980653&r2=980654&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/CHANGES.txt (original)
+++ lucene/dev/branches/branch_3x/lucene/CHANGES.txt Fri Jul 30 05:34:00 2010
@@ -410,6 +410,10 @@ Optimizations
 * LUCENE-2531: Fix issue when sorting by a String field that was
   causing too many fallbacks to compare-by-value (instead of by-ord).
   (Mike McCandless)
+
+* LUCENE-2574: IndexInput exposes copyBytes(IndexOutput, long) to allow for 
+  efficient copying by sub-classes. Optimized copy is implemented for RAM and FS
+  streams. (Shai Erera)
   
 Build
 

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java?rev=980654&r1=980653&r2=980654&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/CompoundFileReader.java Fri Jul 30 05:34:00 2010
@@ -302,6 +302,17 @@ class CompoundFileReader extends Directo
           return length;
         }
 
+        @Override
+        public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+          // Copy first whatever is in the buffer
+          numBytes -= flushBuffer(out, numBytes);
+          
+          // If there are more bytes left to copy, delegate the copy task to the
+          // base IndexInput, in case it can do an optimized copy.
+          if (numBytes > 0) {
+            base.copyBytes(out, numBytes);
+          }
+        }
 
     }
     

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/BufferedIndexInput.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/BufferedIndexInput.java?rev=980654&r1=980653&r2=980654&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/BufferedIndexInput.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/BufferedIndexInput.java Fri Jul 30 05:34:00 2010
@@ -26,9 +26,9 @@ public abstract class BufferedIndexInput
   public static final int BUFFER_SIZE = 1024;
 
   private int bufferSize = BUFFER_SIZE;
-
+  
   protected byte[] buffer;
-
+  
   private long bufferStart = 0;			  // position in file of buffer
   private int bufferLength = 0;			  // end of valid bytes
   private int bufferPosition = 0;		  // next byte to read
@@ -205,4 +205,37 @@ public abstract class BufferedIndexInput
     return clone;
   }
 
+  /**
+   * Flushes the in-memory buffer to the given output, copying at most
+   * <code>numBytes</code>.
+   * <p>
+   * <b>NOTE:</b> this method does not refill the buffer, however it does
+   * advance the buffer position.
+   * 
+   * @return the number of bytes actually flushed from the in-memory buffer.
+   */
+  protected int flushBuffer(IndexOutput out, long numBytes) throws IOException {
+    int toCopy = bufferLength - bufferPosition;
+    if (toCopy > numBytes) {
+      toCopy = (int) numBytes;
+    }
+    if (toCopy > 0) {
+      out.writeBytes(buffer, bufferPosition, toCopy);
+      bufferPosition += toCopy;
+    }
+    return toCopy;
+  }
+  
+  @Override
+  public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+    assert numBytes >= 0: "numBytes=" + numBytes;
+
+    while (numBytes > 0) {
+      if (bufferLength == bufferPosition) {
+        refill();
+      }
+      numBytes -= flushBuffer(out, numBytes);
+    }
+  }
+  
 }

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/Directory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/Directory.java?rev=980654&r1=980653&r2=980654&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/Directory.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/Directory.java Fri Jul 30 05:34:00 2010
@@ -216,26 +216,11 @@ public abstract class Directory implemen
    * overwrite it if it does.
    */
   public void copy(Directory to, String src, String dest) throws IOException {
-    IndexOutput os = null;
-    IndexInput is = null;
+    IndexOutput os = to.createOutput(dest);
+    IndexInput is = openInput(src);
     IOException priorException = null;
-    int bufSize = BufferedIndexOutput.BUFFER_SIZE;
-    byte[] buf = new byte[bufSize];
     try {
-      // create file in dest directory
-      os = to.createOutput(dest);
-      // read current file
-      is = openInput(src);
-      // and copy to dest directory
-      long len = is.length();
-      long numRead = 0;
-      while (numRead < len) {
-        long left = len - numRead;
-        int toRead = (int) (bufSize < left ? bufSize : left);
-        is.readBytes(buf, 0, toRead);
-        os.writeBytes(buf, toRead);
-        numRead += toRead;
-      }
+      is.copyBytes(os, is.length());
     } catch (IOException ioe) {
       priorException = ioe;
     } finally {

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/FSDirectory.java?rev=980654&r1=980653&r2=980654&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/FSDirectory.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/FSDirectory.java Fri Jul 30 05:34:00 2010
@@ -18,9 +18,7 @@ package org.apache.lucene.store;
  */
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -35,7 +33,6 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.lucene.store.SimpleFSDirectory.SimpleFSIndexInput;
-import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;
 import org.apache.lucene.util.Constants;
 
@@ -438,47 +435,6 @@ public abstract class FSDirectory extend
     return chunkSize;
   }
 
-  @Override
-  public void copy(Directory to, String src, String dest) throws IOException {
-    if (to instanceof FSDirectory) {
-      FSDirectory target = (FSDirectory) to;
-      target.ensureCanWrite(dest);
-      FileChannel input = null;
-      FileChannel output = null;
-      IOException priorException = null;
-      try {
-        input = new FileInputStream(new File(directory, src)).getChannel();
-        output = new FileOutputStream(new File(target.directory, dest)).getChannel();
-        copy(input, output, input.size());
-      } catch (IOException ioe) {
-        priorException = ioe;
-      } finally {
-        IOUtils.closeSafely(priorException, input, output);
-      }
-    } else {
-      super.copy(to, src, dest);
-    }
-  }
-
-  /**
-   * Copies the content of a given {@link FileChannel} to a destination one. The
-   * copy is done in chunks of 2MB because if transferFrom is used without a
-   * limit when copying a very large file, then an OOM may be thrown (depends on
-   * the state of the RAM in the machine, as well as the OS used). Performance
-   * measurements showed that chunk sizes larger than 2MB do not result in much
-   * faster file copy, therefore we limit the size to be safe with different
-   * file sizes and systems.
-   */
-  static void copy(FileChannel input, FileChannel output, long numBytes) throws IOException {
-    long pos = output.position();
-    long writeTo = numBytes + pos;
-    while (pos < writeTo) {
-      pos += output.transferFrom(input, pos, Math.min(CHANNEL_CHUNK_SIZE, writeTo - pos));
-    }
-    // transferFrom does not change the position of the channel. Need to change it manually
-    output.position(pos);
-  }
-  
   protected static class FSIndexOutput extends BufferedIndexOutput {
     private final FSDirectory parent;
     private final String name;
@@ -501,23 +457,37 @@ public abstract class FSDirectory extend
     @Override
     public void copyBytes(IndexInput input, long numBytes) throws IOException {
       // Optimized copy only if the number of bytes to copy is larger than the
-      // buffer size, and the given IndexInput supports FileChannel copying ..
+      // buffer size, and the given IndexInput supports FileChannel copying.
       // NOTE: the below check relies on NIOIndexInput extending Simple. If that
       // changes in the future, we should change the check as well.
-      if (numBytes > BUFFER_SIZE && input instanceof SimpleFSIndexInput) {
-        // flush any bytes in the buffer
-        flush();
-        // do the optimized copy
-        FileChannel in = ((SimpleFSIndexInput) input).file.getChannel();
-        FileChannel out = file.getChannel();
-        copy(in, out, numBytes);
-        // corrects the position in super (BufferedIndexOutput), so that calls
-        // to getFilePointer will return the correct pointer.
-        // Perhaps a specific method is better?
-        super.seek(out.position());
-      } else {
+      if (numBytes <= BUFFER_SIZE || !(input instanceof SimpleFSIndexInput)) {
         super.copyBytes(input, numBytes);
+        return;
+      }
+
+      SimpleFSIndexInput fsInput = (SimpleFSIndexInput) input;
+
+      // flush any bytes in the buffer
+      flush();
+      
+      // flush any bytes in the input's buffer.
+      numBytes -= fsInput.flushBuffer(this, numBytes);
+      
+      // do the optimized copy
+      FileChannel in = fsInput.file.getChannel();
+      FileChannel out = file.getChannel();
+      long pos = out.position();
+      long writeTo = numBytes + pos;
+      while (pos < writeTo) {
+        pos += out.transferFrom(in, pos, Math.min(CHANNEL_CHUNK_SIZE, writeTo - pos));
       }
+      // transferFrom does not change the position of the channel. Need to change it manually
+      out.position(pos);
+      
+      // corrects the position in super (BufferedIndexOutput), so that calls
+      // to getFilePointer will return the correct pointer.
+      // Perhaps a specific method is better?
+      super.seek(out.position());
     }
     
     @Override

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/IndexInput.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/IndexInput.java?rev=980654&r1=980653&r2=980654&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/IndexInput.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/IndexInput.java Fri Jul 30 05:34:00 2010
@@ -22,16 +22,16 @@ import java.io.Closeable;
 import java.util.Map;
 import java.util.HashMap;
 
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.RamUsageEstimator;
-
 /** Abstract base class for input from a file in a {@link Directory}.  A
  * random-access input stream.  Used for all Lucene index input operations.
  * @see Directory
  */
 public abstract class IndexInput implements Cloneable,Closeable {
+
   private boolean preUTF8Strings;                 // true if we are reading old (modified UTF8) string format
 
+  protected byte[] copyBuf = null;
+
   /** Reads and returns a single byte.
    * @see IndexOutput#writeByte(byte)
    */
@@ -59,8 +59,7 @@ public abstract class IndexInput impleme
    * @see IndexOutput#writeBytes(byte[],int)
    */
   public void readBytes(byte[] b, int offset, int len, boolean useBuffer)
-    throws IOException
-  {
+    throws IOException {
     // Default to ignoring useBuffer entirely
     readBytes(b, offset, len);
   }
@@ -239,4 +238,31 @@ public abstract class IndexInput impleme
 
     return map;
   }
+
+  /**
+   * Copies <code>numBytes</code> bytes to the given {@link IndexOutput}.
+   * <p>
+   * <b>NOTE:</b> this method uses an intermediate buffer to copy the bytes.
+   * Consider overriding it in your implementation, if you can make a better,
+   * optimized copy.
+   * <p>
+   * <b>NOTE:</b> ensure that there are enough bytes in the input to copy to
+   * output. Otherwise, different exceptions may be thrown, depending on the
+   * implementation.
+   */
+  public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+    assert numBytes >= 0: "numBytes=" + numBytes;
+
+    if (copyBuf == null) {
+      copyBuf = new byte[BufferedIndexInput.BUFFER_SIZE];
+    }
+
+    while (numBytes > 0) {
+      final int toCopy = (int) (numBytes > copyBuf.length ? copyBuf.length : numBytes);
+      readBytes(copyBuf, 0, toCopy);
+      out.writeBytes(copyBuf, 0, toCopy);
+      numBytes -= toCopy;
+    }
+  }
+  
 }

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/RAMInputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/RAMInputStream.java?rev=980654&r1=980653&r2=980654&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/RAMInputStream.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/RAMInputStream.java Fri Jul 30 05:34:00 2010
@@ -1,7 +1,5 @@
 package org.apache.lucene.store;
 
-import java.io.IOException;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,10 +17,9 @@ import java.io.IOException;
  * limitations under the License.
  */
 
-/**
- * A memory-resident {@link IndexInput} implementation.
- */
+import java.io.IOException;
 
+/** A memory-resident {@link IndexInput} implementation. */
 class RAMInputStream extends IndexInput implements Cloneable {
   static final int BUFFER_SIZE = RAMOutputStream.BUFFER_SIZE;
 
@@ -105,6 +102,27 @@ class RAMInputStream extends IndexInput 
   }
 
   @Override
+  public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+    assert numBytes >= 0: "numBytes=" + numBytes;
+    
+    long left = numBytes;
+    while (left > 0) {
+      if (bufferPosition == bufferLength) {
+        ++currentBufferIndex;
+        switchCurrentBuffer(true);
+      }
+      
+      final int bytesInBuffer = bufferLength - bufferPosition;
+      final int toCopy = (int) (bytesInBuffer < left ? bytesInBuffer : left);
+      out.writeBytes(currentBuffer, bufferPosition, toCopy);
+      bufferPosition += toCopy;
+      left -= toCopy;
+    }
+    
+    assert left == 0: "Insufficient bytes to copy: numBytes=" + numBytes + " copied=" + (numBytes - left);
+  }
+  
+  @Override
   public long getFilePointer() {
     return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition;
   }

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/RAMOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/RAMOutputStream.java?rev=980654&r1=980653&r2=980654&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/RAMOutputStream.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/RAMOutputStream.java Fri Jul 30 05:34:00 2010
@@ -24,7 +24,6 @@ import java.io.IOException;
  *
  * @lucene.internal
  */
-
 public class RAMOutputStream extends IndexOutput {
   static final int BUFFER_SIZE = 1024;
 
@@ -161,4 +160,26 @@ public class RAMOutputStream extends Ind
   public long sizeInBytes() {
     return file.numBuffers() * BUFFER_SIZE;
   }
+  
+  @Override
+  public void copyBytes(IndexInput input, long numBytes) throws IOException {
+    assert numBytes >= 0: "numBytes=" + numBytes;
+
+    while (numBytes > 0) {
+      if (bufferPosition == bufferLength) {
+        currentBufferIndex++;
+        switchCurrentBuffer();
+      }
+
+      int toCopy = currentBuffer.length - bufferPosition;
+      if (numBytes < toCopy) {
+        toCopy = (int) numBytes;
+      }
+      input.readBytes(currentBuffer, bufferPosition, toCopy, false);
+      numBytes -= toCopy;
+      bufferPosition += toCopy;
+    }
+
+  }
+  
 }

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/SimpleFSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/SimpleFSDirectory.java?rev=980654&r1=980653&r2=980654&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/SimpleFSDirectory.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/store/SimpleFSDirectory.java Fri Jul 30 05:34:00 2010
@@ -160,5 +160,12 @@ public class SimpleFSDirectory extends F
     boolean isFDValid() throws IOException {
       return file.getFD().valid();
     }
+    
+    @Override
+    public void copyBytes(IndexOutput out, long numBytes) throws IOException {
+      numBytes -= flushBuffer(out, numBytes);
+      // If out is FSIndexOutput, the copy will be optimized
+      out.copyBytes(this, numBytes);
+    }
   }
 }