Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/11/14 23:35:25 UTC

svn commit: r475025 [1/9] - in /lucene/hadoop/trunk: ./ bin/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/io/compress/zlib/ src/java/org/apache/hadoop/util/ src/native/ src/native/config/ src/native/...

Author: cutting
Date: Tue Nov 14 14:35:22 2006
New Revision: 475025

URL: http://svn.apache.org/viewvc?view=rev&rev=475025
Log:
HADOOP-538.  Add support for building an optional native library, libhadoop.so.  Contributed by Arun.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/Compressor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressorStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/NativeCodeLoader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/PlatformName.java
    lucene/hadoop/trunk/src/native/
    lucene/hadoop/trunk/src/native/.autom4te.cfg
    lucene/hadoop/trunk/src/native/AUTHORS
    lucene/hadoop/trunk/src/native/COPYING
    lucene/hadoop/trunk/src/native/ChangeLog
    lucene/hadoop/trunk/src/native/INSTALL
    lucene/hadoop/trunk/src/native/Makefile.am
    lucene/hadoop/trunk/src/native/Makefile.in
    lucene/hadoop/trunk/src/native/NEWS
    lucene/hadoop/trunk/src/native/README
    lucene/hadoop/trunk/src/native/acinclude.m4
    lucene/hadoop/trunk/src/native/aclocal.m4
    lucene/hadoop/trunk/src/native/config/
    lucene/hadoop/trunk/src/native/config.h.in
    lucene/hadoop/trunk/src/native/config/config.guess
    lucene/hadoop/trunk/src/native/config/config.sub
    lucene/hadoop/trunk/src/native/config/depcomp
    lucene/hadoop/trunk/src/native/config/install-sh
    lucene/hadoop/trunk/src/native/config/ltmain.sh
    lucene/hadoop/trunk/src/native/config/missing
    lucene/hadoop/trunk/src/native/configure
    lucene/hadoop/trunk/src/native/configure.ac
    lucene/hadoop/trunk/src/native/lib/
    lucene/hadoop/trunk/src/native/lib/Makefile.am
    lucene/hadoop/trunk/src/native/lib/Makefile.in
    lucene/hadoop/trunk/src/native/packageNativeHadoop.sh
    lucene/hadoop/trunk/src/native/src/
    lucene/hadoop/trunk/src/native/src/org/
    lucene/hadoop/trunk/src/native/src/org/apache/
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/Makefile.am
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/Makefile.in
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
    lucene/hadoop/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/org_apache_hadoop_io_compress_zlib.h
    lucene/hadoop/trunk/src/native/src/org_apache_hadoop.h
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/bin/hadoop
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=475025&r1=475024&r2=475025
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Nov 14 14:35:22 2006
@@ -76,6 +76,11 @@
 23. HADOOP-715.  Fix build.xml so that test logs are written in build
     directory, rather than in CWD.  (Arun C Murthy via cutting)
 
+24. HADOOP-538.  Add support for building an optional native library,
+    libhadoop.so, that improves the performance of zlib-based
+    compression.  To build this, specify -Dcompile.native to Ant.
+    (Arun C Murthy via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

Modified: lucene/hadoop/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/hadoop?view=diff&rev=475025&r1=475024&r2=475025
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop (original)
+++ lucene/hadoop/trunk/bin/hadoop Tue Nov 14 14:35:22 2006
@@ -107,6 +107,14 @@
   CLASSPATH=${CLASSPATH}:$f;
 done
 
+# setup 'java.library.path' for native-hadoop code
+JAVA_PLATFORM=`CLASSPATH=${CLASSPATH} java org.apache.hadoop.util.PlatformName`
+JAVA_LIBRARY_PATH=''
+if [ -d "$HADOOP_HOME/build/classes" ]; then
+  JAVA_LIBRARY_PATH=${HADOOP_HOME}/build/native/${JAVA_PLATFORM}/lib
+fi
+JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HADOOP_HOME}/lib/native/${JAVA_PLATFORM}
+  
 # restore ordinary behaviour
 unset IFS
 
@@ -157,6 +165,7 @@
 HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.home.dir=$HADOOP_HOME"
 HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.id.str=$HADOOP_IDENT_STRING"
 HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.root.logger=${HADOOP_ROOT_LOGGER:-INFO,console}"
+HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
 
 # run it
 exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=475025&r1=475024&r2=475025
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Tue Nov 14 14:35:22 2006
@@ -15,6 +15,7 @@
   <property name="libhdfs.version" value="1"/>
 
   <property name="src.dir" value="${basedir}/src/java"/>
+  <property name="native.src.dir" value="${basedir}/src/native"/>
   <property name="examples.dir" value="${basedir}/src/examples"/>
   <property name="lib.dir" value="${basedir}/lib"/>
   <property name="conf.dir" value="${basedir}/conf"/>
@@ -28,6 +29,8 @@
   <property name="build.webapps" value="${build.dir}/webapps"/>
   <property name="build.examples" value="${build.dir}/examples"/>
   <property name="build.libhdfs" value="${build.dir}/libhdfs"/>
+  <property name="build.platform" value="${os.name}-${os.arch}-${sun.arch.data.model}"/>
+  <property name="build.native" value="${build.dir}/native/${build.platform}"/>
   <property name="build.docs" value="${build.dir}/docs"/>
   <property name="build.javadoc" value="${build.docs}/api"/>
   <property name="build.encoding" value="ISO-8859-1"/>
@@ -140,7 +143,7 @@
       />
   </target>
   
-  <target name="compile-core" depends="init, record-parser">
+  <target name="compile-core-classes" depends="init, record-parser">
 
     <copy file="${src.webapps}/datanode/browseDirectory.jsp" todir="${src.webapps}/dfs/"/>
     <jsp-compile
@@ -184,6 +187,46 @@
       <classpath refid="classpath"/>
     </javac>    
 
+    </target>
+
+  <target name="compile-core-native" depends="compile-core-classes"
+          if="compile.native">
+  	
+    <mkdir dir="${build.native}/lib"/>
+    <mkdir dir="${build.native}/src/org/apache/hadoop/io/compress/zlib"/>
+
+  	<javah
+  	  classpath="${build.classes}"
+  	  destdir="${build.native}/src/org/apache/hadoop/io/compress/zlib"
+      force="yes"
+  	  verbose="yes"
+  	  >
+  	  <class name="org.apache.hadoop.io.compress.zlib.ZlibCompressor" />
+      <class name="org.apache.hadoop.io.compress.zlib.ZlibDecompressor" />
+  	</javah>
+
+	<exec dir="${build.native}" executable="sh" failonerror="true">
+	  <env key="OS_NAME" value="${os.name}"/>
+	  <env key="OS_ARCH" value="${os.arch}"/>
+	  <env key="JVM_DATA_MODEL" value="${sun.arch.data.model}"/>
+	  <env key="HADOOP_NATIVE_SRCDIR" value="${native.src.dir}"/>
+	  <arg line="${native.src.dir}/configure"/>
+    </exec>
+
+    <exec dir="${build.native}" executable="make" failonerror="true">
+      <env key="OS_NAME" value="${os.name}"/>
+      <env key="OS_ARCH" value="${os.arch}"/>
+  	  <env key="JVM_DATA_MODEL" value="${sun.arch.data.model}"/>
+  	  <env key="HADOOP_NATIVE_SRCDIR" value="${native.src.dir}"/>
+    </exec>
+
+	<exec dir="${build.native}" executable="sh" failonerror="true">
+	  <arg line="${build.native}/libtool --mode=install cp ${build.native}/lib/libhadoop.la ${build.native}/lib"/>
+    </exec>
+
+  </target>
+
+  <target name="compile-core" depends="compile-core-classes,compile-core-native">
   </target>
 
   <target name="compile-contrib" depends="compile-core">
@@ -328,6 +371,8 @@
       <sysproperty key="test.build.data" value="${test.build.data}"/>
       <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
       <sysproperty key="test.src.dir" value="${test.src.dir}"/>
+      <sysproperty key="java.library.path"
+       value="${build.native}/lib:${lib.dir}/native/${build.platform}"/>
       <classpath refid="${test.classpath.id}"/>
       <formatter type="${test.junit.output.format}" />
       <batchtest todir="${test.build.dir}" unless="testcase">
@@ -409,8 +454,18 @@
     <mkdir dir="${dist.dir}/docs/api"/>
 
     <copy todir="${dist.dir}/lib" includeEmptyDirs="false">
-      <fileset dir="lib"/>
+      <fileset dir="lib">
+        <exclude name="**/native/**"/>
+      </fileset>
     </copy>
+
+    <mkdir dir="${dist.dir}/lib/native"/>
+  	<exec dir="${dist.dir}" executable="sh" failonerror="true">
+	  <env key="BASE_NATIVE_LIB_DIR" value="${lib.dir}/native"/>
+	  <env key="BUILD_NATIVE_DIR" value="${build.dir}/native"/>
+	  <env key="DIST_LIB_DIR" value="${dist.dir}/lib/native"/>
+	  <arg line="${native.src.dir}/packageNativeHadoop.sh"/>
+    </exec>
 
     <copy todir="${dist.dir}/webapps">
       <fileset dir="${build.webapps}"/>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=475025&r1=475024&r2=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue Nov 14 14:35:22 2006
@@ -30,11 +30,13 @@
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.NativeCodeLoader;
 
 /** Support for flat files of binary key/value pairs. */
 public class SequenceFile {
@@ -185,6 +187,13 @@
       Class keyClass, Class valClass, 
       CompressionType compressionType, CompressionCodec codec) 
   throws IOException {
+    if ((codec instanceof GzipCodec) && 
+        !NativeCodeLoader.isNativeCodeLoaded() && 
+        !ZlibFactory.isNativeZlibLoaded()) {
+      throw new IllegalArgumentException("SequenceFile doesn't work with " +
+          "GzipCodec without native-hadoop code!");
+    }
+    
     Writer writer = null;
     
     if (compressionType == CompressionType.NONE) {
@@ -218,6 +227,13 @@
       Class keyClass, Class valClass, 
       CompressionType compressionType, CompressionCodec codec,
       Progressable progress) throws IOException {
+    if ((codec instanceof GzipCodec) && 
+        !NativeCodeLoader.isNativeCodeLoaded() && 
+        !ZlibFactory.isNativeZlibLoaded()) {
+      throw new IllegalArgumentException("SequenceFile doesn't work with " +
+          "GzipCodec without native-hadoop code!");
+    }
+    
     Writer writer = null;
     
     if (compressionType == CompressionType.NONE) {
@@ -244,18 +260,25 @@
    * @throws IOException
    */
   private static Writer
-  createWriter(FSDataOutputStream out, 
+  createWriter(Configuration conf, FSDataOutputStream out, 
       Class keyClass, Class valClass, boolean compress, boolean blockCompress,
       CompressionCodec codec)
   throws IOException {
+    if ((codec instanceof GzipCodec) && 
+        !NativeCodeLoader.isNativeCodeLoaded() && 
+        !ZlibFactory.isNativeZlibLoaded()) {
+      throw new IllegalArgumentException("SequenceFile doesn't work with " +
+          "GzipCodec without native-hadoop code!");
+    }
+
     Writer writer = null;
 
     if (!compress) {
-      writer = new Writer(out, keyClass, valClass);
+      writer = new Writer(conf, out, keyClass, valClass);
     } else if (compress && !blockCompress) {
-      writer = new RecordCompressWriter(out, keyClass, valClass, codec);
+      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec);
     } else {
-      writer = new BlockCompressWriter(out, keyClass, valClass, codec);
+      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec);
     }
     
     return writer;
@@ -364,6 +387,7 @@
   
   /** Write key/value pairs to a sequence-format file. */
   public static class Writer {
+    Configuration conf;
     FSDataOutputStream out;
     DataOutputBuffer buffer = new DataOutputBuffer();
     Path target = null;
@@ -406,16 +430,17 @@
     public Writer(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, Progressable progress)
       throws IOException {
-      init(name, fs.create(name, progress), keyClass, valClass, false, null);
+      init(name, conf, fs.create(name, progress), keyClass, valClass, false, null);
       initializeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
     }
 
     /** Write to an arbitrary stream using a specified buffer size. */
-    private Writer(FSDataOutputStream out, Class keyClass, Class valClass)
+    private Writer(Configuration conf, FSDataOutputStream out, 
+        Class keyClass, Class valClass)
     throws IOException {
-      init(null, out, keyClass, valClass, false, null);
+      init(null, conf, out, keyClass, valClass, false, null);
       
       initializeFileHeader();
       writeFileHeader();
@@ -453,17 +478,19 @@
     }
 
     /** Initialize. */
-    void init(Path name, FSDataOutputStream out,
+    void init(Path name, Configuration conf, FSDataOutputStream out,
                       Class keyClass, Class valClass,
                       boolean compress, CompressionCodec codec) 
     throws IOException {
       this.target = name;
+      this.conf = conf;
       this.out = out;
       this.keyClass = keyClass;
       this.valClass = valClass;
       this.compress = compress;
       this.codec = codec;
       if(this.codec != null) {
+        ReflectionUtils.setConf(this.codec, this.conf);
         this.deflateFilter = this.codec.createOutputStream(buffer);
         this.deflateOut = 
           new DataOutputStream(new BufferedOutputStream(deflateFilter));
@@ -479,6 +506,9 @@
     /** Returns the compression codec of data in this file. */
     public CompressionCodec getCompressionCodec() { return codec; }
 
+    /** Returns the configuration of this file. */
+    Configuration getConf() { return conf; }
+    
     /** Close the file. */
     public synchronized void close() throws IOException {
       if (out != null) {
@@ -571,7 +601,7 @@
     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, CompressionCodec codec) 
     throws IOException {
-      super.init(name, fs.create(name), keyClass, valClass, true, codec);
+      super.init(name, conf, fs.create(name), keyClass, valClass, true, codec);
       
       initializeFileHeader();
       writeFileHeader();
@@ -583,7 +613,7 @@
         Class keyClass, Class valClass, CompressionCodec codec,
         Progressable progress)
     throws IOException {
-      super.init(name, fs.create(name, progress), 
+      super.init(name, conf, fs.create(name, progress), 
           keyClass, valClass, true, codec);
       
       initializeFileHeader();
@@ -592,10 +622,10 @@
     }
     
     /** Write to an arbitrary stream using a specified buffer size. */
-    private RecordCompressWriter(FSDataOutputStream out,
+    private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
                    Class keyClass, Class valClass, CompressionCodec codec)
       throws IOException {
-      super.init(null, out, keyClass, valClass, true, codec);
+      super.init(null, conf, out, keyClass, valClass, true, codec);
       
       initializeFileHeader();
       writeFileHeader();
@@ -675,7 +705,7 @@
     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
         Class keyClass, Class valClass, CompressionCodec codec) 
     throws IOException {
-      super.init(name, fs.create(name), keyClass, valClass, true, codec);
+      super.init(name, conf, fs.create(name), keyClass, valClass, true, codec);
       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       
       initializeFileHeader();
@@ -688,7 +718,7 @@
         Class keyClass, Class valClass, CompressionCodec codec,
         Progressable progress)
     throws IOException {
-      super.init(name, fs.create(name, progress), keyClass, valClass, 
+      super.init(name, conf, fs.create(name, progress), keyClass, valClass, 
           true, codec);
       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
       
@@ -698,10 +728,10 @@
     }
     
     /** Write to an arbitrary stream using a specified buffer size. */
-    private BlockCompressWriter(FSDataOutputStream out,
+    private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
                    Class keyClass, Class valClass, CompressionCodec codec)
       throws IOException {
-      super.init(null, out, keyClass, valClass, true, codec);
+      super.init(null, conf, out, keyClass, valClass, true, codec);
       init(1000000);
       
       initializeFileHeader();
@@ -1013,6 +1043,9 @@
     /** Returns the compression codec of data in this file. */
     public CompressionCodec getCompressionCodec() { return codec; }
 
+    /** Returns the configuration used for this file. */
+    Configuration getConf() { return conf; }
+    
     /** Read a compressed buffer */
     private synchronized void readBuffer(DataInputBuffer buffer, 
         CompressionInputStream filter) throws IOException {
@@ -1731,7 +1764,7 @@
         }
 
         long segmentStart = out.getPos();
-        Writer writer = createWriter(out, keyClass, valClass, 
+        Writer writer = createWriter(conf, out, keyClass, valClass, 
             isCompressed, isBlockCompressed, codec);
         
         if (!done) {
@@ -1851,7 +1884,7 @@
      * @param inNames the array of path names
      * @param deleteInputs true if the input files should be deleted when 
      * unnecessary
-     * @return RawKeyValueIterator
+     * @return RawKeyValueIteratorMergeQueue
      * @throws IOException
      */
     public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs) 
@@ -1893,7 +1926,7 @@
         out = fs.create(outputFile, true, memory/(factor+1), prog);
       else
         out = fs.create(outputFile, true, memory/(factor+1));
-      Writer writer = createWriter(out, keyClass, valClass, compress, 
+      Writer writer = createWriter(conf, out, keyClass, valClass, compress, 
                           blockCompress, codec);
       return writer;
     }
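
The net effect of the SequenceFile changes is that a Configuration now travels
with every Writer, and that requesting GzipCodec without native-hadoop fails
fast with an IllegalArgumentException. A hedged usage sketch follows; the
createWriter overload used here matches the signature whose tail appears in
the hunks above, but other overloads exist, and the example class name is
hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.DefaultCodec;

    // Sketch: write a record-compressed SequenceFile with the zlib-backed
    // DefaultCodec. Passing a GzipCodec here without the native library
    // would now throw IllegalArgumentException, per the checks added above.
    public class SequenceFileWriteExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Writer writer =
            SequenceFile.createWriter(fs, conf, new Path("/tmp/data.seq"),
                Text.class, Text.class,
                SequenceFile.CompressionType.RECORD, new DefaultCodec());
        writer.append(new Text("key"), new Text("value"));
        writer.close();
      }
    }

Note that Writer.init now calls ReflectionUtils.setConf on the codec, so the
codec picks up io.file.buffer.size without the caller configuring it.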

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java?view=diff&rev=475025&r1=475024&r2=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionInputStream.java Tue Nov 14 14:35:22 2006
@@ -39,7 +39,8 @@
   /**
    * Create a compression input stream that reads
    * the decompressed bytes from the given stream.
-   * @param in
+   * 
+   * @param in The input stream from which compressed data is read.
    */
   protected CompressionInputStream(InputStream in) {
     this.in = in;

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/Compressor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/Compressor.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/Compressor.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/Compressor.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+
+/**
+ * Specification of a stream-based 'compressor' which can be  
+ * plugged into a {@link CompressionOutputStream} to compress data.
+ * This is modelled after {@link java.util.zip.Deflater}
+ * 
+ * @author Arun C Murthy
+ */
+public interface Compressor {
+  /**
+   * Sets input data for compression. 
+   * This should be called whenever #needsInput() returns 
+   * <code>true</code> indicating that more input data is required.
+   * 
+   * @param b Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  public void setInput(byte[] b, int off, int len);
+  
+  /**
+   * Returns true if the input data buffer is empty and 
+   * #setInput() should be called to provide more input. 
+   * 
+   * @return <code>true</code> if the input data buffer is empty and 
+   * #setInput() should be called in order to provide more input.
+   */
+  public boolean needsInput();
+  
+  /**
+   * Sets preset dictionary for compression. A preset dictionary 
+   * is used when the history buffer can be predetermined. 
+   *
+   * @param b Dictionary data bytes
+   * @param off Start offset
+   * @param len Length
+   */
+  public void setDictionary(byte[] b, int off, int len);
+  
+
+  /**
+   * When called, indicates that compression should end
+   * with the current contents of the input buffer.
+   */
+  public void finish();
+  
+  /**
+   * Returns true if the end of the compressed 
+   * data output stream has been reached.
+   * @return <code>true</code> if the end of the compressed
+   * data output stream has been reached.
+   */
+  public boolean finished();
+  
+  /**
+   * Fills specified buffer with compressed data. Returns actual number
+   * of bytes of compressed data. A return value of 0 indicates that
+   * needsInput() should be called in order to determine if more input
+   * data is required.
+   * 
+   * @param b Buffer for the compressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of compressed data.
+   */
+  public int compress(byte[] b, int off, int len) throws IOException;
+  
+  /**
+   * Resets compressor so that a new set of input data can be processed.
+   */
+  public void reset();
+  
+  /**
+   * Closes the compressor and discards any unprocessed input.
+   */
+  public void end(); 
+}
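
The Compressor contract mirrors java.util.zip.Deflater: feed input while
needsInput() returns true, call finish() once everything has been supplied,
then drain with compress() until finished() reports true. CompressorStream
(next file) drives a Compressor exactly this way; a stand-alone sketch, using
a hypothetical example class and a Compressor obtained from ZlibFactory,
looks like:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.compress.Compressor;
    import org.apache.hadoop.io.compress.zlib.ZlibFactory;

    // Sketch of the documented call sequence; works with either the native
    // ZlibCompressor or the built-in Deflater wrapper.
    public class CompressorExample {
      public static void main(String[] args) throws IOException {
        byte[] input = "some bytes to compress".getBytes();
        byte[] buf = new byte[512];
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();

        Compressor compressor = ZlibFactory.getZlibCompressor();
        compressor.setInput(input, 0, input.length);
        compressor.finish();                      // no further input follows
        while (!compressor.finished()) {
          int n = compressor.compress(buf, 0, buf.length);
          compressed.write(buf, 0, n);
        }
        compressor.end();
        System.out.println(compressed.size() + " compressed bytes");
      }
    }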

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressorStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressorStream.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressorStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressorStream.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+
+class CompressorStream extends CompressionOutputStream {
+  Compressor compressor;
+  byte[] buffer;
+  boolean closed = false;
+  OutputStream rawOut;
+  
+  public CompressorStream(OutputStream out, Compressor compressor, int bufferSize) {
+    super(out);
+    rawOut = out;
+
+    if (out == null || compressor == null) {
+      throw new NullPointerException();
+    } else if (bufferSize <= 0) {
+      throw new IllegalArgumentException("Illegal bufferSize");
+    }
+
+    this.compressor = compressor;
+    buffer = new byte[bufferSize];
+  }
+
+  public CompressorStream(OutputStream out, Compressor compressor) {
+    this(out, compressor, 512);
+  }
+  
+  /**
+   * Allow derived classes to directly set the underlying stream.
+   * 
+   * @param out Underlying output stream.
+   */
+  protected CompressorStream(OutputStream out) {
+    super(out);
+  }
+
+  public void write(byte[] b, int off, int len) throws IOException {
+    // Sanity checks
+    if (compressor.finished()) {
+      throw new IOException("write beyond end of stream");
+    }
+    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+    
+    if (!compressor.finished()) {
+      compressor.setInput(b, off, len);
+      while (!compressor.needsInput()) {
+        compress();
+      }
+    }
+  }
+
+  void compress() throws IOException {
+    int len = compressor.compress(buffer, 0, buffer.length);
+    if (len > 0) {
+      out.write(buffer, 0, len);
+    }
+  }
+
+  public void finish() throws IOException {
+    if (!compressor.finished()) {
+      compressor.finish();
+      while (!compressor.finished()) {
+        compress();
+      }
+    }
+  }
+
+  public void resetState() throws IOException {
+    compressor.reset();
+  }
+  
+  public void close() throws IOException {
+    if (!closed) {
+      finish();
+      out.close();
+      closed = true;
+    }
+  }
+
+  byte[] oneByte = new byte[1];
+  public void write(int b) throws IOException {
+    oneByte[0] = (byte)(b & 0xff);
+    write(oneByte, 0, oneByte.length);
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+
+/**
+ * Specification of a stream-based 'de-compressor' which can be  
+ * plugged into a {@link CompressionInputStream} to uncompress data.
+ * This is modelled after {@link java.util.zip.Inflater}
+ * 
+ * @author Arun C Murthy
+ */
+public interface Decompressor {
+  /**
+   * Sets input data for decompression. 
+   * This should be called whenever #needsInput() returns 
+   * <code>true</code> indicating that more input data is required.
+   * 
+   * @param b Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  public void setInput(byte[] b, int off, int len);
+  
+  /**
+   * Returns true if the input data buffer is empty and 
+   * #setInput() should be called to provide more input. 
+   * 
+   * @return <code>true</code> if the input data buffer is empty and 
+   * #setInput() should be called in order to provide more input.
+   */
+  public boolean needsInput();
+  
+  /**
+   * Sets preset dictionary for compression. A preset dictionary
+   * is used when the history buffer can be predetermined. 
+   *
+   * @param b Dictionary data bytes
+   * @param off Start offset
+   * @param len Length
+   */
+  public void setDictionary(byte[] b, int off, int len);
+  
+  /**
+   * Returns <code>true</code> if a preset dictionary is needed for decompression.
+   * @return <code>true</code> if a preset dictionary is needed for decompression
+   */
+  public boolean needsDictionary();
+
+  /**
+   * Returns true if the end of the compressed 
+   * data output stream has been reached.
+   * @return <code>true</code> if the end of the compressed
+   * data output stream has been reached.
+   */
+  public boolean finished();
+  
+  /**
+   * Fills specified buffer with uncompressed data. Returns actual number
+   * of bytes of uncompressed data. A return value of 0 indicates that
+   * #needsInput() should be called in order to determine if more input
+   * data is required.
+   * 
+   * @param b Buffer for the uncompressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of uncompressed data.
+   * @throws IOException
+   */
+  public int decompress(byte[] b, int off, int len) throws IOException;
+  
+  /**
+   * Resets decompressor so that a new set of input data can be processed.
+   */
+  public void reset();
+  
+  /**
+   * Closes the decompressor and discards any unprocessed input.
+   */
+  public void end(); 
+}
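
Decompression is the mirror image, and DecompressorStream (next file) shows
the canonical streaming loop: call decompress() until it produces data, refill
with setInput() whenever needsInput() reports the internal buffer is empty,
and stop once finished() is true. A compact sketch for input that is already
fully buffered (hypothetical example class):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.compress.Decompressor;

    // Sketch: inflate a fully buffered compressed byte[] with any
    // Decompressor implementation.
    public class DecompressorExample {
      public static byte[] inflate(Decompressor decompressor, byte[] compressed)
          throws IOException {
        decompressor.setInput(compressed, 0, compressed.length);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[512];
        while (!decompressor.finished()) {
          int n = decompressor.decompress(buf, 0, buf.length);
          if (n == 0 && decompressor.needsInput()) {
            break;          // whole input consumed; nothing more to feed here
          }
          out.write(buf, 0, n);
        }
        decompressor.end();
        return out.toByteArray();
      }
    }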

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.io.compress.Decompressor;
+
+class DecompressorStream extends CompressionInputStream {
+  Decompressor decompressor = null;
+  byte[] buffer;
+  boolean eof = false;
+  boolean closed = false;
+  
+  public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) {
+    super(in);
+
+    if (in == null || decompressor == null) {
+      throw new NullPointerException();
+    } else if (bufferSize <= 0) {
+      throw new IllegalArgumentException("Illegal bufferSize");
+    }
+
+    this.decompressor = decompressor;
+    buffer = new byte[bufferSize];
+  }
+
+  public DecompressorStream(InputStream in, Decompressor decompressor) {
+    this(in, decompressor, 512);
+  }
+
+  /**
+   * Allow derived classes to directly set the underlying stream.
+   * 
+   * @param in Underlying input stream.
+   */
+  protected DecompressorStream(InputStream in) {
+    super(in);
+  }
+  
+  byte[] oneByte = new byte[1];
+  public int read() throws IOException {
+    checkStream();
+    return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
+  }
+
+  public int read(byte[] b, int off, int len) throws IOException {
+    checkStream();
+    
+    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return 0;
+    }
+
+    return decompress(b, off, len);
+  }
+
+  int decompress(byte[] b, int off, int len) throws IOException {
+    int n = 0;
+    
+    while ((n = decompressor.decompress(b, off, len)) == 0) {
+      if (decompressor.finished() || decompressor.needsDictionary()) {
+        eof = true;
+        return -1;
+      }
+      if (decompressor.needsInput()) {
+        getCompressedData();
+      }
+    }
+    
+    return n;
+  }
+  
+  void getCompressedData() throws IOException {
+    checkStream();
+  
+    int n = in.read(buffer, 0, buffer.length);
+    if (n == -1) {
+        throw new EOFException("Unexpected end of input stream");
+    }
+
+    decompressor.setInput(buffer, 0, n);
+  }
+  
+  void checkStream() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+  }
+  
+  public void resetState() throws IOException {
+    decompressor.reset();
+  }
+
+  byte[] skipBytes = new byte[512];
+  public long skip(long n) throws IOException {
+    // Sanity checks
+    if (n < 0) {
+      throw new IllegalArgumentException("negative skip length");
+    }
+    checkStream();
+    
+    // Read 'n' bytes
+    int skipped = 0;
+    while (skipped < n) {
+      int len = Math.min(((int)n - skipped), skipBytes.length);
+      len = read(skipBytes, 0, len);
+      if (len == -1) {
+        eof = true;
+        break;
+      }
+      skipped += len;
+    }
+    return skipped;
+  }
+
+  public int available() throws IOException {
+    checkStream();
+    return (eof) ? 0 : 1;
+  }
+
+  public void close() throws IOException {
+    if (!closed) {
+      in.close();
+      closed = true;
+    }
+  }
+
+  public boolean markSupported() {
+    return false;
+  }
+
+  public synchronized void mark(int readlimit) {
+  }
+
+  public synchronized void reset() throws IOException {
+    throw new IOException("mark/reset not supported");
+  }
+
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java?view=diff&rev=475025&r1=475024&r2=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java Tue Nov 14 14:35:22 2006
@@ -18,128 +18,24 @@
 
 package org.apache.hadoop.io.compress;
 
-import java.io.*;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.zlib.*;
 
-public class DefaultCodec implements CompressionCodec {
-
-  /**
-   * A bridge that wraps around a DeflaterOutputStream to make it 
-   * a CompressionOutputStream.
-   * @author Owen O'Malley
-   */
-  protected static class DefaultCompressionOutputStream 
-  extends CompressionOutputStream {
-
-    /**
-     * A DeflaterOutputStream that provides a mechanism to 
-     * reset the decompressor.
-     * @author Owen O'Malley
-     */
-    private static class ResetableDeflaterOutputStream 
-    extends DeflaterOutputStream {
-      
-      public ResetableDeflaterOutputStream(OutputStream out) {
-        super(out);
-      }
-      
-      public void resetState() throws IOException {
-        def.reset();
-      }
-    }
-    
-    public DefaultCompressionOutputStream(OutputStream out) {
-      super(new ResetableDeflaterOutputStream(out));
-    }
-    
-    /**
-     * Allow children types to put a different type in here (namely gzip).
-     * @param out the Deflater stream to use
-     */
-    protected DefaultCompressionOutputStream(DeflaterOutputStream out) {
-      super(out);
-    }
-    
-    public void close() throws IOException {
-      out.close();
-    }
-    
-    public void flush() throws IOException {
-      out.flush();
-    }
-    
-    public void write(int b) throws IOException {
-      out.write(b);
-    }
-    
-    public void write(byte[] data, int offset, int length) 
-    throws IOException {
-      out.write(data, offset, length);
-    }
-    
-    public void finish() throws IOException {
-      ((DeflaterOutputStream) out).finish();
-    }
-    
-    public void resetState() throws IOException {
-      ((ResetableDeflaterOutputStream) out).resetState();
-    }
+public class DefaultCodec implements Configurable, CompressionCodec {
+  
+  Configuration conf;
+  
+  public void setConf(Configuration conf) {
+    this.conf = conf;
   }
   
-  protected static class DefaultCompressionInputStream 
-  extends CompressionInputStream {
-    
-    /**
-     * A InflaterStream that provides a mechanism to reset the decompressor.
-     * @author Owen O'Malley
-     */
-    private static class ResetableInflaterInputStream 
-    extends InflaterInputStream {
-      public ResetableInflaterInputStream(InputStream in) {
-        super(in);
-      }
-      
-      public void resetState() throws IOException {
-        inf.reset();
-      }
-    }
-    
-    public DefaultCompressionInputStream(InputStream in) {
-      super(new ResetableInflaterInputStream(in));
-    }
-    
-    /**
-     * Allow subclasses to directly set the inflater stream
-     */
-    protected DefaultCompressionInputStream(InflaterInputStream in) {
-      super(in);
-    }
-    
-    public int available() throws IOException {
-      return in.available(); 
-    }
-
-    public void close() throws IOException {
-      in.close();
-    }
-
-    public int read() throws IOException {
-      return in.read();
-    }
-    
-    public int read(byte[] data, int offset, int len) throws IOException {
-      return in.read(data, offset, len);
-    }
-    
-    public long skip(long offset) throws IOException {
-      return in.skip(offset);
-    }
-    
-    public void resetState() throws IOException {
-      ((ResetableInflaterInputStream) in).resetState();
-    }
-    
+  public Configuration getConf() {
+    return conf;
   }
   
   /**
@@ -149,7 +45,8 @@
    */
   public CompressionOutputStream createOutputStream(OutputStream out) 
   throws IOException {
-    return new DefaultCompressionOutputStream(out);
+    return new CompressorStream(out, ZlibFactory.getZlibCompressor(), 
+        conf.getInt("io.file.buffer.size", 4*1024));
   }
   
   /**
@@ -159,7 +56,8 @@
    */
   public CompressionInputStream createInputStream(InputStream in) 
   throws IOException {
-    return new DefaultCompressionInputStream(in);
+    return new DecompressorStream(in, ZlibFactory.getZlibDecompressor(),
+        conf.getInt("io.file.buffer.size", 4*1024));
   }
   
   /**
@@ -169,5 +67,4 @@
   public String getDefaultExtension() {
     return ".deflate";
   }
-
 }
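
With the Deflater/Inflater bridge classes removed, DefaultCodec is now a thin
Configurable front to ZlibFactory (not shown in this part of the mail), which
presumably returns the JNI-backed classes when native zlib is available and
the built-in java.util.zip wrappers otherwise. A hedged round-trip sketch,
using a hypothetical example class:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.DefaultCodec;

    // Sketch: round-trip a buffer through DefaultCodec. setConf() must be
    // called (here directly; SequenceFile now uses ReflectionUtils.setConf)
    // because createOutputStream reads io.file.buffer.size from the conf.
    public class DefaultCodecExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        DefaultCodec codec = new DefaultCodec();
        codec.setConf(conf);

        byte[] data = "hello, native hadoop".getBytes();
        ByteArrayOutputStream deflated = new ByteArrayOutputStream();
        CompressionOutputStream out = codec.createOutputStream(deflated);
        out.write(data, 0, data.length);
        out.finish();
        out.close();

        CompressionInputStream in = codec.createInputStream(
            new ByteArrayInputStream(deflated.toByteArray()));
        byte[] buf = new byte[data.length];
        int n = in.read(buf, 0, buf.length);
        System.out.println(new String(buf, 0, n));
        in.close();
      }
    }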

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?view=diff&rev=475025&r1=475024&r2=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Tue Nov 14 14:35:22 2006
@@ -23,20 +23,22 @@
 import java.util.zip.GZIPInputStream;
 
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.zlib.*;
 
 /**
  * This class creates gzip compressors/decompressors. 
  * @author Owen O'Malley
  */
 public class GzipCodec extends DefaultCodec {
-  
   /**
    * A bridge that wraps around a DeflaterOutputStream to make it 
    * a CompressionOutputStream.
    * @author Owen O'Malley
    */
-  protected static class GzipOutputStream extends DefaultCompressionOutputStream {
+  protected static class GzipOutputStream extends CompressorStream {
+
     private static class ResetableGZIPOutputStream extends GZIPOutputStream {
+      
       public ResetableGZIPOutputStream(OutputStream out) throws IOException {
         super(out);
       }
@@ -54,20 +56,40 @@
      * Allow children types to put a different type in here.
      * @param out the Deflater stream to use
      */
-    protected GzipOutputStream(DefaultCompressionOutputStream out) {
+    protected GzipOutputStream(CompressorStream out) {
       super(out);
     }
     
+    public void close() throws IOException {
+      out.close();
+    }
+    
+    public void flush() throws IOException {
+      out.flush();
+    }
+    
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+    
+    public void write(byte[] data, int offset, int length) 
+    throws IOException {
+      out.write(data, offset, length);
+    }
+    
+    public void finish() throws IOException {
+      ((ResetableGZIPOutputStream) out).finish();
+    }
 
     public void resetState() throws IOException {
       ((ResetableGZIPOutputStream) out).resetState();
     }
-
   }
   
-  protected static class GzipInputStream extends DefaultCompressionInputStream {
+  protected static class GzipInputStream extends DecompressorStream {
     
     private static class ResetableGZIPInputStream extends GZIPInputStream {
+
       public ResetableGZIPInputStream(InputStream in) throws IOException {
         super(in);
       }
@@ -84,10 +106,34 @@
     /**
      * Allow subclasses to directly set the inflater stream.
      */
-    protected GzipInputStream(DefaultCompressionInputStream in) {
+    protected GzipInputStream(DecompressorStream in) {
       super(in);
     }
-  }
+
+    public int available() throws IOException {
+      return in.available(); 
+    }
+
+    public void close() throws IOException {
+      in.close();
+    }
+
+    public int read() throws IOException {
+      return in.read();
+    }
+    
+    public int read(byte[] data, int offset, int len) throws IOException {
+      return in.read(data, offset, len);
+    }
+    
+    public long skip(long offset) throws IOException {
+      return in.skip(offset);
+    }
+    
+    public void resetState() throws IOException {
+      ((ResetableGZIPInputStream) in).resetState();
+    }
+  }  
   
   /**
    * Create a stream compressor that will write to the given output stream.
@@ -96,7 +142,22 @@
    */
   public CompressionOutputStream createOutputStream(OutputStream out) 
   throws IOException {
-    return new GzipOutputStream(out);
+    CompressionOutputStream compOutStream = null;
+    
+    if (ZlibFactory.isNativeZlibLoaded()) {
+      Compressor compressor = 
+        new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
+            ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
+            ZlibCompressor.CompressionHeader.GZIP_FORMAT,
+            64*1024); 
+     
+      compOutStream = new CompressorStream(out, compressor,
+                        conf.getInt("io.file.buffer.size", 4*1024)); 
+    } else {
+      compOutStream = new GzipOutputStream(out);
+    }
+    
+    return compOutStream;
   }
   
   /**
@@ -106,7 +167,20 @@
    */
   public CompressionInputStream createInputStream(InputStream in) 
   throws IOException {
-    return new GzipInputStream(in);
+    CompressionInputStream compInStream = null;
+    
+    if (ZlibFactory.isNativeZlibLoaded()) {
+      Decompressor decompressor =
+        new ZlibDecompressor(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB,
+            64*1024);
+
+      compInStream = new DecompressorStream(in, decompressor,
+                        conf.getInt("io.file.buffer.size", 4*1024)); 
+    } else {
+      compInStream = new GzipInputStream(in);
+    }
+    
+    return compInStream;
   }
   
   /**

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibDeflater.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zlib;
+
+import java.io.IOException;
+import java.util.zip.Deflater;
+
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * A wrapper around java.util.zip.Deflater to make it conform 
+ * to org.apache.hadoop.io.compress.Compressor interface.
+ * 
+ * @author Arun C Murthy
+ */
+public class BuiltInZlibDeflater extends Deflater implements Compressor {
+
+  public BuiltInZlibDeflater(int level, boolean nowrap) {
+    super(level, nowrap);
+  }
+
+  public BuiltInZlibDeflater(int level) {
+    super(level);
+  }
+
+  public BuiltInZlibDeflater() {
+    super();
+  }
+
+  public synchronized int compress(byte[] b, int off, int len) 
+  throws IOException {
+    return super.deflate(b, off, len);
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInZlibInflater.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zlib;
+
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * A wrapper around java.util.zip.Inflater to make it conform 
+ * to org.apache.hadoop.io.compress.Decompressor interface.
+ * 
+ * @author Arun C Murthy
+ */
+public class BuiltInZlibInflater extends Inflater implements Decompressor {
+
+  public BuiltInZlibInflater(boolean nowrap) {
+    super(nowrap);
+  }
+
+  public BuiltInZlibInflater() {
+    super();
+  }
+
+  public synchronized int decompress(byte[] b, int off, int len) 
+  throws IOException {
+    try {
+      return super.inflate(b, off, len);
+    } catch (DataFormatException dfe) {
+      throw new IOException(dfe.getMessage());
+    }
+  }
+}
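
ZlibFactory.java ties the two implementations together: DefaultCodec asks it
for a Compressor/Decompressor and SequenceFile asks it whether native zlib is
usable. The real class is added by this commit but not included in this part
of the mail; based only on the callers visible here (the ZlibDecompressor
calls below are assumed by symmetry with ZlibCompressor), a heavily
simplified, non-authoritative sketch of its shape might be:

    package org.apache.hadoop.io.compress.zlib;

    import java.util.zip.Deflater;

    import org.apache.hadoop.io.compress.Compressor;
    import org.apache.hadoop.io.compress.Decompressor;

    // Sketch only: return the JNI-backed classes when libhadoop/libz loaded
    // successfully, otherwise fall back to the java.util.zip wrappers above.
    // The real factory may also consult configuration and log its decision.
    public class ZlibFactory {
      public static boolean isNativeZlibLoaded() {
        return ZlibCompressor.isNativeZlibLoaded()
            && ZlibDecompressor.isNativeZlibLoaded();
      }

      public static Compressor getZlibCompressor() {
        return isNativeZlibLoaded()
            ? new ZlibCompressor()
            : new BuiltInZlibDeflater(Deflater.DEFAULT_COMPRESSION);
      }

      public static Decompressor getZlibDecompressor() {
        return isNativeZlibLoaded()
            ? new ZlibDecompressor()
            : new BuiltInZlibInflater();
      }
    }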

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zlib;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * A {@link Compressor} based on the popular 
+ * zlib compression algorithm.
+ * http://www.zlib.net/
+ * 
+ * @author Arun C Murthy
+ */
+public class ZlibCompressor implements Compressor {
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 1*1024;
+  
+  private long stream;
+  private CompressionLevel level;
+  private CompressionStrategy strategy;
+  private CompressionHeader windowBits;
+  private int directBufferSize;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private Buffer uncompressedDirectBuf = null;
+  private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
+  private Buffer compressedDirectBuf = null;
+  private boolean finish, finished;
+
+  /**
+   * The compression level for zlib library.
+   */
+  public static enum CompressionLevel {
+    /**
+     * Compression level for no compression.
+     */
+    NO_COMPRESSION (0),
+    
+    /**
+     * Compression level for fastest compression.
+     */
+    BEST_SPEED (1),
+    
+    /**
+     * Compression level for best compression.
+     */
+    BEST_COMPRESSION (9),
+    
+    /**
+     * Default compression level.
+     */
+    DEFAULT_COMPRESSION (-1);
+    
+    
+    private final int compressionLevel;
+    
+    CompressionLevel(int level) {
+      compressionLevel = level;
+    }
+    
+    int compressionLevel() {
+      return compressionLevel;
+    }
+  };
+  
+  /**
+   * The compression level for zlib library.
+   */
+  public static enum CompressionStrategy {
+    /**
+     * Compression strategy best used for data consisting mostly of small
+     * values with a somewhat random distribution. Forces more Huffman coding
+     * and less string matching.
+     */
+    FILTERED (1),
+    
+    /**
+     * Compression strategy for Huffman coding only.
+     */
+    HUFFMAN_ONLY (2),
+    
+    /**
+     * Compression strategy to limit match distances to one
+     * (run-length encoding).
+     */
+    RLE (3),
+
+    /**
+     * Compression strategy to prevent the use of dynamic Huffman codes, 
+     * allowing for a simpler decoder for special applications.
+     */
+    FIXED (4),
+
+    /**
+     * Default compression strategy.
+     */
+    DEFAULT_STRATEGY (0);
+    
+    
+    private final int compressionStrategy;
+    
+    CompressionStrategy(int strategy) {
+      compressionStrategy = strategy;
+    }
+    
+    int compressionStrategy() {
+      return compressionStrategy;
+    }
+  };
+
+  /**
+   * The type of header for compressed data.
+   */
+  public static enum CompressionHeader {
+    /**
+     * No headers/trailers/checksums.
+     */
+    NO_HEADER (-15),
+    
+    /**
+     * Default headers/trailers/checksums.
+     */
+    DEFAULT_HEADER (15),
+    
+    /**
+     * Simple gzip headers/trailers.
+     */
+    GZIP_FORMAT (31);
+
+    private final int windowBits;
+    
+    CompressionHeader(int windowBits) {
+      this.windowBits = windowBits;
+    }
+    
+    public int windowBits() {
+      return windowBits;
+    }
+  }
+  
+  private static boolean nativeZlibLoaded = false;
+  
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      try {
+        // Initialize the native library
+        initIDs();
+        nativeZlibLoaded = true;
+      } catch (Throwable t) {
+        // Ignore failure to load/initialize native-zlib
+      }
+    }
+  }
+  
+  static boolean isNativeZlibLoaded() {
+    return nativeZlibLoaded;
+  }
+
+  /** 
+   * Creates a new compressor using the specified compression level,
+   * strategy, and header format.
+   * 
+   * @param level Compression level, see {@link CompressionLevel}
+   * @param strategy Compression strategy, see {@link CompressionStrategy}
+   * @param header Compression header, see {@link CompressionHeader}
+   * @param directBufferSize Size of the direct buffer to be used.
+   */
+  public ZlibCompressor(CompressionLevel level, CompressionStrategy strategy, 
+      CompressionHeader header, int directBufferSize) {
+    this.level = level;
+    this.strategy = strategy;
+    this.windowBits = header;
+    this.directBufferSize = directBufferSize;
+    
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+    
+    stream = init(this.level.compressionLevel(), 
+                  this.strategy.compressionStrategy(), 
+                  this.windowBits.windowBits());
+  }
+  
+  /**
+   * Creates a new compressor with the default compression level.
+   * Compressed data will be generated in ZLIB format.
+   */
+  public ZlibCompressor() {
+    this(CompressionLevel.DEFAULT_COMPRESSION, 
+        CompressionStrategy.DEFAULT_STRATEGY, 
+        CompressionHeader.DEFAULT_HEADER, 
+        DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+  
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+    setInputFromSavedData();
+    
+    // Reinitialize zlib's output direct buffer 
+    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+  }
+  
+  synchronized void setInputFromSavedData() {
+    uncompressedDirectBufOff = 0;
+    uncompressedDirectBufLen = userBufLen;
+    if (uncompressedDirectBufLen > directBufferSize) {
+      uncompressedDirectBufLen = directBufferSize;
+    }
+
+    // Reinitialize zlib's input direct buffer
+    uncompressedDirectBuf.rewind();
+    ((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff,  
+                                          uncompressedDirectBufLen);
+
+    // Note how much data is being fed to zlib
+    userBufOff += uncompressedDirectBufLen;
+    userBufLen -= uncompressedDirectBufLen;
+  }
+
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    if (stream == 0 || b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    setDictionary(stream, b, off, len);
+  }
+
+  public boolean needsInput() {
+    // Consume remaining compressed data?
+    if (compressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+
+    // Check if zlib has consumed all input
+    if (uncompressedDirectBufLen <= 0) {
+      // Check if we have consumed all user-input
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        setInputFromSavedData();
+      }
+    }
+    
+    return false;
+  }
+  
+  public synchronized void finish() {
+    finish = true;
+  }
+  
+  public synchronized boolean finished() {
+    // Check if 'zlib' says it's 'finished' and
+    // all compressed data has been consumed
+    return (finished && compressedDirectBuf.remaining() == 0);
+  }
+
+  public synchronized int compress(byte[] b, int off, int len) 
+  throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    
+    int n = 0;
+    
+    // Check if there is compressed data
+    n = compressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer)compressedDirectBuf).get(b, off, n);
+      return n;
+    }
+
+    // Re-initialize the zlib's output direct buffer
+    compressedDirectBuf.rewind();
+    compressedDirectBuf.limit(directBufferSize);
+
+    // Compress data
+    n = deflateBytesDirect();
+    compressedDirectBuf.limit(n);
+    
+    // Get at most 'len' bytes
+    n = Math.min(n, len);
+    ((ByteBuffer)compressedDirectBuf).get(b, off, n);
+
+    return n;
+  }
+
+  /**
+   * Returns the total number of compressed bytes output so far.
+   *
+   * @return the total (non-negative) number of compressed bytes output so far
+   */
+  public synchronized long getBytesWritten() {
+    checkStream();
+    return getBytesWritten(stream);
+  }
+
+  /**
+   * Returns the total number of uncompressed bytes input so far.
+   *
+   * @return the total (non-negative) number of uncompressed bytes input so far
+   */
+  public synchronized long getBytesRead() {
+    checkStream();
+    return getBytesRead(stream);
+  }
+
+  public synchronized void reset() {
+    checkStream();
+    reset(stream);
+    finish = false;
+    finished = false;
+    uncompressedDirectBuf.rewind();
+    uncompressedDirectBufOff = uncompressedDirectBufLen = 0;
+    compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+    userBufOff = userBufLen = 0;
+  }
+  
+  public synchronized void end() {
+    if (stream != 0) {
+      end(stream);
+      stream = 0;
+    }
+  }
+  
+  private void checkStream() {
+    if (stream == 0)
+      throw new NullPointerException();
+  }
+  
+  private native static void initIDs();
+  private native static long init(int level, int strategy, int windowBits);
+  private native static void setDictionary(long strm, byte[] b, int off,
+       int len);
+  private native int deflateBytesDirect();
+  private native static long getBytesRead(long strm);
+  private native static long getBytesWritten(long strm);
+  private native static void reset(long strm);
+  private native static void end(long strm);
+}

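For illustration only (not part of this patch): a minimal sketch of how the compressor might be driven by hand, assuming only the ZlibCompressor methods shown in the hunk above and an arbitrary 1 KB output buffer.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.compress.zlib.ZlibCompressor;

    public class ZlibCompressorSketch {
      public static byte[] deflate(byte[] data) throws IOException {
        ZlibCompressor compressor = new ZlibCompressor();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];

        // Feed the input, draining compressed output until more input is wanted
        compressor.setInput(data, 0, data.length);
        while (!compressor.needsInput()) {
          int n = compressor.compress(buf, 0, buf.length);
          out.write(buf, 0, n);
        }

        // Signal end-of-input, then drain whatever zlib still has buffered
        compressor.finish();
        while (!compressor.finished()) {
          int n = compressor.compress(buf, 0, buf.length);
          out.write(buf, 0, n);
        }
        compressor.end();
        return out.toByteArray();
      }
    }
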
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zlib;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * A {@link Decompressor} based on the popular 
+ * zlib compression algorithm.
+ * http://www.zlib.net/
+ * 
+ * @author Arun C Murthy
+ */
+public class ZlibDecompressor implements Decompressor {
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 1*1024;
+  
+  private long stream;
+  private CompressionHeader header;
+  private int directBufferSize;
+  private Buffer compressedDirectBuf = null;
+  private int compressedDirectBufOff, compressedDirectBufLen;
+  private Buffer uncompressedDirectBuf = null;
+  private byte[] userBuf = null;
+  private int userBufOff = 0, userBufLen = 0;
+  private boolean finished;
+  private boolean needDict;
+
+  /**
+   * The headers to detect from compressed data.
+   */
+  public static enum CompressionHeader {
+    /**
+     * No headers/trailers/checksums.
+     */
+    NO_HEADER (-15),
+    
+    /**
+     * Default headers/trailers/checksums.
+     */
+    DEFAULT_HEADER (15),
+    
+    /**
+     * Simple gzip headers/trailers.
+     */
+    GZIP_FORMAT (31),
+    
+    /**
+     * Autodetect gzip/zlib headers/trailers.
+     */
+    AUTODETECT_GZIP_ZLIB (47);
+
+    private final int windowBits;
+    
+    CompressionHeader(int windowBits) {
+      this.windowBits = windowBits;
+    }
+    
+    public int windowBits() {
+      return windowBits;
+    }
+  }
+
+  private static boolean nativeZlibLoaded = false;
+  
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      try {
+        // Initialize the native library
+        initIDs();
+        nativeZlibLoaded = true;
+      } catch (Throwable t) {
+        // Ignore failure to load/initialize native-zlib
+      }
+    }
+  }
+  
+  static boolean isNativeZlibLoaded() {
+    return nativeZlibLoaded;
+  }
+
+  /**
+   * Creates a new decompressor.
+   */
+  public ZlibDecompressor(CompressionHeader header, int directBufferSize) {
+    this.header = header;
+    this.directBufferSize = directBufferSize;
+    compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    
+    stream = init(this.header.windowBits());
+  }
+  
+  public ZlibDecompressor() {
+    this(CompressionHeader.DEFAULT_HEADER, DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+  
+    this.userBuf = b;
+    this.userBufOff = off;
+    this.userBufLen = len;
+    
+    setInputFromSavedData();
+    
+    // Reinitialize zlib's output direct buffer 
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+  
+  synchronized void setInputFromSavedData() {
+    compressedDirectBufOff = 0;
+    compressedDirectBufLen = userBufLen;
+    if (compressedDirectBufLen > directBufferSize) {
+      compressedDirectBufLen = directBufferSize;
+    }
+
+    // Reinitialize zlib's input direct buffer
+    compressedDirectBuf.rewind();
+    ((ByteBuffer)compressedDirectBuf).put(userBuf, userBufOff, 
+                                        compressedDirectBufLen);
+    
+    // Note how much data is being fed to zlib
+    userBufOff += compressedDirectBufLen;
+    userBufLen -= compressedDirectBufLen;
+  }
+
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    if (stream == 0 || b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    setDictionary(stream, b, off, len);
+    needDict = false;
+  }
+
+  public synchronized boolean needsInput() {
+    // Consume remaining uncompressed data?
+    if (uncompressedDirectBuf.remaining() > 0) {
+      return false;
+    }
+    
+    // Check if zlib has consumed all input
+    if (compressedDirectBufLen <= 0) {
+      // Check if we have consumed all user-input
+      if (userBufLen <= 0) {
+        return true;
+      } else {
+        setInputFromSavedData();
+      }
+    }
+    
+    return false;
+  }
+
+  public synchronized boolean needsDictionary() {
+    return needDict;
+  }
+
+  public synchronized boolean finished() {
+    // Check if 'zlib' says it's 'finished' and
+    // all compressed data has been consumed
+    return (finished && uncompressedDirectBuf.remaining() == 0);
+  }
+
+  public synchronized int decompress(byte[] b, int off, int len) 
+  throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    
+    int n = 0;
+    
+    // Check if there is uncompressed data
+    n = uncompressedDirectBuf.remaining();
+    if (n > 0) {
+      n = Math.min(n, len);
+      ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
+      return n;
+    }
+    
+    // Re-initialize the zlib's output direct buffer
+    uncompressedDirectBuf.rewind();
+    uncompressedDirectBuf.limit(directBufferSize);
+
+    // Decompress data
+    n = inflateBytesDirect();
+    uncompressedDirectBuf.limit(n);
+
+    // Get at most 'len' bytes
+    n = Math.min(n, len);
+    ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
+
+    return n;
+  }
+  
+  /**
+   * Returns the total number of uncompressed bytes output so far.
+   *
+   * @return the total (non-negative) number of uncompressed bytes output so far
+   */
+  public synchronized long getBytesWritten() {
+    checkStream();
+    return getBytesWritten(stream);
+  }
+
+  /**
+   * Returns the total number of compressed bytes input so far.
+   *
+   * @return the total (non-negative) number of compressed bytes input so far
+   */
+  public synchronized long getBytesRead() {
+    checkStream();
+    return getBytesRead(stream);
+  }
+
+  public synchronized void reset() {
+    checkStream();
+    reset(stream);
+    finished = false;
+    needDict = false;
+    compressedDirectBufOff = compressedDirectBufLen = 0;
+    uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+    userBufOff = userBufLen = 0;
+  }
+
+  public synchronized void end() {
+    if (stream != 0) {
+      end(stream);
+      stream = 0;
+    }
+  }
+
+  protected void finalize() {
+    end();
+  }
+  
+  private void checkStream() {
+    if (stream == 0)
+      throw new NullPointerException();
+  }
+  
+  private native static void initIDs();
+  private native static long init(int windowBits);
+  private native static void setDictionary(long strm, byte[] b, int off,
+       int len);
+  private native int inflateBytesDirect();
+  private native static long getBytesRead(long strm);
+  private native static long getBytesWritten(long strm);
+  private native static void reset(long strm);
+  private native static void end(long strm);
+}

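For illustration only (not part of this patch): a matching decompression sketch, again assuming only the ZlibDecompressor methods shown above and an arbitrary 1 KB buffer.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;

    public class ZlibDecompressorSketch {
      public static byte[] inflate(byte[] compressed) throws IOException {
        ZlibDecompressor decompressor = new ZlibDecompressor();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];

        decompressor.setInput(compressed, 0, compressed.length);
        while (!decompressor.finished()) {
          int n = decompressor.decompress(buf, 0, buf.length);
          if (n == 0 && decompressor.needsInput()) {
            break;   // input exhausted before the stream finished (truncated data)
          }
          out.write(buf, 0, n);
        }
        decompressor.end();
        return out.toByteArray();
      }
    }
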
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zlib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+
+/**
+ * A collection of factories to create the right 
+ * zlib/gzip compressor/decompressor instances.
+ * 
+ * @author Arun C Murthy
+ */
+public class ZlibFactory {
+  private static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.io.compress.zlib.ZlibFactory");
+
+  private static boolean nativeZlibLoaded = false;
+  
+  static {
+    if (NativeCodeLoader.isNativeCodeLoaded()) {
+      nativeZlibLoaded = ZlibCompressor.isNativeZlibLoaded() &&
+                          ZlibDecompressor.isNativeZlibLoaded();
+      
+      if (nativeZlibLoaded) {
+        LOG.info("Successfully loaded & initialized native-zlib library");
+      } else {
+        LOG.warn("Failed to load/initialize native-zlib library");
+      }
+    }
+  }
+  
+  /**
+   * Check if native-zlib code is loaded and initialized correctly.
+   * 
+   * @return <code>true</code> if native-zlib is loaded and initialized, 
+   *         else <code>false</code>
+   */
+  public static boolean isNativeZlibLoaded() {
+    return nativeZlibLoaded; 
+  }
+  
+  /**
+   * Return the appropriate implementation of the zlib compressor. 
+   * 
+   * @return the appropriate implementation of the zlib compressor.
+   */
+  public static Compressor getZlibCompressor() {
+    return (nativeZlibLoaded) ? 
+        new ZlibCompressor() : new BuiltInZlibDeflater(); 
+  }
+
+  /**
+   * Return the appropriate implementation of the zlib decompressor. 
+   * 
+   * @return the appropriate implementation of the zlib decompressor.
+   */
+  public static Decompressor getZlibDecompressor() {
+    return (nativeZlibLoaded) ? 
+        new ZlibDecompressor() : new BuiltInZlibInflater(); 
+  }
+  
+}

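For illustration only (not part of this patch): a sketch of how a caller would use the factory; it returns the native implementations when libhadoop and native zlib are loaded, and the built-in java.util.zip wrappers otherwise.

    import org.apache.hadoop.io.compress.Compressor;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.hadoop.io.compress.zlib.ZlibFactory;

    public class ZlibFactorySketch {
      public static void main(String[] args) {
        Compressor compressor = ZlibFactory.getZlibCompressor();
        Decompressor decompressor = ZlibFactory.getZlibDecompressor();
        // Report which implementation was selected
        System.out.println("native zlib loaded: " + ZlibFactory.isNativeZlibLoaded());
        System.out.println("compressor:   " + compressor.getClass().getName());
        System.out.println("decompressor: " + decompressor.getClass().getName());
      }
    }
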
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/NativeCodeLoader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/NativeCodeLoader.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/NativeCodeLoader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/NativeCodeLoader.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import org.apache.commons.logging.*;
+
+/**
+ * A helper to load the native hadoop code i.e. libhadoop.so.
+ * This handles the fallback to either the bundled libhadoop-Linux-i386-32.so
+ * or the default Java implementations where appropriate.
+ *  
+ * @author Arun C Murthy
+ */
+public class NativeCodeLoader {
+
+  private static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.util.NativeCodeLoader");
+  
+  private static boolean nativeCodeLoaded = false;
+  
+  static {
+    // Try to load native hadoop library and set fallback flag appropriately
+    LOG.debug("Trying to load the custom-built native-hadoop library...");
+    try {
+      System.loadLibrary("hadoop");
+      LOG.info("Loaded the native-hadoop library");
+      nativeCodeLoaded = true;
+    } catch (Throwable t) {
+      // Ignore failure to load
+      LOG.debug("Failed to load native-hadoop with error: " + t);
+    }
+    
+    if (!nativeCodeLoaded) {
+      LOG.warn("Unable to load native-hadoop library for your platform... " +
+      "using builtin-java classes where applicable");
+    }
+  }
+
+  /**
+   * Check if native-hadoop code is loaded for this platform.
+   * 
+   * @return <code>true</code> if native-hadoop is loaded, 
+   *         else <code>false</code>
+   */
+  public static boolean isNativeCodeLoaded() {
+    return nativeCodeLoaded;
+  }
+
+}

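For illustration only (not part of this patch): the guard pattern the zlib classes above use around their JNI initialization, sketched for any other class that wants to fall back gracefully. The initNative() method here is hypothetical.

    import org.apache.hadoop.util.NativeCodeLoader;

    public class NativeGuardSketch {
      private static boolean nativeLoaded = false;

      static {
        if (NativeCodeLoader.isNativeCodeLoaded()) {
          try {
            initNative();            // hypothetical JNI initializer
            nativeLoaded = true;
          } catch (Throwable t) {
            // ignore and fall back to the built-in Java implementation
          }
        }
      }

      static boolean isNativeLoaded() {
        return nativeLoaded;
      }

      private static native void initNative();
    }
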
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/PlatformName.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/PlatformName.java?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/PlatformName.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/PlatformName.java Tue Nov 14 14:35:22 2006
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+/**
+ * A helper class to determine the platform name of the running java-vm.
+ * 
+ * @author Arun C Murthy
+ */
+public class PlatformName {
+  /**
+   * The complete platform 'name' to identify the platform as 
+   * per the java-vm.
+   */
+  private static final String platformName = System.getProperty("os.name") + "-" + 
+                                      System.getProperty("os.arch") + "-" +
+                                      System.getProperty("sun.arch.data.model");
+  
+  /**
+   * Get the complete platform name as per the java-vm.
+   * @return the complete platform name as per the java-vm.
+   */
+  public static String getPlatformName() {
+    return platformName;
+  }
+  
+  public static void main(String[] args) {
+    System.out.println(platformName);
+  }
+}

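For example (not part of this patch), on a 32-bit Linux/i386 JVM the three properties concatenate to "Linux-i386-32", which is the same string that names the bundled libhadoop-Linux-i386-32.so mentioned in NativeCodeLoader's javadoc.
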
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java?view=diff&rev=475025&r1=475024&r2=475025
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java Tue Nov 14 14:35:22 2006
@@ -23,7 +23,6 @@
 import java.lang.management.*;
 
 import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
 
@@ -35,6 +34,24 @@
     
     private static final Class[] emptyArray = new Class[]{};
 
+    /**
+     * Check and set 'configuration' if necessary.
+     * 
+     * @param theObject object for which to set configuration
+     * @param conf Configuration
+     */
+    public static void setConf(Object theObject, Configuration conf) {
+      if (conf != null) {
+        if (theObject instanceof Configurable) {
+            ((Configurable) theObject).setConf(conf);
+        }
+        if (conf instanceof JobConf && 
+                theObject instanceof JobConfigurable) {
+            ((JobConfigurable)theObject).configure((JobConf) conf);
+        }
+      }
+    }
+
     /** Create an object for the given class and initialize it from conf
      * 
      * @param theClass class of which an object is created
@@ -50,18 +67,10 @@
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        if (conf != null) {
-            if (result instanceof Configurable) {
-                ((Configurable) result).setConf(conf);
-            }
-            if (conf instanceof JobConf && 
-                    result instanceof JobConfigurable) {
-                ((JobConfigurable)result).configure((JobConf) conf);
-            }
-        }
+        setConf(result, conf);
         return result;
     }
-    
+
     static private ThreadMXBean threadBean = 
       ManagementFactory.getThreadMXBean();
     

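For illustration only (not part of this patch): the newly factored-out setConf() lets a caller configure an object it did not create through newInstance(). The MyThing class below is a hypothetical Configurable used only for this sketch.

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ReflectionUtils;

    public class SetConfSketch {
      // Hypothetical Configurable used only for this sketch
      static class MyThing implements Configurable {
        private Configuration conf;
        public void setConf(Configuration conf) { this.conf = conf; }
        public Configuration getConf() { return conf; }
      }

      public static void main(String[] args) {
        MyThing thing = new MyThing();                 // constructed outside newInstance()
        ReflectionUtils.setConf(thing, new Configuration());  // applies setConf() since it is Configurable
      }
    }
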
Added: lucene/hadoop/trunk/src/native/.autom4te.cfg
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/native/.autom4te.cfg?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/native/.autom4te.cfg (added)
+++ lucene/hadoop/trunk/src/native/.autom4te.cfg Tue Nov 14 14:35:22 2006
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# autom4te configuration for hadoop-native library
+#
+
+begin-language: "Autoheader-preselections"
+args: --no-cache 
+end-language: "Autoheader-preselections"
+
+begin-language: "Automake-preselections"
+args: --no-cache 
+end-language: "Automake-preselections"
+
+begin-language: "Autoreconf-preselections"
+args: --no-cache 
+end-language: "Autoreconf-preselections"
+
+begin-language: "Autoconf-without-aclocal-m4"
+args: --no-cache 
+end-language: "Autoconf-without-aclocal-m4"
+
+begin-language: "Autoconf"
+args: --no-cache 
+end-language: "Autoconf"
+

Added: lucene/hadoop/trunk/src/native/AUTHORS
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/native/AUTHORS?view=auto&rev=475025
==============================================================================
--- lucene/hadoop/trunk/src/native/AUTHORS (added)
+++ lucene/hadoop/trunk/src/native/AUTHORS Tue Nov 14 14:35:22 2006
@@ -0,0 +1,3 @@
+Arun C Murthy <ar...@yahoo-inc.com>
+  * Initial version
+