You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/11/04 02:28:53 UTC

svn commit: r711155 [1/2] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/regio...

Author: stack
Date: Mon Nov  3 17:28:53 2008
New Revision: 711155

URL: http://svn.apache.org/viewvc?rev=711155&view=rev
Log:
HBASE-975  Improve MapFile performance for start and end key

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HBaseMapFile.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HalfMapFileReader.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/Reference.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BlockFSInputStream.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHStoreFile.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestSplit.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=711155&r1=711154&r2=711155&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Nov  3 17:28:53 2008
@@ -96,6 +96,7 @@
                instead (requires hadoop 0.19)
    HBASE-81    When a scanner lease times out, throw a more "user friendly" exception
    HBASE-978   Remove BloomFilterDescriptor. It is no longer used.
+   HBASE-975   Improve MapFile performance for start and end key
         
   NEW FEATURES
    HBASE-875   Use MurmurHash instead of JenkinsHash [in bloomfilters]

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=711155&r1=711154&r2=711155&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Mon Nov  3 17:28:53 2008
@@ -225,7 +225,6 @@
    * @return Array of region starting row keys
    * @throws IOException
    */
-  @SuppressWarnings("null")
   public byte[][] getStartKeys() throws IOException {
     final List<byte[]> keyList = new ArrayList<byte[]>();
 
@@ -1154,7 +1153,6 @@
     private byte[][] columns;
     private byte [] startRow;
     protected long scanTime;
-    @SuppressWarnings("hiding")
     private boolean closed = false;
     private HRegionInfo currentRegion = null;
     private ScannerCallable callable = null;

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BlockFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BlockFSInputStream.java?rev=711155&r1=711154&r2=711155&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BlockFSInputStream.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BlockFSInputStream.java Mon Nov  3 17:28:53 2008
@@ -59,7 +59,6 @@
    * @param fileLength
    * @param blockSize the size of each block in bytes.
    */
-  @SuppressWarnings({"unchecked",  "serial"})
   public BlockFSInputStream(InputStream in, long fileLength, int blockSize) {
     this.in = in;
     if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
@@ -157,12 +156,12 @@
     long blockLength = targetBlockEnd - targetBlockStart + 1;
     long offsetIntoBlock = target - targetBlockStart;
 
-    byte[] block = blocks.get(targetBlockStart);
+    byte[] block = blocks.get(Long.valueOf(targetBlockStart));
     if (block == null) {
       block = new byte[blockSize];
       ((PositionedReadable) in).readFully(targetBlockStart, block, 0,
           (int) blockLength);
-      blocks.put(targetBlockStart, block);
+      blocks.put(Long.valueOf(targetBlockStart), block);
     }
     
     this.pos = target;

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java?rev=711155&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java Mon Nov  3 17:28:53 2008
@@ -0,0 +1,231 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.util.Hash;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.onelab.filter.BloomFilter;
+
+/**
+ * On write, all keys are added to a bloom filter.  On read, all keys are
+ * tested first against bloom filter. Keys are HStoreKey.  If passed bloom
+ * filter is null, just passes invocation to parent.
+ */
+public class BloomFilterMapFile extends HBaseMapFile {
+  private static final Log LOG = LogFactory.getLog(BloomFilterMapFile.class);
+  protected static final String BLOOMFILTER_FILE_NAME = "filter";
+
+  public static class Reader extends HBaseReader {
+    private final BloomFilter bloomFilter;
+
+    /**
+     * @param fs
+     * @param dirName
+     * @param conf
+     * @param filter
+     * @param blockCacheEnabled
+     * @param hri
+     * @throws IOException
+     */
+    public Reader(FileSystem fs, String dirName, Configuration conf,
+        final boolean filter, final boolean blockCacheEnabled, 
+        HRegionInfo hri)
+    throws IOException {
+      super(fs, dirName, conf, blockCacheEnabled, hri);
+      if (filter) {
+        this.bloomFilter = loadBloomFilter(fs, dirName);
+      } else {
+        this.bloomFilter = null;
+      }
+    }
+
+    private BloomFilter loadBloomFilter(FileSystem fs, String dirName)
+    throws IOException {
+      Path filterFile = new Path(dirName, BLOOMFILTER_FILE_NAME);
+      if(!fs.exists(filterFile)) {
+        throw new FileNotFoundException("Could not find bloom filter: " +
+            filterFile);
+      }
+      BloomFilter filter = new BloomFilter();
+      FSDataInputStream in = fs.open(filterFile);
+      try {
+        filter.readFields(in);
+      } finally {
+        in.close();
+      }
+      return filter;
+    }
+    
+    @Override
+    public Writable get(WritableComparable key, Writable val)
+    throws IOException {
+      if (bloomFilter == null) {
+        return super.get(key, val);
+      }
+      if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("bloom filter reported that key exists");
+        }
+        return super.get(key, val);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("bloom filter reported that key does not exist");
+      }
+      return null;
+    }
+
+    @Override
+    public WritableComparable getClosest(WritableComparable key,
+        Writable val) throws IOException {
+      if (bloomFilter == null) {
+        return super.getClosest(key, val);
+      }
+      // Note - the key being passed to us is always a HStoreKey
+      if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("bloom filter reported that key exists");
+        }
+        return super.getClosest(key, val);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("bloom filter reported that key does not exist");
+      }
+      return null;
+    }
+    
+    /**
+     * @return size of the bloom filter
+     */
+   public int getBloomFilterSize() {
+      return bloomFilter == null ? 0 : bloomFilter.getVectorSize();
+    }
+  }
+  
+  public static class Writer extends HBaseWriter {
+    private static final double DEFAULT_NUMBER_OF_HASH_FUNCTIONS = 4.0;
+    private final BloomFilter bloomFilter;
+    private final String dirName;
+    private final FileSystem fs;
+    
+    /**
+     * @param conf
+     * @param fs
+     * @param dirName
+     * @param compression
+     * @param filter
+     * @param nrows
+     * @param hri
+     * @throws IOException
+     */
+    @SuppressWarnings("unchecked")
+    public Writer(Configuration conf, FileSystem fs, String dirName,
+      SequenceFile.CompressionType compression, final boolean filter,
+      int nrows, final HRegionInfo hri)
+    throws IOException {
+      super(conf, fs, dirName, compression, hri);
+      this.dirName = dirName;
+      this.fs = fs;
+      if (filter) {
+        /* 
+         * There is no way to automatically determine the vector size and the
+         * number of hash functions to use. In particular, bloom filters are
+         * very sensitive to the number of elements inserted into them. For
+         * HBase, the number of entries depends on the size of the data stored
+         * in the column. Currently the default region size is 256MB, so the
+         * number of entries is approximately 
+         * 256MB / (average value size for column).
+         * 
+         * If m denotes the number of bits in the Bloom filter (vectorSize),
+         * n denotes the number of elements inserted into the Bloom filter and
+         * k represents the number of hash functions used (nbHash), then
+         * according to Broder and Mitzenmacher,
+         * 
+         * ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
+         * 
+         * the probability of false positives is minimized when k is
+         * approximately m/n ln(2).
+         * 
+         * If we fix the number of hash functions and know the number of
+         * entries, then the optimal vector size m = (k * n) / ln(2)
+         */
+        this.bloomFilter = new BloomFilter(
+            (int) Math.ceil(
+                (DEFAULT_NUMBER_OF_HASH_FUNCTIONS * (1.0 * nrows)) /
+                Math.log(2.0)),
+            (int) DEFAULT_NUMBER_OF_HASH_FUNCTIONS,
+            Hash.getHashType(conf)
+        );
+      } else {
+        this.bloomFilter = null;
+      }
+    }
+
+    @Override
+    public void append(WritableComparable key, Writable val)
+    throws IOException {
+      if (bloomFilter != null) {
+        bloomFilter.add(getBloomFilterKey(key));
+      }
+      super.append(key, val);
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+      super.close();
+      if (this.bloomFilter != null) {
+        flushBloomFilter();
+      }
+    }
+    
+    /**
+     * Flushes bloom filter to disk
+     * 
+     * @throws IOException
+     */
+    private void flushBloomFilter() throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("flushing bloom filter for " + this.dirName);
+      }
+      FSDataOutputStream out =
+        fs.create(new Path(dirName, BLOOMFILTER_FILE_NAME));
+      try {
+        bloomFilter.write(out);
+      } finally {
+        out.close();
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("flushed bloom filter for " + this.dirName);
+      }
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HBaseMapFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HBaseMapFile.java?rev=711155&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HBaseMapFile.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HBaseMapFile.java Mon Nov  3 17:28:53 2008
@@ -0,0 +1,219 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.onelab.filter.Key;
+
+/**
+ * Hbase customizations of MapFile.
+ */
+public class HBaseMapFile extends MapFile {
+  private static final Log LOG = LogFactory.getLog(HBaseMapFile.class);
+  public static final Class<? extends Writable>  VALUE_CLASS =
+    ImmutableBytesWritable.class;
+
+  /**
+   * Custom bloom filter key maker.
+   * @param key
+   * @return Key made of bytes of row only.
+   */
+  protected static Key getBloomFilterKey(WritableComparable key) {
+    return new Key(((HStoreKey) key).getRow());
+  }
+
+  /**
+   * A reader capable of reading and caching blocks of the data file.
+   */
+  public static class HBaseReader extends MapFile.Reader {
+    private final boolean blockCacheEnabled;
+    private final HStoreKey firstKey;
+    private final HStoreKey finalKey;
+    private final String dirName;
+
+    /**
+     * @param fs
+     * @param dirName
+     * @param conf
+     * @param hri
+     * @throws IOException
+     */
+    public HBaseReader(FileSystem fs, String dirName, Configuration conf,
+      HRegionInfo hri)
+    throws IOException {
+      this(fs, dirName, conf, false, hri);
+    }
+    
+    /**
+     * @param fs
+     * @param dirName
+     * @param conf
+     * @param blockCacheEnabled
+     * @param hri
+     * @throws IOException
+     */
+    public HBaseReader(FileSystem fs, String dirName, Configuration conf,
+        boolean blockCacheEnabled, HRegionInfo hri)
+    throws IOException {
+      super(fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri), 
+          conf, false); // defer opening streams
+      this.dirName = dirName;
+      this.blockCacheEnabled = blockCacheEnabled;
+      open(fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri), conf);
+      
+      // Force reading of the mapfile index by calling midKey.
+      // Reading the index will bring the index into memory over
+      // here on the client and then close the index file freeing
+      // up socket connection and resources in the datanode. 
+      // Usually, the first access on a MapFile.Reader will load the
+      // index; we force the issue in HStoreFile MapFiles because an
+      // access may not happen for some time; meantime we're
+      // using up datanode resources.  See HADOOP-2341.
+      // Of note, midKey just goes to index.  Does not seek.
+      midKey();
+
+      // Read in the first and last key.  Cache them.  Make sure we are at start
+      // of the file.
+      reset();
+      HStoreKey key = new HStoreKey();
+      super.next(key, new ImmutableBytesWritable());
+      key.setHRegionInfo(hri);
+      this.firstKey = key;
+      // Set us back to start of file.  The call to finalKey restores
+      // whatever the previous position was.
+      reset();
+
+      // Get final key.
+      key = new HStoreKey();
+      super.finalKey(key);
+      key.setHRegionInfo(hri);
+      this.finalKey = key;
+    }
+
+    @Override
+    protected org.apache.hadoop.io.SequenceFile.Reader createDataFileReader(
+        FileSystem fs, Path dataFile, Configuration conf)
+    throws IOException {
+      if (!blockCacheEnabled) {
+        return super.createDataFileReader(fs, dataFile, conf);
+      }
+      final int blockSize = conf.getInt("hbase.hstore.blockCache.blockSize",
+          64 * 1024);
+      return new SequenceFile.Reader(fs, dataFile,  conf) {
+        @Override
+        protected FSDataInputStream openFile(FileSystem fs, Path file,
+            int bufferSize, long length) throws IOException {
+          
+          return new FSDataInputStream(new BlockFSInputStream(
+                  super.openFile(fs, file, bufferSize, length), length,
+                  blockSize));
+        }
+      };
+    }
+    
+    @Override
+    public synchronized void finalKey(final WritableComparable fk)
+    throws IOException {
+      Writables.copyWritable(this.finalKey, fk);
+    }
+    
+    /**
+     * @param hsk
+     * @return True if the file *could* contain <code>hsk</code> and false if
+     * outside the bounds of this file's start and end keys.
+     */
+    public boolean containsKey(final HStoreKey hsk) {
+      return this.firstKey.compareTo(hsk) <= 0 &&
+         this.finalKey.compareTo(hsk) >= 0;
+    }
+    
+    public String toString() {
+      HStoreKey midkey = null;
+      try {
+        midkey = (HStoreKey)midKey();
+      } catch (IOException ioe) {
+        LOG.warn("Failed get of midkey", ioe);
+      }
+      return "dirName=" + this.dirName + ", firstKey=" +
+        this.firstKey.toString() + ", midKey=" + midkey +
+          ", finalKey=" + this.finalKey;
+    }
+    
+    /**
+     * @return First key in this file.  Can be null around construction time.
+     */
+    public HStoreKey getFirstKey() {
+      return this.firstKey;
+    }
+
+    /**
+     * @return Final key in file.  Can be null around construction time.
+     */
+    public HStoreKey getFinalKey() {
+      return this.finalKey;
+    }
+    
+    @Override
+    public synchronized WritableComparable getClosest(WritableComparable key,
+        Writable value, boolean before)
+    throws IOException {
+      if ((!before && ((HStoreKey)key).compareTo(this.finalKey) > 0) ||
+          (before && ((HStoreKey)key).compareTo(this.firstKey) < 0)) {
+        return null;
+      }
+      return  super.getClosest(key, value, before);
+    }
+  }
+  
+  public static class HBaseWriter extends MapFile.Writer {
+    /**
+     * @param conf
+     * @param fs
+     * @param dirName
+     * @param compression
+     * @param hri
+     * @throws IOException
+     */
+    public HBaseWriter(Configuration conf, FileSystem fs, String dirName,
+        SequenceFile.CompressionType compression, final HRegionInfo hri)
+    throws IOException {
+      super(conf, fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri),
+         VALUE_CLASS, compression);
+      // Default for mapfiles is 128.  Makes random reads faster if we
+      // have more keys indexed and we're not 'next'-ing around in the
+      // mapfile.
+      setIndexInterval(conf.getInt("hbase.io.index.interval", 128));
+    }
+  }
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HalfMapFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HalfMapFileReader.java?rev=711155&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HalfMapFileReader.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HalfMapFileReader.java Mon Nov  3 17:28:53 2008
@@ -0,0 +1,228 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A facade for a {@link MapFile.Reader} that serves up either the top or
+ * bottom half of a MapFile where 'bottom' is the first half of the file
+ * containing the keys that sort lowest and 'top' is the second half of the
+ * file with keys that sort greater than those of the bottom half.  The top
+ * includes the split file's midkey, or the key that follows if it does not
+ * exist in the file.
+ * 
+ * <p>This type works in tandem with the {@link Reference} type.  This class
+ * is used reading while Reference is used writing.
+ * 
+ * <p>This file is not splittable.  Calls to {@link #midKey()} return null.
+ */
+public class HalfMapFileReader extends BloomFilterMapFile.Reader {
+  private final boolean top;
+  private final HStoreKey midkey;
+  private boolean firstNextCall = true;
+  private final WritableComparable<HStoreKey> firstKey;
+  private final WritableComparable<HStoreKey> finalKey;
+  
+  public HalfMapFileReader(final FileSystem fs, final String dirName, 
+      final Configuration conf, final Range r,
+      final WritableComparable<HStoreKey> mk,
+      final HRegionInfo hri)
+  throws IOException {
+    this(fs, dirName, conf, r, mk, false, false, hri);
+  }
+  
+  @SuppressWarnings("unchecked")
+  public HalfMapFileReader(final FileSystem fs, final String dirName, 
+      final Configuration conf, final Range r,
+      final WritableComparable<HStoreKey> mk, final boolean filter,
+      final boolean blockCacheEnabled,
+      final HRegionInfo hri)
+  throws IOException {
+    super(fs, dirName, conf, filter, blockCacheEnabled, hri);
+    // This is not the actual midkey for this half-file; it's just the border
+    // around which we split top and bottom.  Have to look in files to find
+    // actual last and first keys for bottom and top halves.  Half-files don't
+    // have an actual midkey themselves. No midkey is how we indicate file is
+    // not splittable.
+    this.midkey = new HStoreKey((HStoreKey)mk);
+    this.midkey.setHRegionInfo(hri);
+    // Is it top or bottom half?
+    this.top = Reference.isTopFileRegion(r);
+    // Firstkey comes back null if passed midkey is lower than first key in file
+    // and it's a bottom half HalfMapFileReader, OR, if midkey is higher than
+    // the final key in the backing file.  In either case, it means this half
+    // file is empty.
+    this.firstKey = this.top?
+      super.getClosest(this.midkey, new ImmutableBytesWritable()):
+      super.getFirstKey().compareTo(this.midkey) > 0?
+        null: super.getFirstKey();
+    this.finalKey = this.top?
+      super.getFinalKey():
+      super.getClosest(new HStoreKey.BeforeThisStoreKey(this.midkey),
+          new ImmutableBytesWritable(), true);
+  }
+  
+  /*
+   * Check key is not bleeding into wrong half of the file.
+   * @param key
+   * @throws IOException
+   */
+  private void checkKey(final WritableComparable<HStoreKey> key)
+  throws IOException {
+    if (top) {
+      if (key.compareTo(midkey) < 0) {
+        throw new IOException("Illegal Access: Key is less than midKey of " +
+        "backing mapfile");
+      }
+    } else if (key.compareTo(midkey) >= 0) {
+      throw new IOException("Illegal Access: Key is greater than or equal " +
+      "to midKey of backing mapfile");
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized void finalKey(WritableComparable key)
+  throws IOException {
+     Writables.copyWritable(this.finalKey, key);
+     return;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized Writable get(WritableComparable key, Writable val)
+  throws IOException {
+    checkKey(key);
+    return super.get(key, val);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized WritableComparable getClosest(WritableComparable key,
+    Writable val)
+  throws IOException {
+    WritableComparable closest = null;
+    if (top) {
+      // If top, the lowest possible key is first key.  Do not have to check
+      // what comes back from super getClosest.  Will return exact match or
+      // greater.
+      closest = (key.compareTo(getFirstKey()) < 0)?
+        getClosest(getFirstKey(), val): super.getClosest(key, val);
+    } else {
+      // We're serving bottom of the file.
+      if (key.compareTo(this.midkey) < 0) {
+        // Check key is within range for bottom.
+        closest = super.getClosest(key, val);
+        // midkey was made against largest store file at time of split. Smaller
+        // store files could have anything in them.  Check return value is
+        // not beyond the midkey (getClosest returns exact match or next
+        // after).
+        if (closest != null && closest.compareTo(this.midkey) >= 0) {
+          // Don't let this value out.
+          closest = null;
+        }
+      }
+      // Else, key is > midkey so let out closest = null.
+    }
+    return closest;
+  }
+
+  @SuppressWarnings({"unused", "unchecked"})
+  @Override
+  public synchronized WritableComparable midKey() throws IOException {
+    // Returns null to indicate file is not splittable.
+    return null;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized boolean next(WritableComparable key, Writable val)
+  throws IOException {
+    if (firstNextCall) {
+      firstNextCall = false;
+      if (isEmpty()) {
+        return false;
+      }
+      // Seek and fill by calling getClosest on first key.
+      if (this.firstKey != null) {
+        Writables.copyWritable(this.firstKey, key);
+        WritableComparable nearest = getClosest(key, val);
+        if (!key.equals(nearest)) {
+          throw new IOException("Keys don't match and should: " +
+              key.toString() + ", " + nearest.toString());
+        }
+      }
+      return true;
+    }
+    boolean result = super.next(key, val);
+    if (!top && key.compareTo(midkey) >= 0) {
+      result = false;
+    }
+    return result;
+  }
+  
+  private boolean isEmpty() {
+    return this.firstKey == null;
+  }
+
+  @Override
+  public synchronized void reset() throws IOException {
+    if (top) {
+      firstNextCall = true;
+      // I don't think this is needed. seek(this.firstKey);
+      return;
+    }
+    super.reset();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized boolean seek(WritableComparable key)
+  throws IOException {
+    checkKey(key);
+    return super.seek(key);
+  }
+  
+  @Override
+  public HStoreKey getFirstKey() {
+    return (HStoreKey)this.firstKey;
+  }
+  
+  @Override
+  public HStoreKey getFinalKey() {
+    return (HStoreKey)this.finalKey;
+  }
+  
+  @Override
+  public String toString() {
+    return super.toString() + ", half=" + (top? "top": "bottom");
+  }
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java?rev=711155&r1=711154&r2=711155&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java Mon Nov  3 17:28:53 2008
@@ -79,7 +79,7 @@
    * Get the data from the BytesWritable.
    * @return The data is only valid between 0 and getSize() - 1.
    */
-  public byte[] get() {
+  public byte [] get() {
     if (this.bytes == null) {
       throw new IllegalStateException("Uninitialiized. Null constructor " +
         "called w/o accompaying readFields invocation");

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/Reference.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/Reference.java?rev=711155&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/Reference.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/Reference.java Mon Nov  3 17:28:53 2008
@@ -0,0 +1,117 @@
+/**
+ * 
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A reference to a part of a store file.  The file referenced usually lives
+ * under a different region.  The part referenced is usually the top or bottom
+ * half of the file.  References are made at region split time.  Being lazy
+ * about copying data between the parent of the split and the split daughters
+ * makes splitting faster.
+ * 
+ * <p>References work with {@link HalfMapFileReader}.  References know how to
+ * write out the reference format in the file system and are what is juggled
+ * when references are mixed in with direct store files.  The
+ * {@link HalfMapFileReader} is used to read the referred-to file.
+ *
+ * <p>References to store files located over in some other region look like
+ * this in the file system
+ * <code>1278437856009925445.hbaserepository,qAReLZD-OyQORZWq_vqR1k==,959247014679548184</code>:
+ * i.e. an id followed by the name of the referenced region.  The data
+ * ('mapfiles') of references are empty. The accompanying <code>info</code> file
+ * contains the <code>midkey</code> that demarcates top and bottom of the
+ * referenced storefile, the id of the remote store we're referencing and
+ * whether we're to serve the top or bottom region of the remote store file.
+ * Note, a region is itself not splittable if it has instances of store file
+ * references.  References are cleaned up by compactions.
+ */
+public class Reference implements Writable {
+  // TODO: see if it makes sense making a ReferenceMapFile whose Writer is this
+  // class and whose Reader is the {@link HalfMapFileReader}.
+
+  private int encodedRegionName;
+  private long fileid;
+  private Range region;
+  private HStoreKey midkey;
+  
+  /** 
+   * For split HStoreFiles, it specifies if the file covers the lower half or
+   * the upper half of the key range
+   */
+  public static enum Range {
+    /** HStoreFile contains upper half of key range */
+    top,
+    /** HStoreFile contains lower half of key range */
+    bottom
+  }
+  
+ public Reference(final int ern, final long fid, final HStoreKey m,
+      final Range fr) {
+    this.encodedRegionName = ern;
+    this.fileid = fid;
+    this.region = fr;
+    this.midkey = m;
+  }
+  
+ public Reference() {
+    this(-1, -1, null, Range.bottom);
+  }
+
+  public long getFileId() {
+    return fileid;
+  }
+
+  public Range getFileRegion() {
+    return region;
+  }
+  
+  public HStoreKey getMidkey() {
+    return midkey;
+  }
+  
+  public int getEncodedRegionName() {
+    return this.encodedRegionName;
+  }
+
+  @Override
+  public String toString() {
+    return encodedRegionName + "/" + fileid + "/" + region;
+  }
+
+  // Make it serializable.
+
+  public void write(DataOutput out) throws IOException {
+    // Write out the encoded region name as a String.  Doing it as a String
+    // keeps a Reference's serialization backward compatible with
+    // pre-HBASE-82 serializations.  The alternative is rewriting all
+    // info files in hbase (Serialized References are written into the
+    // 'info' file that accompanies HBase Store files).
+    out.writeUTF(Integer.toString(encodedRegionName));
+    out.writeLong(fileid);
+    // Write true if we're doing top of the file.
+    out.writeBoolean(isTopFileRegion(region));
+    this.midkey.write(out);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.encodedRegionName = Integer.parseInt(in.readUTF());
+    fileid = in.readLong();
+    boolean tmp = in.readBoolean();
+    // If true, set region to top.
+    region = tmp? Range.top: Range.bottom;
+    midkey = new HStoreKey();
+    midkey.readFields(in);
+  }
+  
+  public static boolean isTopFileRegion(final Range r) {
+    return r.equals(Range.top);
+  }
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=711155&r1=711154&r2=711155&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Nov  3 17:28:53 2008
@@ -1,5 +1,5 @@
   /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2008 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -57,6 +57,7 @@
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -775,15 +776,15 @@
       // Split each store file.
       for(HStoreFile h: hstoreFilesToSplit) {
         // A reference to the bottom half of the hsf store file.
-        HStoreFile.Reference aReference = new HStoreFile.Reference(
+        Reference aReference = new Reference(
             this.regionInfo.getEncodedName(), h.getFileId(),
-            new HStoreKey(midKey, this.regionInfo), HStoreFile.Range.bottom);
+            new HStoreKey(midKey, this.regionInfo), Reference.Range.bottom);
         HStoreFile a = new HStoreFile(this.conf, fs, splits,
             regionAInfo, h.getColFamily(), -1, aReference);
         // Reference to top half of the hsf store file.
-        HStoreFile.Reference bReference = new HStoreFile.Reference(
+        Reference bReference = new Reference(
             this.regionInfo.getEncodedName(), h.getFileId(),
-            new HStoreKey(midKey, this.regionInfo), HStoreFile.Range.top);
+            new HStoreKey(midKey, this.regionInfo), Reference.Range.top);
         HStoreFile b = new HStoreFile(this.conf, fs, splits,
             regionBInfo, h.getColFamily(), -1, bReference);
         h.splitStoreFile(a, b, this.fs);

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=711155&r1=711154&r2=711155&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Mon Nov  3 17:28:53 2008
@@ -49,13 +49,15 @@
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BloomFilterMapFile;
 import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HBaseMapFile;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -191,9 +193,11 @@
     }
     this.desiredMaxFileSize = maxFileSize;
 
-    this.majorCompactionTime = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 86400000);
+    this.majorCompactionTime =
+      conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 86400000);
     if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
-      String strCompactionTime = family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
+      String strCompactionTime =
+        family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
       this.majorCompactionTime = (new Long(strCompactionTime)).longValue();
     }
 
@@ -209,16 +213,10 @@
       this.compression = SequenceFile.CompressionType.NONE;
     }
     
-    Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(),
-        family.getName());
-    if (!fs.exists(mapdir)) {
-      fs.mkdirs(mapdir);
-    }
-    Path infodir = HStoreFile.getInfoDir(basedir, info.getEncodedName(),
-        family.getName());
-    if (!fs.exists(infodir)) {
-      fs.mkdirs(infodir);
-    }
+    Path mapdir = checkdir(HStoreFile.getMapDir(basedir, info.getEncodedName(),
+        family.getName()));
+    Path infodir = checkdir(HStoreFile.getInfoDir(basedir, info.getEncodedName(),
+        family.getName()));
     
     // Go through the 'mapdir' and 'infodir' together, make sure that all 
     // MapFiles are in a reliable state.  Every entry in 'mapdir' must have a 
@@ -232,27 +230,19 @@
         Bytes.toString(this.storeName) + ", max sequence id " + this.maxSeqId);
     }
     
-    try {
-      doReconstructionLog(reconstructionLog, maxSeqId, reporter);
-    } catch (EOFException e) {
-      // Presume we got here because of lack of HADOOP-1700; for now keep going
-      // but this is probably not what we want long term.  If we got here there
-      // has been data-loss
-      LOG.warn("Exception processing reconstruction log " + reconstructionLog +
-        " opening " + this.storeName +
-        " -- continuing.  Probably lack-of-HADOOP-1700 causing DATA LOSS!", e);
-    } catch (IOException e) {
-      // Presume we got here because of some HDFS issue. Don't just keep going.
-      // Fail to open the HStore.  Probably means we'll fail over and over
-      // again until human intervention but alternative has us skipping logs
-      // and losing edits: HBASE-642.
-      LOG.warn("Exception processing reconstruction log " + reconstructionLog +
-        " opening " + this.storeName, e);
-      throw e;
-    }
-
-    // Finally, start up all the map readers! (There could be more than one
-    // since we haven't compacted yet.)
+    // Do reconstruction log.
+    runReconstructionLog(reconstructionLog, this.maxSeqId, reporter);
+    
+    // Finally, start up all the map readers!
+    setupReaders();
+  }
+  
+  /*
+   * Setup the mapfile readers for this store. There could be more than one
+   * since we haven't compacted yet.
+   * @throws IOException
+   */
+  private void setupReaders() throws IOException {
     boolean first = true;
     for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
       MapFile.Reader r = null;
@@ -270,6 +260,18 @@
     }
   }
 
+  /*
+   * @param dir Directory to check; it is created if it doesn't exist.
+   * @return Passed <code>dir</code>.
+   * @throws IOException
+   */
+  private Path checkdir(final Path dir) throws IOException {
+    if (!fs.exists(dir)) {
+      fs.mkdirs(dir);
+    }
+    return dir;
+  }
+
   HColumnDescriptor getFamily() {
     return this.family;
   }
@@ -279,6 +281,36 @@
   }
   
   /*
+   * Run reconstruction log
+   * @param reconstructionLog
+   * @param msid
+   * @param reporter
+   * @throws IOException
+   */
+  private void runReconstructionLog(final Path reconstructionLog,
+    final long msid, final Progressable reporter)
+  throws IOException {
+    try {
+      doReconstructionLog(reconstructionLog, msid, reporter);
+    } catch (EOFException e) {
+      // Presume we got here because of lack of HADOOP-1700; for now keep going
+      // but this is probably not what we want long term.  If we got here there
+      // has been data-loss
+      LOG.warn("Exception processing reconstruction log " + reconstructionLog +
+        " opening " + this.storeName +
+        " -- continuing.  Probably lack-of-HADOOP-1700 causing DATA LOSS!", e);
+    } catch (IOException e) {
+      // Presume we got here because of some HDFS issue. Don't just keep going.
+      // Fail to open the HStore.  Probably means we'll fail over and over
+      // again until human intervention but alternative has us skipping logs
+      // and losing edits: HBASE-642.
+      LOG.warn("Exception processing reconstruction log " + reconstructionLog +
+        " opening " + this.storeName, e);
+      throw e;
+    }
+  }
+
+  /*
    * Read the reconstructionLog to see whether we need to build a brand-new 
    * MapFile out of non-flushed log entries.  
    *
@@ -396,7 +428,7 @@
       long fid = Long.parseLong(m.group(1));
       
       HStoreFile curfile = null;
-      HStoreFile.Reference reference = null;
+      Reference reference = null;
       if (isReference) {
         reference = HStoreFile.readSplitInfo(p, fs);
       }
@@ -437,7 +469,7 @@
           // TODO: This is going to fail if we are to rebuild a file from
           // meta because it won't have right comparator: HBASE-848.
           long count = MapFile.fix(this.fs, mapfile, HStoreKey.class,
-            HStoreFile.HbaseMapFile.VALUE_CLASS, false, this.conf);
+            HBaseMapFile.VALUE_CLASS, false, this.conf);
           if (LOG.isDebugEnabled()) {
             LOG.debug("Fixed index on " + mapfile.toString() + "; had " +
               count + " entries");
@@ -448,21 +480,33 @@
           continue;
         }
       }
-      storeSize += curfile.length();
+      long length = curfile.length();
+      storeSize += length;
       
       // TODO: Confirm referent exists.
       
       // Found map and sympathetic info file.  Add this hstorefile to result.
       if (LOG.isDebugEnabled()) {
         LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" +
-          isReference + ", sequence id=" + storeSeqId);
+          isReference + ", sequence id=" + storeSeqId + ", length=" + length);
       }
       results.put(Long.valueOf(storeSeqId), curfile);
       // Keep list of sympathetic data mapfiles for cleaning info dir in next
       // section.  Make sure path is fully qualified for compare.
       mapfiles.add(this.fs.makeQualified(mapfile));
     }
-    
+    cleanDataFiles(mapfiles, mapdir);
+    return results;
+  }
+  
+  /*
+   * If there is no info file, delete the sympathetic data file.
+   * @param mapfiles List of mapfiles.
+   * @param mapdir Directory to check.
+   * @throws IOException
+   */
+  private void cleanDataFiles(final List<Path> mapfiles, final Path mapdir)
+  throws IOException {
     // List paths by experience returns fully qualified names -- at least when
     // running on a mini hdfs cluster.
     FileStatus [] datfiles = fs.listStatus(mapdir);
@@ -474,7 +518,6 @@
         fs.delete(p, true);
       }
     }
-    return results;
   }
 
   /* 
@@ -732,19 +775,20 @@
    * @param dir
    * @throws IOException
    */
-  private static long getLowestTimestamp(FileSystem fs, Path dir) throws IOException {
-   FileStatus[] stats = fs.listStatus(dir);
-   if (stats == null || stats.length == 0) {
-     return 0l;
-   }
-   long lowTimestamp = Long.MAX_VALUE;   
-   for (int i = 0; i < stats.length; i++) {
-     long timestamp = stats[i].getModificationTime();
-     if (timestamp < lowTimestamp){
-       lowTimestamp = timestamp;
-     }
-   }
-  return lowTimestamp;
+  private static long getLowestTimestamp(FileSystem fs, Path dir)
+  throws IOException {
+    FileStatus[] stats = fs.listStatus(dir);
+    if (stats == null || stats.length == 0) {
+      return 0l;
+    }
+    long lowTimestamp = Long.MAX_VALUE;   
+    for (int i = 0; i < stats.length; i++) {
+      long timestamp = stats[i].getModificationTime();
+      if (timestamp < lowTimestamp){
+        lowTimestamp = timestamp;
+      }
+    }
+    return lowTimestamp;
   }
 
   /**
@@ -768,12 +812,11 @@
    * @return mid key if a split is needed, null otherwise
    * @throws IOException
    */
-  StoreSize compact(boolean majorCompaction) throws IOException {
+  StoreSize compact(final boolean majorCompaction) throws IOException {
     boolean forceSplit = this.info.shouldSplit(false);
     boolean doMajorCompaction = majorCompaction;
     synchronized (compactLock) {
       long maxId = -1;
-      int nrows = -1;
       List<HStoreFile> filesToCompact = null;
       synchronized (storefiles) {
         if (this.storefiles.size() <= 0) {
@@ -791,18 +834,7 @@
       // compacting below. Only check if doMajorCompaction is not true.
       long lastMajorCompaction = 0L;
       if (!doMajorCompaction) {
-        Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(), family.getName());
-        long lowTimestamp = getLowestTimestamp(fs, mapdir);
-        lastMajorCompaction = System.currentTimeMillis() - lowTimestamp;
-        if (lowTimestamp < (System.currentTimeMillis() - majorCompactionTime) &&
-            lowTimestamp > 0l) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Major compaction triggered on store: " + this.storeNameStr +
-              ". Time since last major compaction: " +
-              ((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
-          }
-          doMajorCompaction = true;
-        }
+        doMajorCompaction = isMajorCompaction();
       }
       if (!doMajorCompaction && !hasReferences(filesToCompact) &&
           filesToCompact.size() < compactionThreshold) {
@@ -813,7 +845,7 @@
         return checkSplit(forceSplit);
       }
 
-      // HBASE-745, preparing all store file size for incremental compacting
+      // HBASE-745, preparing all store file sizes for incremental compacting
       // selection.
       int countOfFiles = filesToCompact.size();
       long totalSize = 0;
@@ -866,25 +898,7 @@
        * based access pattern is practically designed to ruin the cache.
        */
       List<MapFile.Reader> rdrs = new ArrayList<MapFile.Reader>();
-      for (HStoreFile file: filesToCompact) {
-        try {
-          HStoreFile.BloomFilterMapFile.Reader reader =
-            file.getReader(fs, false, false);
-          rdrs.add(reader);
-          
-          // Compute the size of the new bloomfilter if needed
-          if (this.family.isBloomfilter()) {
-            nrows += reader.getBloomFilterSize();
-          }
-        } catch (IOException e) {
-          // Add info about which file threw exception. It may not be in the
-          // exception message so output a message here where we know the
-          // culprit.
-          LOG.warn("Failed with " + e.toString() + ": " + file.toString());
-          closeCompactionReaders(rdrs);
-          throw e;
-        }
-      }
+      int nrows = createReaders(rdrs, filesToCompact);
 
       // Step through them, writing to the brand-new MapFile
       HStoreFile compactedOutputFile = new HStoreFile(conf, fs, 
@@ -915,9 +929,75 @@
       }
     }
     return checkSplit(forceSplit);
-  }
+  }  
 
   /*
+   * @return True if we should run a major compaction.
+   */
+  private boolean isMajorCompaction() throws IOException {
+    boolean result = false;
+    Path mapdir = HStoreFile.getMapDir(this.basedir, this.info.getEncodedName(),
+        this.family.getName());
+    long lowTimestamp = getLowestTimestamp(fs, mapdir);
+    if (lowTimestamp < (System.currentTimeMillis() - this.majorCompactionTime) &&
+        lowTimestamp > 0l) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Major compaction triggered on store: " +
+          this.storeNameStr + ". Time since last major compaction: " +
+          ((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
+      }
+      result = true;
+    }
+    return result;
+  }
+  
+  /*
+   * Create readers for the passed in list of HStoreFiles and add them to
+   * the <code>rs</code> list.
+   * @param rs Add Readers here.
+   * @param files List of HSFs to make Readers for.
+   * @return Count of rows for bloom filter sizing.  Returns -1 if no bloom
+   * filter wanted.
+   */
+  private int createReaders(final List<MapFile.Reader> rs,
+    final List<HStoreFile> files)
+  throws IOException {
+    /* We create a new list of MapFile.Reader objects so we don't screw up
+     * the caching associated with the currently-loaded ones. Our iteration-
+     * based access pattern is practically designed to ruin the cache.
+     */
+    int nrows = -1;
+    for (HStoreFile file: files) {
+      try {
+        // TODO: Readers are opened without block-cache enabled.  Means we don't
+        // get the prefetch that makes the read faster.  But we don't want to
+        // enable block-cache for these readers that are about to be closed.
+        // The compaction of soon-to-be closed readers will probably force out
+        // blocks that may be needed servicing real-time requests whereas
+        // compaction runs in background.  TODO: We know we're going to read
+        // this file straight through.  Leverage this fact.  Use a big buffer
+        // client side to speed things up or read it all up into memory one file
+        // at a time or pull local and memory-map the file but leave the writer
+        // up in hdfs?
+        BloomFilterMapFile.Reader reader = file.getReader(fs, false, false);
+        rs.add(reader);
+        // Compute the size of the new bloomfilter if needed
+        if (this.family.isBloomfilter()) {
+          nrows += reader.getBloomFilterSize();
+        }
+      } catch (IOException e) {
+        // Add info about which file threw exception. It may not be in the
+        // exception message so output a message here where we know the
+        // culprit.
+        LOG.warn("Failed with " + e.toString() + ": " + file.toString());
+        closeCompactionReaders(rs);
+        throw e;
+      }
+    }
+    return nrows;
+  }
+  
+  /*
    * Compact a list of MapFile.Readers into MapFile.Writer.
    * 
    * We work by iterating through the readers in parallel. We always increment
@@ -1166,7 +1246,6 @@
         // time
         getFullFromMapFile(map, key, columns, deletes, results);
       }
-      
     } finally {
       this.lock.readLock().unlock();
     }
@@ -1203,7 +1282,7 @@
             // recent delete timestamp, record it for later
             if (!deletes.containsKey(readcol) 
               || deletes.get(readcol).longValue() < readkey.getTimestamp()) {
-              deletes.put(readcol, readkey.getTimestamp());              
+              deletes.put(readcol, Long.valueOf(readkey.getTimestamp()));              
             }
           } else if (!(deletes.containsKey(readcol) 
             && deletes.get(readcol).longValue() >= readkey.getTimestamp()) ) {
@@ -1377,7 +1456,8 @@
    * @return Matching keys.
    * @throws IOException
    */
-  public List<HStoreKey> getKeys(final HStoreKey origin, final int versions, final long now)
+  public List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
+    final long now)
   throws IOException {
     // This code below is very close to the body of the get method.  Any 
     // changes in the flow below should also probably be done in get.  TODO:
@@ -1788,6 +1868,9 @@
    * @throws IOException
    */
   private HStoreKey getFinalKey(final MapFile.Reader mf) throws IOException {
+    if (mf instanceof HBaseMapFile.HBaseReader) {
+      return ((HBaseMapFile.HBaseReader)mf).getFinalKey();
+    }
     HStoreKey finalKey = new HStoreKey(); 
     mf.finalKey(finalKey);
     finalKey.setHRegionInfo(this.info);
@@ -1839,10 +1922,10 @@
   
   /**
    * Determines if HStore can be split
-   * 
+   * @param force Whether to force a split or not.
    * @return a StoreSize if store can be split, null otherwise
    */
-  StoreSize checkSplit(boolean force) {
+  StoreSize checkSplit(final boolean force) {
     if (this.storefiles.size() <= 0) {
       return null;
     }
@@ -1859,39 +1942,35 @@
       synchronized (storefiles) {
         for (Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
           HStoreFile curHSF = e.getValue();
+          if (splitable) {
+            splitable = !curHSF.isReference();
+            if (!splitable) {
+              // RETURN IN MIDDLE OF FUNCTION!!! If not splitable, just return.
+              return null;
+            }
+          }
           long size = curHSF.length();
           if (size > maxSize) {
             // This is the largest one so far
             maxSize = size;
             mapIndex = e.getKey();
           }
-          if (splitable) {
-            splitable = !curHSF.isReference();
-          }
         }
       }
-      if (!splitable) {
-        return null;
-      }
-      MapFile.Reader r = this.readers.get(mapIndex);
 
-      // seek back to the beginning of mapfile
-      r.reset();
-
-      // get the first and last keys
-      HStoreKey firstKey = new HStoreKey();
-      HStoreKey lastKey = new HStoreKey();
-      Writable value = new ImmutableBytesWritable();
-      r.next(firstKey, value);
-      r.finalKey(lastKey);
+      // Cast to HBaseReader.
+      HBaseMapFile.HBaseReader r =
+        (HBaseMapFile.HBaseReader)this.readers.get(mapIndex);
 
       // get the midkey
       HStoreKey mk = (HStoreKey)r.midKey();
       if (mk != null) {
         // if the midkey is the same as the first and last keys, then we cannot
         // (ever) split this region. 
-        if (HStoreKey.equalsTwoRowKeys(info, mk.getRow(), firstKey.getRow()) && 
-            HStoreKey.equalsTwoRowKeys(info, mk.getRow(), lastKey.getRow())) {
+        if (HStoreKey.equalsTwoRowKeys(info, mk.getRow(),
+              r.getFirstKey().getRow()) && 
+            HStoreKey.equalsTwoRowKeys(info, mk.getRow(),
+              r.getFinalKey().getRow())) {
           return null;
         }
         return new StoreSize(maxSize, mk.getRow());
@@ -1967,6 +2046,9 @@
     return this.storefiles.size();
   }
   
+  /*
+   * Data structure that holds size and key.
+   */
   class StoreSize {
     private final long size;
     private final byte[] key;
@@ -1988,4 +2070,4 @@
   HRegionInfo getHRegionInfo() {
     return this.info;
   }
-}
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java?rev=711155&r1=711154&r2=711155&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java Mon Nov  3 17:28:53 2008
@@ -19,19 +19,13 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,18 +33,12 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.BlockFSInputStream;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.BloomFilterMapFile;
+import org.apache.hadoop.hbase.io.HalfMapFileReader;
+import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Hash;
-import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.onelab.filter.BloomFilter;
-import org.onelab.filter.Key;
 
 /**
  * A HStore data file.  HStores usually have one or more of these files.  They
@@ -99,17 +87,6 @@
   static final String HSTORE_INFO_DIR = "info";
   static final String HSTORE_FILTER_DIR = "filter";
   
-  /** 
-   * For split HStoreFiles, specifies if the file covers the lower half or
-   * the upper half of the key range
-   */
-  public static enum Range {
-    /** HStoreFile contains upper half of key range */
-    top,
-    /** HStoreFile contains lower half of key range */
-    bottom
-  }
-  
   private final static Random rand = new Random();
 
   private final Path basedir;
@@ -154,8 +131,7 @@
     this.fileId = id;
     
     // If a reference, construction does not write the pointer files.  Thats
-    // done by invocations of writeReferenceFiles(hsf, fs).  Happens at fast
-    // split time.
+    // done by invocations of writeReferenceFiles(hsf, fs). Happens at split.
     this.reference = ref;
   }
 
@@ -287,11 +263,11 @@
   /**
    * @see #writeSplitInfo(FileSystem fs)
    */
-  static HStoreFile.Reference readSplitInfo(final Path p, final FileSystem fs)
+  static Reference readSplitInfo(final Path p, final FileSystem fs)
   throws IOException {
     FSDataInputStream in = fs.open(p);
     try {
-      HStoreFile.Reference r = new HStoreFile.Reference();
+      Reference r = new Reference();
       r.readFields(in);
       return r;
     } finally {
@@ -319,7 +295,8 @@
   long loadInfo(FileSystem fs) throws IOException {
     Path p = null;
     if (isReference()) {
-      p = getInfoFilePath(reference.getEncodedRegionName(), reference.getFileId());
+      p = getInfoFilePath(reference.getEncodedRegionName(),
+        this.reference.getFileId());
     } else {
       p = getInfoFilePath();
     }
@@ -405,7 +382,7 @@
       final boolean bloomFilter, final boolean blockCacheEnabled)
   throws IOException {
     if (isReference()) {
-      return new HStoreFile.HalfMapFileReader(fs,
+      return new HalfMapFileReader(fs,
           getMapFilePath(reference).toString(), conf, 
           reference.getFileRegion(), reference.getMidkey(), bloomFilter,
           blockCacheEnabled, this.hri);
@@ -453,10 +430,6 @@
     return encodedRegionName + "/" + Bytes.toString(colFamily) + "/" + fileId +
       (isReference()? "-" + reference.toString(): "");
   }
-  
-  static boolean isTopFileRegion(final Range r) {
-    return r.equals(Range.top);
-  }
 
   private static String createHStoreFilename(final long fid,
       final int encodedRegionName) {
@@ -510,525 +483,4 @@
     return new Path(base, new Path(Integer.toString(encodedRegionName),
         new Path(Bytes.toString(f), subdir)));
   }
-
-  /*
-   * Data structure to hold reference to a store file over in another region.
-   */
-  static class Reference implements Writable {
-    private int encodedRegionName;
-    private long fileid;
-    private Range region;
-    private HStoreKey midkey;
-    
-    Reference(final int ern, final long fid, final HStoreKey m,
-        final Range fr) {
-      this.encodedRegionName = ern;
-      this.fileid = fid;
-      this.region = fr;
-      this.midkey = m;
-    }
-    
-    Reference() {
-      this(-1, -1, null, Range.bottom);
-    }
-
-    long getFileId() {
-      return fileid;
-    }
-
-    Range getFileRegion() {
-      return region;
-    }
-    
-    HStoreKey getMidkey() {
-      return midkey;
-    }
-    
-    int getEncodedRegionName() {
-      return this.encodedRegionName;
-    }
-   
-    @Override
-    public String toString() {
-      return encodedRegionName + "/" + fileid + "/" + region;
-    }
-
-    // Make it serializable.
-
-    public void write(DataOutput out) throws IOException {
-      // Write out the encoded region name as a String.  Doing it as a String
-      // keeps a Reference's serialziation backword compatible with
-      // pre-HBASE-82 serializations.  ALternative is rewriting all
-      // info files in hbase (Serialized References are written into the
-      // 'info' file that accompanies HBase Store files).
-      out.writeUTF(Integer.toString(encodedRegionName));
-      out.writeLong(fileid);
-      // Write true if we're doing top of the file.
-      out.writeBoolean(isTopFileRegion(region));
-      midkey.write(out);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      this.encodedRegionName = Integer.parseInt(in.readUTF());
-      fileid = in.readLong();
-      boolean tmp = in.readBoolean();
-      // If true, set region to top.
-      region = tmp? Range.top: Range.bottom;
-      midkey = new HStoreKey();
-      midkey.readFields(in);
-    }
-  }
-
-  /**
-   * Hbase customizations of MapFile.
-   */
-  static class HbaseMapFile extends MapFile {
-    static final Class<? extends Writable>  VALUE_CLASS =
-      ImmutableBytesWritable.class;
-
-    /**
-     * Custom bloom filter key maker.
-     * @param key
-     * @return Key made of bytes of row only.
-     */
-    protected static Key getBloomFilterKey(WritableComparable key) {
-      return new Key(((HStoreKey) key).getRow());
-    }
-
-    /**
-     * A reader capable of reading and caching blocks of the data file.
-     */
-    static class HbaseReader extends MapFile.Reader {
-
-      private final boolean blockCacheEnabled;
-
-      /**
-       * @param fs
-       * @param dirName
-       * @param conf
-       * @param hri
-       * @throws IOException
-       */
-      public HbaseReader(FileSystem fs, String dirName, Configuration conf,
-        HRegionInfo hri)
-      throws IOException {
-        this(fs, dirName, conf, false, hri);
-      }
-      
-      /**
-       * @param fs
-       * @param dirName
-       * @param conf
-       * @param blockCacheEnabled
-       * @param hri
-       * @throws IOException
-       */
-      public HbaseReader(FileSystem fs, String dirName, Configuration conf,
-          boolean blockCacheEnabled, HRegionInfo hri)
-      throws IOException {
-        super(fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri), 
-            conf, false); // defer opening streams
-        this.blockCacheEnabled = blockCacheEnabled;
-        open(fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri), conf);
-        
-        // Force reading of the mapfile index by calling midKey.
-        // Reading the index will bring the index into memory over
-        // here on the client and then close the index file freeing
-        // up socket connection and resources in the datanode. 
-        // Usually, the first access on a MapFile.Reader will load the
-        // index force the issue in HStoreFile MapFiles because an
-        // access may not happen for some time; meantime we're
-        // using up datanode resources.  See HADOOP-2341.
-        midKey();
-      }
-
-      @Override
-      protected org.apache.hadoop.io.SequenceFile.Reader createDataFileReader(
-          FileSystem fs, Path dataFile, Configuration conf)
-      throws IOException {
-        if (!blockCacheEnabled) {
-          return super.createDataFileReader(fs, dataFile, conf);
-        }
-        LOG.info("Block Cache enabled");
-        final int blockSize = conf.getInt("hbase.hstore.blockCache.blockSize",
-            64 * 1024);
-        return new SequenceFile.Reader(fs, dataFile,  conf) {
-          @Override
-          protected FSDataInputStream openFile(FileSystem fs, Path file,
-              int bufferSize, long length) throws IOException {
-            
-            return new FSDataInputStream(new BlockFSInputStream(
-                    super.openFile(fs, file, bufferSize, length), length,
-                    blockSize));
-          }
-        };
-      }
-    }
-    
-    static class HbaseWriter extends MapFile.Writer {
-      /**
-       * @param conf
-       * @param fs
-       * @param dirName
-       * @param compression
-       * @param hri
-       * @throws IOException
-       */
-      public HbaseWriter(Configuration conf, FileSystem fs, String dirName,
-          SequenceFile.CompressionType compression, final HRegionInfo hri)
-      throws IOException {
-        super(conf, fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri),
-           VALUE_CLASS, compression);
-        // Default for mapfiles is 128.  Makes random reads faster if we
-        // have more keys indexed and we're not 'next'-ing around in the
-        // mapfile.
-        setIndexInterval(conf.getInt("hbase.io.index.interval", 128));
-      }
-    }
-  }
-  
-  /**
-   * On write, all keys are added to a bloom filter.  On read, all keys are
-   * tested first against bloom filter. Keys are HStoreKey.  If passed bloom
-   * filter is null, just passes invocation to parent.
-   */
-  static class BloomFilterMapFile extends HbaseMapFile {
-    protected static final String BLOOMFILTER_FILE_NAME = "filter";
-
-    static class Reader extends HbaseReader {
-      private final BloomFilter bloomFilter;
-
-      /**
-       * @param fs
-       * @param dirName
-       * @param conf
-       * @param filter
-       * @param blockCacheEnabled
-       * @param hri
-       * @throws IOException
-       */
-      public Reader(FileSystem fs, String dirName, Configuration conf,
-          final boolean filter, final boolean blockCacheEnabled, 
-          HRegionInfo hri)
-      throws IOException {
-        super(fs, dirName, conf, blockCacheEnabled, hri);
-        if (filter) {
-          this.bloomFilter = loadBloomFilter(fs, dirName);
-        } else {
-          this.bloomFilter = null;
-        }
-      }
-
-      private BloomFilter loadBloomFilter(FileSystem fs, String dirName)
-      throws IOException {
-        Path filterFile = new Path(dirName, BLOOMFILTER_FILE_NAME);
-        if(!fs.exists(filterFile)) {
-          throw new FileNotFoundException("Could not find bloom filter: " +
-              filterFile);
-        }
-        BloomFilter filter = new BloomFilter();
-        FSDataInputStream in = fs.open(filterFile);
-        try {
-          filter.readFields(in);
-        } finally {
-          in.close();
-        }
-        return filter;
-      }
-      
-      @Override
-      public Writable get(WritableComparable key, Writable val)
-      throws IOException {
-        if (bloomFilter == null) {
-          return super.get(key, val);
-        }
-        if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("bloom filter reported that key exists");
-          }
-          return super.get(key, val);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("bloom filter reported that key does not exist");
-        }
-        return null;
-      }
-
-      @Override
-      public WritableComparable getClosest(WritableComparable key,
-          Writable val) throws IOException {
-        if (bloomFilter == null) {
-          return super.getClosest(key, val);
-        }
-        // Note - the key being passed to us is always a HStoreKey
-        if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("bloom filter reported that key exists");
-          }
-          return super.getClosest(key, val);
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("bloom filter reported that key does not exist");
-        }
-        return null;
-      }
-      
-      /* @return size of the bloom filter */
-      int getBloomFilterSize() {
-        return bloomFilter == null ? 0 : bloomFilter.getVectorSize();
-      }
-    }
-    
-    static class Writer extends HbaseWriter {
-      private static final double DEFAULT_NUMBER_OF_HASH_FUNCTIONS = 4.0;
-      private final BloomFilter bloomFilter;
-      private final String dirName;
-      private final FileSystem fs;
-      
-      /**
-       * @param conf
-       * @param fs
-       * @param dirName
-       * @param compression
-       * @param filter
-       * @param nrows
-       * @param hri
-       * @throws IOException
-       */
-      @SuppressWarnings("unchecked")
-      public Writer(Configuration conf, FileSystem fs, String dirName,
-        SequenceFile.CompressionType compression, final boolean filter,
-        int nrows, final HRegionInfo hri)
-      throws IOException {
-        super(conf, fs, dirName, compression, hri);
-        this.dirName = dirName;
-        this.fs = fs;
-        if (filter) {
-          /* 
-           * There is no way to automatically determine the vector size and the
-           * number of hash functions to use. In particular, bloom filters are
-           * very sensitive to the number of elements inserted into them. For
-           * HBase, the number of entries depends on the size of the data stored
-           * in the column. Currently the default region size is 256MB, so the
-           * number of entries is approximately 
-           * 256MB / (average value size for column).
-           * 
-           * If m denotes the number of bits in the Bloom filter (vectorSize),
-           * n denotes the number of elements inserted into the Bloom filter and
-           * k represents the number of hash functions used (nbHash), then
-           * according to Broder and Mitzenmacher,
-           * 
-           * ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
-           * 
-           * the probability of false positives is minimized when k is
-           * approximately m/n ln(2).
-           * 
-           * If we fix the number of hash functions and know the number of
-           * entries, then the optimal vector size m = (k * n) / ln(2)
-           */
-          this.bloomFilter = new BloomFilter(
-              (int) Math.ceil(
-                  (DEFAULT_NUMBER_OF_HASH_FUNCTIONS * (1.0 * nrows)) /
-                  Math.log(2.0)),
-              (int) DEFAULT_NUMBER_OF_HASH_FUNCTIONS,
-              Hash.getHashType(conf)
-          );
-        } else {
-          this.bloomFilter = null;
-        }
-      }
-
-      @Override
-      public void append(WritableComparable key, Writable val)
-      throws IOException {
-        if (bloomFilter != null) {
-          bloomFilter.add(getBloomFilterKey(key));
-        }
-        super.append(key, val);
-      }
-
-      @Override
-      public synchronized void close() throws IOException {
-        super.close();
-        if (this.bloomFilter != null) {
-          flushBloomFilter();
-        }
-      }
-      
-      /**
-       * Flushes bloom filter to disk
-       * 
-       * @throws IOException
-       */
-      private void flushBloomFilter() throws IOException {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("flushing bloom filter for " + this.dirName);
-        }
-        FSDataOutputStream out =
-          fs.create(new Path(dirName, BLOOMFILTER_FILE_NAME));
-        try {
-          bloomFilter.write(out);
-        } finally {
-          out.close();
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("flushed bloom filter for " + this.dirName);
-        }
-      }
-    }
-  }
-  
-  /**
-   * A facade for a {@link MapFile.Reader} that serves up either the top or
-   * bottom half of a MapFile (where 'bottom' is the first half of the file
-   * containing the keys that sort lowest and 'top' is the second half of the
-   * file with keys that sort greater than those of the bottom half).
-   * Subclasses BloomFilterMapFile.Reader in case 
-   * 
-   * <p>This file is not splitable.  Calls to {@link #midKey()} return null.
-   */
-  static class HalfMapFileReader extends BloomFilterMapFile.Reader {
-    private final boolean top;
-    private final WritableComparable midkey;
-    private boolean firstNextCall = true;
-    
-    HalfMapFileReader(final FileSystem fs, final String dirName, 
-        final Configuration conf, final Range r,
-        final WritableComparable midKey,
-        final HRegionInfo hri)
-    throws IOException {
-      this(fs, dirName, conf, r, midKey, false, false, hri);
-    }
-    
-    HalfMapFileReader(final FileSystem fs, final String dirName, 
-        final Configuration conf, final Range r,
-        final WritableComparable midKey, final boolean filter,
-        final boolean blockCacheEnabled,
-        final HRegionInfo hri)
-    throws IOException {
-      super(fs, dirName, conf, filter, blockCacheEnabled, hri);
-      top = isTopFileRegion(r);
-      midkey = midKey;
-    }
-    
-    @SuppressWarnings("unchecked")
-    private void checkKey(final WritableComparable key)
-    throws IOException {
-      if (top) {
-        if (key.compareTo(midkey) < 0) {
-          throw new IOException("Illegal Access: Key is less than midKey of " +
-          "backing mapfile");
-        }
-      } else if (key.compareTo(midkey) >= 0) {
-        throw new IOException("Illegal Access: Key is greater than or equal " +
-        "to midKey of backing mapfile");
-      }
-    }
-
-    @Override
-    public synchronized void finalKey(WritableComparable key)
-    throws IOException {
-      if (top) {
-        super.finalKey(key); 
-      } else {
-        reset();
-        Writable value = new ImmutableBytesWritable();
-        WritableComparable k = super.getClosest(midkey, value, true);
-        ByteArrayOutputStream byteout = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(byteout);
-        k.write(out);
-        ByteArrayInputStream bytein =
-          new ByteArrayInputStream(byteout.toByteArray());
-        DataInputStream in = new DataInputStream(bytein);
-        key.readFields(in);
-      }
-    }
-
-    @Override
-    public synchronized Writable get(WritableComparable key, Writable val)
-        throws IOException {
-      checkKey(key);
-      return super.get(key, val);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public synchronized WritableComparable getClosest(WritableComparable key,
-      Writable val)
-    throws IOException {
-      WritableComparable closest = null;
-      if (top) {
-        // If top, the lowest possible key is midkey.  Do not have to check
-        // what comes back from super getClosest.  Will return exact match or
-        // greater.
-        closest = (key.compareTo(this.midkey) < 0)?
-          this.midkey: super.getClosest(key, val);
-      } else {
-        // We're serving bottom of the file.
-        if (key.compareTo(this.midkey) < 0) {
-          // Check key is within range for bottom.
-          closest = super.getClosest(key, val);
-          // midkey was made against largest store file at time of split. Smaller
-          // store files could have anything in them.  Check return value is
-          // not beyond the midkey (getClosest returns exact match or next
-          // after).
-          if (closest != null && closest.compareTo(this.midkey) >= 0) {
-            // Don't let this value out.
-            closest = null;
-          }
-        }
-        // Else, key is > midkey so let out closest = null.
-      }
-      return closest;
-    }
-
-    @SuppressWarnings("unused")
-    @Override
-    public synchronized WritableComparable midKey() throws IOException {
-      // Returns null to indicate file is not splitable.
-      return null;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public synchronized boolean next(WritableComparable key, Writable val)
-    throws IOException {
-      if (firstNextCall) {
-        firstNextCall = false;
-        if (this.top) {
-          // Seek to midkey.  Midkey may not exist in this file.  That should be
-          // fine.  Then we'll either be positioned at end or start of file.
-          WritableComparable nearest = getClosest(midkey, val);
-          // Now copy the mid key into the passed key.
-          if (nearest != null) {
-            Writables.copyWritable(nearest, key);
-            return true;
-          }
-          return false;
-        }
-      }
-      boolean result = super.next(key, val);
-      if (!top && key.compareTo(midkey) >= 0) {
-        result = false;
-      }
-      return result;
-    }
-
-    @Override
-    public synchronized void reset() throws IOException {
-      if (top) {
-        firstNextCall = true;
-        seek(midkey);
-        return;
-      }
-      super.reset();
-    }
-
-    @Override
-    public synchronized boolean seek(WritableComparable key)
-    throws IOException {
-      checkKey(key);
-      return super.seek(key);
-    }
-  }
-}
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java?rev=711155&r1=711154&r2=711155&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java Mon Nov  3 17:28:53 2008
@@ -21,24 +21,22 @@
 
 import java.io.PrintWriter;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
 
 /**
  * Abstract base class for HBase cluster junit tests.  Spins up an hbase
  * cluster in setup and tears it down again in tearDown.
  */
 public abstract class HBaseClusterTestCase extends HBaseTestCase {
-  private static final Log LOG =
-    LogFactory.getLog(HBaseClusterTestCase.class.getName());
-  
+  private static final Log LOG = LogFactory.getLog(HBaseClusterTestCase.class);
   protected MiniHBaseCluster cluster;
   protected MiniDFSCluster dfsCluster;
   protected int regionServers;
@@ -85,7 +83,6 @@
   /**
    * Actually start the MiniHBase instance.
    */
-  @SuppressWarnings("unused")
   protected void hBaseClusterSetup() throws Exception {
     // start the mini cluster
     this.cluster = new MiniHBaseCluster(conf, regionServers);
@@ -93,7 +90,7 @@
     // We need to sleep because we cannot open a HTable when the cluster
     // is not ready
     Thread.sleep(5000);
-    HTable meta = new HTable(conf, ".META.");
+    new HTable(conf, HConstants.META_TABLE_NAME);
   }
   
   /**
@@ -112,12 +109,12 @@
 
         // mangle the conf so that the fs parameter points to the minidfs we
         // just started up
-        FileSystem fs = dfsCluster.getFileSystem();
-        conf.set("fs.default.name", fs.getUri().toString());      
-        Path parentdir = fs.getHomeDirectory();
+        FileSystem filesystem = dfsCluster.getFileSystem();
+        conf.set("fs.default.name", filesystem.getUri().toString());      
+        Path parentdir = filesystem.getHomeDirectory();
         conf.set(HConstants.HBASE_DIR, parentdir.toString());
-        fs.mkdirs(parentdir);
-        FSUtils.setVersion(fs, parentdir);
+        filesystem.mkdirs(parentdir);
+        FSUtils.setVersion(filesystem, parentdir);
       }
 
       // do the super setup now. if we had done it first, then we would have