You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 07:01:07 UTC

svn commit: r990018 [4/10] - in /hbase/branches/0.90_master_rewrite: ./ bin/ bin/replication/ src/assembly/ src/docbkx/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/filter/ s...

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Fri Aug 27 05:01:02 2010
@@ -20,9 +20,12 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.util.NavigableSet;
@@ -30,13 +33,36 @@ import java.util.NavigableSet;
 /**
  * A query matcher that is specifically designed for the scan case.
  */
-public class ScanQueryMatcher extends QueryMatcher {
+public class ScanQueryMatcher {
   // Optimization so we can skip lots of compares when we decide to skip
   // to the next row.
   private boolean stickyNextRow;
+  private byte[] stopRow;
+
+  protected TimeRange tr;
+
+  protected Filter filter;
+
+  /** Keeps track of deletes */
+  protected DeleteTracker deletes;
+
+  /** Keeps track of columns and versions */
+  protected ColumnTracker columns;
+
+  /** Key to seek to in memstore and StoreFiles */
+  protected KeyValue startKey;
+
+  /** Oldest allowed version stamp for TTL enforcement */
+  protected long oldestStamp;
+
+  /** Row comparator for the region this query is for */
+  KeyValue.KeyComparator rowComparator;
+
+  /** Row the query is on */
+  protected byte [] row;
 
   /**
-   * Constructs a QueryMatcher for a Scan.
+   * Constructs a ScanQueryMatcher for a Scan.
    * @param scan
    * @param family
    * @param columns
@@ -50,6 +76,7 @@ public class ScanQueryMatcher extends Qu
     this.oldestStamp = System.currentTimeMillis() - ttl;
     this.rowComparator = rowComparator;
     this.deletes =  new ScanDeleteTracker();
+    this.stopRow = scan.getStopRow();
     this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
     this.filter = scan.getFilter();
 
@@ -98,7 +125,7 @@ public class ScanQueryMatcher extends Qu
       // could optimize this, if necessary?
       // Could also be called SEEK_TO_CURRENT_ROW, but this
       // should be rare/never happens.
-      return MatchCode.SKIP;
+      return MatchCode.SEEK_NEXT_ROW;
     }
 
     // optimize case.
@@ -123,7 +150,7 @@ public class ScanQueryMatcher extends Qu
     long timestamp = kv.getTimestamp();
     if (isExpired(timestamp)) {
       // done, the rest of this column will also be expired as well.
-      return MatchCode.SEEK_NEXT_COL;
+      return getNextRowOrNextColumn(bytes, offset, qualLength);
     }
 
     byte type = kv.getType();
@@ -140,17 +167,39 @@ public class ScanQueryMatcher extends Qu
       return MatchCode.SKIP;
     }
 
-    if (!tr.withinTimeRange(timestamp)) {
+    if (!this.deletes.isEmpty() &&
+        deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
       return MatchCode.SKIP;
     }
 
-    if (!this.deletes.isEmpty() &&
-        deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
+    int timestampComparison = tr.compare(timestamp);
+    if (timestampComparison >= 1) {
       return MatchCode.SKIP;
+    } else if (timestampComparison <= -1) {
+      return getNextRowOrNextColumn(bytes, offset, qualLength);
     }
 
-    MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
+    /**
+     * Filters should be checked before checking column trackers. If we do
+     * otherwise, as was previously being done, ColumnTracker may increment its
+     * counter for even that KV which may be discarded later on by Filter. This
+     * would lead to incorrect results in certain cases.
+     */
+    if (filter != null) {
+      ReturnCode filterResponse = filter.filterKeyValue(kv);
+      if (filterResponse == ReturnCode.SKIP) {
+        return MatchCode.SKIP;
+      } else if (filterResponse == ReturnCode.NEXT_COL) {
+        return getNextRowOrNextColumn(bytes, offset, qualLength);
+      } else if (filterResponse == ReturnCode.NEXT_ROW) {
+        stickyNextRow = true;
+        return MatchCode.SEEK_NEXT_ROW;
+      } else if (filterResponse == ReturnCode.SEEK_NEXT_USING_HINT) {
+        return MatchCode.SEEK_NEXT_USING_HINT;
+      }
+    }
 
+    MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
     // if SKIP -> SEEK_NEXT_COL
     // if (NEXT,DONE) -> SEEK_NEXT_ROW
     // if (INCLUDE) -> INCLUDE
@@ -161,37 +210,127 @@ public class ScanQueryMatcher extends Qu
       return MatchCode.SEEK_NEXT_ROW;
     }
 
-    // else INCLUDE
-    // if (colChecker == MatchCode.INCLUDE)
-    // give the filter a chance to run.
-    if (filter == null)
-      return MatchCode.INCLUDE;
-
-    ReturnCode filterResponse = filter.filterKeyValue(kv);
-    if (filterResponse == ReturnCode.INCLUDE)
-      return MatchCode.INCLUDE;
+    return MatchCode.INCLUDE;
 
-    if (filterResponse == ReturnCode.SKIP)
-      return MatchCode.SKIP;
+  }
 
-    // else if (filterResponse == ReturnCode.NEXT_ROW)
-    stickyNextRow = true;
-    return MatchCode.SEEK_NEXT_ROW;
+  /**
+   * Decides where to seek once the current column is finished with.
+   * <p>
+   * When <code>columns</code> is an {@link ExplicitColumnTracker} it is told
+   * that the passed column is done; if it then reports no column hint the
+   * answer is to seek to the next row.  In every other case the answer is to
+   * seek to the next column.
+   * @param bytes buffer passed through to the column tracker
+   * @param offset offset into <code>bytes</code>
+   * @param qualLength length passed through to the column tracker
+   * @return {@link MatchCode#SEEK_NEXT_ROW} or {@link MatchCode#SEEK_NEXT_COL}
+   */
+  public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
+      int qualLength) {
+    if (!(columns instanceof ExplicitColumnTracker)) {
+      return MatchCode.SEEK_NEXT_COL;
+    }
+    // The instanceof guard above makes this cast safe.
+    ExplicitColumnTracker explicitColumns = (ExplicitColumnTracker) columns;
+    explicitColumns.doneWithColumn(bytes, offset, qualLength);
+    return columns.getColumnHint() == null
+        ? MatchCode.SEEK_NEXT_ROW
+        : MatchCode.SEEK_NEXT_COL;
+  }
+
+  /**
+   * Tells whether rows past the passed KeyValue could still be in range.
+   * @param kv KeyValue whose row is compared against the stop row
+   * @return <code>true</code> if the stop row equals
+   * {@link HConstants#EMPTY_END_ROW} or the row of <code>kv</code> sorts
+   * before the stop row; <code>false</code> otherwise.
+   */
+  public boolean moreRowsMayExistAfter(KeyValue kv) {
+    if (Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
+      // Stop row is the empty end row, so it does not bound the scan.
+      return true;
+    }
+    int cmp = rowComparator.compareRows(kv.getBuffer(), kv.getRowOffset(),
+        kv.getRowLength(), stopRow, 0, stopRow.length);
+    return cmp < 0;
   }
 
   /**
    * Set current row
    * @param row
    */
-  @Override
   public void setRow(byte [] row) {
     this.row = row;
     reset();
   }
 
-  @Override
   public void reset() {
-    super.reset();
+    this.deletes.reset();
+    this.columns.reset();
+
     stickyNextRow = false;
   }
+
+  /**
+   * Treats every type code other than the Put code as a delete.
+   * (This check should arguably live in KeyValue.)
+   * @param type KeyValue type code
+   * @return <code>true</code> if <code>type</code> is not the Put code
+   */
+  protected boolean isDelete(byte type) {
+    boolean isPut = (type == KeyValue.Type.Put.getCode());
+    return !isPut;
+  }
+
+  /**
+   * @param timestamp version stamp to check
+   * @return <code>true</code> if <code>timestamp</code> is older than
+   * <code>oldestStamp</code>
+   */
+  protected boolean isExpired(long timestamp) {
+    return this.oldestStamp > timestamp;
+  }
+
+  /**
+   * @return the key this matcher was set up to start from
+   */
+  public KeyValue getStartKey() {
+    final KeyValue key = this.startKey;
+    return key;
+  }
+
+  /**
+   * Asks the filter, if there is one, for the next key to seek to.
+   * @param kv KeyValue handed to the filter
+   * @return whatever the filter returns from <code>getNextKeyHint</code>, or
+   * <code>null</code> when no filter is set
+   */
+  public KeyValue getNextKeyHint(KeyValue kv) {
+    return filter == null ? null : filter.getNextKeyHint(kv);
+  }
+
+  /**
+   * {@link #match} return codes.  These instruct the scanner moving through
+   * memstores and StoreFiles what to do with the current KeyValue.
+   * <p>
+   * Additionally, this contains "early-out" language to tell the scanner to
+   * move on to the next File (memstore or Storefile), or to return immediately.
+   */
+  public static enum MatchCode {
+    /**
+     * Include KeyValue in the returned result
+     */
+    INCLUDE,
+
+    /**
+     * Do not include KeyValue in the returned result
+     */
+    SKIP,
+
+    /**
+     * Do not include, jump to next StoreFile or memstore (in time order)
+     */
+    NEXT,
+
+    /**
+     * Do not include, return current result
+     */
+    DONE,
+
+    /*
+     * The codes below are used by the ScanQueryMatcher.
+     */
+
+    /**
+     * Done with the row, seek there.
+     */
+    SEEK_NEXT_ROW,
+    /**
+     * Done with column, seek to next.
+     */
+    SEEK_NEXT_COL,
+
+    /**
+     * Done with scan, thanks to the row filter.
+     */
+    DONE_SCAN,
+
+    /**
+     * Seek to next key which is given as hint.
+     */
+    SEEK_NEXT_USING_HINT,
+  }
 }
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java Fri Aug 27 05:01:02 2010
@@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.regionse
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -65,15 +65,15 @@ public class ScanWildcardColumnTracker i
       currentCount = 0;
 
       if (++currentCount > maxVersions)
-        return MatchCode.SKIP;
-      return MatchCode.INCLUDE;
+        return ScanQueryMatcher.MatchCode.SKIP;
+      return ScanQueryMatcher.MatchCode.INCLUDE;
     }
     int cmp = Bytes.compareTo(bytes, offset, length,
         columnBuffer, columnOffset, columnLength);
     if (cmp == 0) {
       if (++currentCount > maxVersions)
-        return MatchCode.SKIP; // skip to next col
-      return MatchCode.INCLUDE;
+        return ScanQueryMatcher.MatchCode.SKIP; // skip to next col
+      return ScanQueryMatcher.MatchCode.INCLUDE;
     }
 
     // new col > old col
@@ -84,8 +84,8 @@ public class ScanWildcardColumnTracker i
       columnLength = length;
       currentCount = 0;
       if (++currentCount > maxVersions)
-        return MatchCode.SKIP;
-      return MatchCode.INCLUDE;
+        return ScanQueryMatcher.MatchCode.SKIP;
+      return ScanQueryMatcher.MatchCode.INCLUDE;
     }
 
     // new col < oldcol
@@ -102,8 +102,8 @@ public class ScanWildcardColumnTracker i
     columnLength = length;
     currentCount = 0;
     if (++currentCount > maxVersions)
-      return MatchCode.SKIP;
-    return MatchCode.INCLUDE;
+      return ScanQueryMatcher.MatchCode.SKIP;
+    return ScanQueryMatcher.MatchCode.INCLUDE;
   }
 
   @Override

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,520 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+
+/**
+ * Executes region split as a "transaction".  Call {@link #prepare()} to set up
+ * the transaction, {@link #execute(OnlineRegions, CatalogTracker)} to run the
+ * transaction and {@link #rollback(OnlineRegions)} to clean up if execute fails.
+ *
+ * <p>Here is an example of how you would use this class:
+ * <pre>
+ *  SplitTransaction st = new SplitTransaction(parent, midKey);
+ *  if (!st.prepare()) return;
+ *  try {
+ *    st.execute(myOnlineRegions, myCatalogTracker);
+ *  } catch (IOException ioe) {
+ *    try {
+ *      st.rollback(myOnlineRegions);
+ *      return;
+ *    } catch (RuntimeException e) {
+ *      myAbortable.abort("Failed split, abort");
+ *    }
+ *  }
+ * </pre>
+ */
+class SplitTransaction {
+  private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
+  private static final String SPLITDIR = "splits";
+
+  /*
+   * Region to split
+   */
+  private final HRegion parent;
+  private HRegionInfo hri_a;
+  private HRegionInfo hri_b;
+  private Path splitdir;
+
+  /*
+   * Row to split around
+   */
+  private final byte [] splitrow;
+
+  /**
+   * Types to add to the transaction journal.  {@link #rollback(OnlineRegions)}
+   * walks the journal backwards and undoes each recorded step.
+   */
+  enum JournalEntry {
+    /**
+     * We created the temporary split data directory.
+     */
+    CREATE_SPLIT_DIR,
+    /**
+     * Closed the parent region.
+     */
+    CLOSED_PARENT_REGION,
+    /**
+     * The parent has been taken out of the server's online regions list.
+     */
+    OFFLINED_PARENT,
+    /**
+     * Creation of the first daughter region has begun.  Journaled before the
+     * work starts, so a partially created daughter still gets cleaned up.
+     */
+    STARTED_REGION_A_CREATION,
+    /**
+     * Creation of the second daughter region has begun.  Journaled before the
+     * work starts, so a partially created daughter still gets cleaned up.
+     */
+    STARTED_REGION_B_CREATION
+  }
+
+  /*
+   * Journal of how far the split transaction has progressed.
+   */
+  private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
+
+  /**
+   * Constructor.  The temporary split directory is resolved here as the
+   * <code>splits</code> directory under the parent's region directory.
+   * @param r Region to split
+   * @param splitrow Row to split around
+   */
+  SplitTransaction(final HRegion r, final byte [] splitrow) {
+    this.parent = r;
+    this.splitrow = splitrow;
+    this.splitdir = getSplitDir(this.parent);
+  }
+
+  /**
+   * Validates the split inputs and works out the two daughter regions.
+   * <p>
+   * Takes the parent's write lock (<code>parent.lock.writeLock()</code>)
+   * before checking anything.  When <code>false</code> is returned the lock
+   * has been released again.  When <code>true</code> is returned the lock is
+   * still held; it is released by a successful
+   * {@link #execute(OnlineRegions, CatalogTracker)} or by
+   * {@link #rollback(OnlineRegions)}.
+   * @return <code>true</code> if the region is splittable else
+   * <code>false</code> (the parent is closed or closing, or the split row is
+   * the region start key or is not contained in the region).
+   */
+  public boolean prepare() {
+    boolean splittable = false;
+    this.parent.lock.writeLock().lock();
+    try {
+      if (!this.parent.isClosed() && !this.parent.isClosing()) {
+        final HRegionInfo hri = this.parent.getRegionInfo();
+        final byte [] startKey = hri.getStartKey();
+        final byte [] endKey = hri.getEndKey();
+        // Split row must be inside the region and must not be its start key.
+        final boolean splitrowUsable = !Bytes.equals(startKey, splitrow) &&
+          hri.containsRow(splitrow);
+        if (splitrowUsable) {
+          // Both daughters share one region id.
+          final long rid = getDaughterRegionIdTimestamp(hri);
+          this.hri_a = new HRegionInfo(hri.getTableDesc(), startKey,
+            this.splitrow, false, rid);
+          this.hri_b = new HRegionInfo(hri.getTableDesc(), this.splitrow,
+            endKey, false, rid);
+          splittable = true;
+        } else {
+          LOG.info("Split row is not inside region key range or is equal to " +
+            "startkey: " + Bytes.toString(this.splitrow));
+        }
+      }
+    } finally {
+      // Hold on to the lock only when the caller may go on to execute.
+      if (!splittable) this.parent.lock.writeLock().unlock();
+    }
+    return splittable;
+  }
+
+  /**
+   * Picks the region id (a timestamp) that both daughters will use.
+   * <p>
+   * Normally this is the current time.  The id can't be less than that of the
+   * parent else the daughters insert at the wrong location in .META. (See
+   * HBASE-710), so when the clock reads earlier than the parent's id we warn
+   * and use the parent's id plus one instead.
+   * @param hri Parent {@link HRegionInfo}
+   * @return Daughter region id (timestamp) to use.
+   */
+  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
+    final long now = EnvironmentEdgeManager.currentTimeMillis();
+    final long parentId = hri.getRegionId();
+    if (now >= parentId) {
+      return now;
+    }
+    LOG.warn("Clock skew; parent regions id is " + parentId +
+      " but current time here is " + now);
+    return parentId + 1;
+  }
+
+  /**
+   * Run the transaction.  Delegates to the three-argument variant, passing
+   * <code>true</code> for <code>updateMeta</code> exactly when <code>or</code>
+   * is not null.
+   * @param or Object that can online/offline parent region. Can be null
+   * @param ct CatalogTracker instance.
+   * @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
+   * @return Regions created
+   * @see #rollback(OnlineRegions)
+   */
+  public PairOfSameType<HRegion> execute(final OnlineRegions or,
+      final CatalogTracker ct)
+  throws IOException {
+    final boolean updateMeta = (or != null);
+    return execute(or, ct, updateMeta);
+  }
+
+  /**
+   * Run the transaction.
+   * <p>
+   * Steps, in order: create the split directory, close the parent, remove the
+   * parent from the online regions, split the parent's store files, create
+   * both daughter regions and finally edit meta.  Steps that
+   * {@link #rollback(OnlineRegions)} can undo are recorded in the journal.
+   * The parent's write lock must already be held by the calling thread (see
+   * {@link #prepare()}); it is released here only when the split succeeds.
+   * @param or Object that can online/offline parent region.  Can be null (Tests
+   * will pass null).
+   * @param ct CatalogTracker instance.  Can be null (for testing), in which
+   * case the meta edits are skipped.
+   * @param updateMeta If <code>true</code>, an HTable on the catalog table is
+   * opened for the duration of the call (set to false when testing).
+   * @throws IOException If thrown, transaction failed. Call {@link #rollback(OnlineRegions)}
+   * @return Regions created
+   * @see #rollback(OnlineRegions)
+   */
+  PairOfSameType<HRegion> execute(final OnlineRegions or, final CatalogTracker ct,
+      final boolean updateMeta)
+  throws IOException {
+    LOG.info("Starting split of region " + this.parent);
+    if (!this.parent.lock.writeLock().isHeldByCurrentThread()) {
+      throw new SplitAndCloseWriteLockNotHeld();
+    }
+
+    // We'll need one of these later but get it now because if we fail there
+    // is nothing to undo.
+    HTable t = null;
+    if (updateMeta) t = getTable(this.parent.getConf());
+
+    HRegion a;
+    HRegion b;
+    boolean finished = false;
+    try {
+      createSplitDir(this.parent.getFilesystem(), this.splitdir);
+      this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
+
+      List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
+      this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
+
+      if (or != null) or.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
+      this.journal.add(JournalEntry.OFFLINED_PARENT);
+
+      splitStoreFiles(this.splitdir, hstoreFilesToSplit);
+      // splitStoreFiles creates daughter region dirs under the parent splits dir
+      // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
+      // clean this up.
+
+      // Log to the journal that we are creating region A, the first daughter
+      // region.  We could fail halfway through.  If we do, we could have left
+      // stuff in fs that needs cleanup -- a storefile or two.  That's why we
+      // add entry to journal BEFORE rather than AFTER the change.
+      this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
+      a = createDaughterRegion(this.hri_a);
+
+      // Ditto
+      this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
+      b = createDaughterRegion(this.hri_b);
+
+      // Edit parent in meta
+      if (ct != null) {
+        MetaEditor.offlineParentInMeta(ct, this.parent.getRegionInfo(),
+          a.getRegionInfo(), b.getRegionInfo());
+      }
+
+      // This is the point of no return.  We are committed to the split now.  Up
+      // to a failure editing parent in meta or a crash of the hosting
+      // regionserver, we could rollback (or, if crash, we could cleanup on
+      // redeploy) but now meta has been changed, we can only go forward.  If
+      // the below last steps do not complete, repair has to be done by another
+      // agent.  For example, basescanner, at least up till master rewrite,
+      // would add daughter rows if missing from meta.  It could do this because
+      // the parent edit includes the daughter specs.  In Bigtable paper, they
+      // have another mechanism where some feedback to the master somehow flags
+      // it that split is incomplete and needs fixup.  Whatever the mechanism,
+      // it's a TODO that we have some fixup.
+
+      // I looked at writing the put of the parent edit above out to the WAL log
+      // before changing meta with the notion that should we fail, then on
+      // replay the offlining of the parent and addition of daughters up into
+      // meta could be reinserted.  The edits would have to be 'special' and
+      // given how our splits work, splitting by region, I think the replay
+      // would have to happen inside in the split code -- as soon as it saw one
+      // of these special edits, rather than write the edit out a file for the
+      // .META. region to replay or somehow, write it out to this regions edits
+      // file for it to handle on redeploy -- this'd be whacky, we'd be telling
+      // meta about a split during the deploy of the parent -- instead we'd have
+      // to play the edit inside in the split code somehow; this would involve a
+      // stop-the-splitting till meta had been edited which might hold up
+      // splitting a good while.
+
+      // Finish up the meta edits.  If these fail, another agent needs to do
+      // fixup.  hri tracks the daughter being added so the warning names it.
+      HRegionInfo hri = a.getRegionInfo();
+      try {
+        if (ct != null) MetaEditor.addRegionToMeta(ct, hri);
+        hri = b.getRegionInfo();
+        if (ct != null) MetaEditor.addRegionToMeta(ct, hri);
+      } catch (IOException e) {
+        // Don't let this out or we'll run rollback.  Log the cause so whoever
+        // does the fixup can see why the daughter was not added.
+        LOG.warn("Failed adding daughter " + hri.toString(), e);
+      }
+      finished = true;
+    } finally {
+      // On failure close the table so it does not leak, but never let a close
+      // failure hide the exception that is already propagating.
+      if (!finished && t != null) {
+        try {
+          t.close();
+        } catch (IOException e) {
+          LOG.warn("Failed close of catalog table after failed split", e);
+        }
+      }
+    }
+    // This should not fail because the HTable instance we are using is not
+    // running a buffer -- it's immediately flushing its puts.
+    if (t != null) t.close();
+
+    // Unlock if successful split.
+    this.parent.lock.writeLock().unlock();
+
+    // Leaving here, the splitdir with its dross will be in place but since the
+    // split was successful, just leave it; it'll be cleaned when parent is
+    // deleted and cleaned up.
+    return new PairOfSameType<HRegion>(a, b);
+  }
+
+  /**
+   * @param r Region being split
+   * @return Path of the <code>splits</code> directory under the region
+   * directory of <code>r</code>
+   */
+  private static Path getSplitDir(final HRegion r) {
+    final Path dir = new Path(r.getRegionDir(), SPLITDIR);
+    return dir;
+  }
+
+  /**
+   * Creates the temporary split directory, insisting that it is new.
+   * @param fs Filesystem to use
+   * @param splitdir Directory to store temporary split data in
+   * @throws IOException If <code>splitdir</code> already exists or we fail
+   * to create it.
+   * @see #cleanupSplitDir(FileSystem, Path)
+   */
+  private static void createSplitDir(final FileSystem fs, final Path splitdir)
+  throws IOException {
+    if (fs.exists(splitdir)) {
+      throw new IOException("Splitdir already exits? " + splitdir);
+    }
+    final boolean created = fs.mkdirs(splitdir);
+    if (!created) {
+      throw new IOException("Failed create of " + splitdir);
+    }
+  }
+
+  /**
+   * Deletes the split directory if it is there.
+   * @param fs Filesystem to use
+   * @param splitdir Directory to remove
+   * @throws IOException If the directory exists and the delete fails
+   */
+  private static void cleanupSplitDir(final FileSystem fs, final Path splitdir)
+  throws IOException {
+    // Splitdir may have been cleaned up by reopen of the parent dir, so its
+    // absence is not an error.
+    final boolean mustPreExist = false;
+    deleteDir(fs, splitdir, mustPreExist);
+  }
+
+  /**
+   * Recursively deletes a directory.
+   * @param fs Filesystem to use
+   * @param dir Directory to delete
+   * @param mustPreExist If true, we'll throw exception if <code>dir</code>
+   * does not preexist, else a missing <code>dir</code> is silently accepted.
+   * @throws IOException Thrown if we fail to delete passed <code>dir</code>
+   */
+  private static void deleteDir(final FileSystem fs, final Path dir,
+      final boolean mustPreExist)
+  throws IOException {
+    if (fs.exists(dir)) {
+      final boolean deleted = fs.delete(dir, true);
+      if (!deleted) {
+        throw new IOException("Failed delete of " + dir);
+      }
+      return;
+    }
+    if (mustPreExist) {
+      throw new IOException(dir.toString() + " does not exist!");
+    }
+  }
+
+  /**
+   * Splits every passed store file under the split directory.
+   * @param splitdir Directory the split output is written under
+   * @param hstoreFilesToSplit Store files to split; must not be null
+   * @throws IOException If <code>hstoreFilesToSplit</code> is null or a split
+   * fails
+   */
+  private void splitStoreFiles(final Path splitdir,
+    final List<StoreFile> hstoreFilesToSplit)
+  throws IOException {
+    if (hstoreFilesToSplit == null) {
+      // Could be null because close didn't succeed -- for now consider it fatal
+      throw new IOException("Close returned empty list of StoreFiles");
+    }
+
+    for (StoreFile storeFile: hstoreFilesToSplit) {
+      splitStoreFile(storeFile, splitdir);
+    }
+  }
+
+  /**
+   * Splits one store file around the split row: the bottom range goes under
+   * the first daughter's store directory, the top range under the second's.
+   * @param sf Store file to split
+   * @param splitdir Directory the daughter store directories live under
+   * @throws IOException
+   */
+  private void splitStoreFile(final StoreFile sf, final Path splitdir)
+  throws IOException {
+    final FileSystem fs = this.parent.getFilesystem();
+    final byte [] family = sf.getFamily();
+
+    final Path bottomStoreDir =
+      Store.getStoreHomedir(splitdir, this.hri_a.getEncodedName(), family);
+    StoreFile.split(fs, bottomStoreDir, sf, this.splitrow, Range.bottom);
+
+    final Path topStoreDir =
+      Store.getStoreHomedir(splitdir, this.hri_b.getEncodedName(), family);
+    StoreFile.split(fs, topStoreDir, sf, this.splitrow, Range.top);
+  }
+
+  /**
+   * Creates a daughter region and moves the files prepared for it under the
+   * split directory into the new region's directory.
+   * @param hri Region info of the daughter to create
+   * @return Created daughter HRegion.
+   * @throws IOException
+   * @see #cleanupDaughterRegion(FileSystem, Path, String)
+   */
+  HRegion createDaughterRegion(final HRegionInfo hri)
+  throws IOException {
+    // Package private so unit tests have access.
+    final FileSystem fs = this.parent.getFilesystem();
+    final Path daughterSplitDir = getSplitDirForDaughter(fs, this.splitdir, hri);
+    final HRegion daughter = HRegion.newHRegion(this.parent.getTableDir(),
+      this.parent.getLog(), fs, this.parent.getConf(), hri, null);
+    HRegion.moveInitialFilesIntoPlace(fs, daughterSplitDir,
+      daughter.getRegionDir());
+    return daughter;
+  }
+
+  /**
+   * Deletes a daughter's region directory under the table directory, if any.
+   * @param fs Filesystem to use
+   * @param tabledir Table directory the daughter region lives under
+   * @param encodedName Encoded name of the daughter region
+   * @throws IOException If the directory exists and the delete fails
+   */
+  private static void cleanupDaughterRegion(final FileSystem fs,
+    final Path tabledir, final String encodedName)
+  throws IOException {
+    // Dir may not preexist, hence mustPreExist is false.
+    deleteDir(fs, HRegion.getRegionDir(tabledir, encodedName), false);
+  }
+
+  /*
+   * Get the daughter's directory in the splits dir.  The splits dir is under
+   * the parent region's directory.
+   * @param fs Not used by this method
+   * @param splitdir The parent's splits dir
+   * @param hri Daughter region
+   * @return Path named for the daughter's encoded name under splitdir.
+   * @throws IOException
+   */
+  private static Path getSplitDirForDaughter(final FileSystem fs,
+      final Path splitdir, final HRegionInfo hri)
+  throws IOException {
+    final String daughterName = hri.getEncodedName();
+    return new Path(splitdir, daughterName);
+  }
+
+  /*
+   * @param conf Configuration to create the HTable with
+   * @return An HTable instance against the catalog table that holds the
+   * parent region; it has autoFlush enabled so we immediately send puts (No
+   * buffering enabled).
+   * @throws IOException
+   */
+  private HTable getTable(final Configuration conf) throws IOException {
+    // When a region is split, the META table needs to be updated if we're
+    // splitting a 'normal' region, and the ROOT table needs to be
+    // updated if we are splitting a META region.
+    final boolean parentIsMeta = this.parent.getRegionInfo().isMetaTable();
+    final HTable t = parentIsMeta
+      ? new HTable(conf, HConstants.ROOT_TABLE_NAME)
+      : new HTable(conf, HConstants.META_TABLE_NAME);
+    // Flush puts as we send them -- no buffering.
+    t.setAutoFlush(true);
+    return t;
+  }
+
+  /**
+   * Undoes whatever the journal says {@link #execute(OnlineRegions, CatalogTracker)}
+   * got done, most recent step first, then releases the parent's write lock.
+   * The calling thread must hold that lock.
+   * @param or Object that can online/offline parent region.  Can be passed null
+   * by unit tests.
+   * @throws IOException If thrown, rollback failed.  Take drastic action.
+   */
+  public void rollback(final OnlineRegions or) throws IOException {
+    if (!this.parent.lock.writeLock().isHeldByCurrentThread()) {
+      throw new SplitAndCloseWriteLockNotHeld();
+    }
+    final FileSystem fs = this.parent.getFilesystem();
+    // Start the iterator past the last entry and walk backwards.
+    for (ListIterator<JournalEntry> it =
+        this.journal.listIterator(this.journal.size()); it.hasPrevious();) {
+      final JournalEntry je = it.previous();
+      switch(je) {
+      case STARTED_REGION_B_CREATION:
+        cleanupDaughterRegion(fs, this.parent.getTableDir(),
+          this.hri_b.getEncodedName());
+        break;
+
+      case STARTED_REGION_A_CREATION:
+        cleanupDaughterRegion(fs, this.parent.getTableDir(),
+          this.hri_a.getEncodedName());
+        break;
+
+      case OFFLINED_PARENT:
+        if (or != null) or.addToOnlineRegions(this.parent);
+        break;
+
+      case CLOSED_PARENT_REGION:
+        // So, this returns a seqid but if we just closed and then reopened, we
+        // should be ok. On close, we flushed using sequenceid obtained from
+        // hosting regionserver so no need to propagate the sequenceid returned
+        // out of initialize below up into regionserver as we normally do.
+        // TODO: Verify.
+        this.parent.initialize();
+        break;
+
+      case CREATE_SPLIT_DIR:
+        cleanupSplitDir(fs, this.splitdir);
+        break;
+
+      default:
+        throw new RuntimeException("Unhandled journal entry: " + je);
+      }
+    }
+    if (this.parent.lock.writeLock().isHeldByCurrentThread()) {
+      this.parent.lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Thrown if lock not held.
+   * <p>
+   * Declared static so that an instance does not carry a hidden reference to
+   * the enclosing SplitTransaction.
+   */
+  @SuppressWarnings("serial")
+  public static class SplitAndCloseWriteLockNotHeld extends IOException {}
+
+  /**
+   * @return Region info of the first daughter; set by {@link #prepare()}
+   */
+  HRegionInfo getFirstDaughter() {
+    return this.hri_a;
+  }
+
+  /**
+   * @return Region info of the second daughter; set by {@link #prepare()}
+   */
+  HRegionInfo getSecondDaughter() {
+    return this.hri_b;
+  }
+
+  /**
+   * For unit testing.
+   * @return The temporary split directory this transaction works in
+   */
+  Path getSplitDir() {
+    final Path dir = this.splitdir;
+    return dir;
+  }
+
+  /**
+   * Clean up any split detritus that may have been left around from previous
+   * split attempts.
+   * Call this method on initial region deploy.  Cleans up any mess
+   * left by previous deploys of passed <code>r</code> region.  Does nothing
+   * when the region has no split directory.
+   * @param r Region whose split directory is inspected
+   * @throws IOException
+   */
+  static void cleanupAnySplitDetritus(final HRegion r) throws IOException {
+    final Path splitdir = getSplitDir(r);
+    final FileSystem fs = r.getFilesystem();
+    if (!fs.exists(splitdir)) {
+      return;
+    }
+    // Look at the splitdir.  It could have the encoded names of the daughter
+    // regions we tried to make.  See if the daughter regions actually got made
+    // out under the tabledir.  If here under splitdir still, then the split did
+    // not complete.  Try and do cleanup.  This code WILL NOT catch the case
+    // where we successfully created daughter a but regionserver crashed during
+    // the creation of region b.  In this case, there'll be an orphan daughter
+    // dir in the filesystem.  TODO: Fix.
+    final FileStatus [] daughters =
+      fs.listStatus(splitdir, new FSUtils.DirFilter(fs));
+    for (FileStatus daughter: daughters) {
+      // Each directory under splitdir is named for a daughter region.
+      cleanupDaughterRegion(fs, r.getTableDir(), daughter.getPath().getName());
+    }
+    cleanupSplitDir(fs, splitdir);
+    LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
+  }
+}

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Aug 27 05:01:02 2010
@@ -25,9 +25,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.Set;
 import java.util.SortedSet;
-import java.util.TreeSet;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -42,9 +40,7 @@ import org.apache.hadoop.hbase.HColumnDe
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.Compression;
@@ -52,6 +48,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.collect.ImmutableList;
@@ -190,7 +187,7 @@ public class Store implements HeapSize {
     this.storefiles = ImmutableList.copyOf(loadStoreFiles());
   }
 
-  HColumnDescriptor getFamily() {
+  public HColumnDescriptor getFamily() {
     return this.family;
   }
 
@@ -212,7 +209,7 @@ public class Store implements HeapSize {
     return new Path(tabledir, new Path(encodedName,
       new Path(Bytes.toString(family))));
   }
-  
+
   /**
    * Return the directory in which this store stores its
    * StoreFiles
@@ -417,15 +414,17 @@ public class Store implements HeapSize {
    * Write out current snapshot.  Presumes {@link #snapshot()} has been called
    * previously.
    * @param logCacheFlushId flush sequence number
+   * @param snapshot
    * @return true if a compaction is needed
    * @throws IOException
    */
   private StoreFile flushCache(final long logCacheFlushId,
-                               SortedSet<KeyValue> snapshot) throws IOException {
+      SortedSet<KeyValue> snapshot,
+      TimeRangeTracker snapshotTimeRangeTracker) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
-    return internalFlushCache(snapshot, logCacheFlushId);
+    return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker);
   }
 
   /*
@@ -435,7 +434,8 @@ public class Store implements HeapSize {
    * @throws IOException
    */
   private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
-                                       final long logCacheFlushId)
+      final long logCacheFlushId,
+      TimeRangeTracker snapshotTimeRangeTracker)
       throws IOException {
     StoreFile.Writer writer = null;
     long flushed = 0;
@@ -450,6 +450,7 @@ public class Store implements HeapSize {
     synchronized (flushLock) {
       // A. Write the map out to the disk
       writer = createWriterInTmp(set.size());
+      writer.setTimeRangeTracker(snapshotTimeRangeTracker);
       int entries = 0;
       try {
         for (KeyValue kv: set) {
@@ -466,12 +467,12 @@ public class Store implements HeapSize {
         writer.close();
       }
     }
-    
+
     // Write-out finished successfully, move into the right spot
     Path dstPath = StoreFile.getUniqueFile(fs, homedir);
     LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
     fs.rename(writer.getPath(), dstPath);
-    
+
     StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
       this.conf, this.family.getBloomFilterType(), this.inMemory);
     StoreFile.Reader r = sf.createReader();
@@ -664,7 +665,7 @@ public class Store implements HeapSize {
       LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
           this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
         (references? ", hasReferences=true,": " ") + " into " +
-          region.getTmpDir() + ", seqid=" + maxId);
+          region.getTmpDir() + ", sequenceid=" + maxId);
       StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
@@ -954,13 +955,6 @@ public class Store implements HeapSize {
     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
   }
 
-  static void expiredOrDeleted(final Set<KeyValue> set, final KeyValue kv) {
-    boolean b = set.remove(kv);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(kv.toString() + " expired: " + b);
-    }
-  }
-
   static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
     return key.getTimestamp() < oldestTimestamp;
   }
@@ -1200,7 +1194,7 @@ public class Store implements HeapSize {
    * Return a scanner for both the memstore and the HStore files
    * @throws IOException
    */
-  protected KeyValueScanner getScanner(Scan scan,
+  public KeyValueScanner getScanner(Scan scan,
       final NavigableSet<byte []> targetCols) throws IOException {
     lock.readLock().lock();
     try {
@@ -1285,85 +1279,6 @@ public class Store implements HeapSize {
   }
 
   /**
-   * Convenience method that implements the old MapFile.getClosest on top of
-   * HFile Scanners.  getClosest used seek to the asked-for key or just after
-   * (HFile seeks to the key or just before).
-   * @param s Scanner to use
-   * @param kv Key to find.
-   * @return True if we were able to seek the scanner to <code>b</code> or to
-   * the key just after.
-   * @throws IOException
-   */
-  static boolean getClosest(final HFileScanner s, final KeyValue kv)
-      throws IOException {
-    // Pass offsets to key content of a KeyValue; thats whats in the hfile index.
-    int result = s.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
-    if (result < 0) {
-      // Not in file.  Will the first key do?
-      if (!s.seekTo()) {
-        return false;
-      }
-    } else if (result > 0) {
-      // Less than what was asked for but maybe < because we're asking for
-      // r/c/HConstants.LATEST_TIMESTAMP -- what was returned was r/c-1/SOME_TS...
-      // A next will get us a r/c/SOME_TS.
-      if (!s.next()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Retrieve results from this store given the specified Get parameters.
-   * @param get Get operation
-   * @param columns List of columns to match, can be empty (not null)
-   * @param result List to add results to
-   * @throws IOException
-   */
-  public void get(Get get, NavigableSet<byte[]> columns, List<KeyValue> result)
-      throws IOException {
-    KeyComparator keyComparator = this.comparator.getRawComparator();
-
-    // Column matching and version enforcement
-    QueryMatcher matcher = new QueryMatcher(get, this.family.getName(), columns,
-      this.ttl, keyComparator, versionsToReturn(get.getMaxVersions()));
-    this.lock.readLock().lock();
-    try {
-      // Read from memstore
-      if(this.memstore.get(matcher, result)) {
-        // Received early-out from memstore
-        return;
-      }
-
-      // Check if we even have storefiles
-      if (this.storefiles.isEmpty()) {
-        return;
-      }
-
-      // Get storefiles for this store
-      List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
-      for (StoreFile sf : Iterables.reverse(this.storefiles)) {
-        StoreFile.Reader r = sf.getReader();
-        if (r == null) {
-          LOG.warn("StoreFile " + sf + " has a null Reader");
-          continue;
-        }
-        // Get a scanner that caches the block and uses pread
-        storefileScanners.add(r.getScanner(true, true));
-      }
-
-      // StoreFileGetScan will handle reading this store's storefiles
-      StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
-
-      // Run a GET scan and put results into the specified list
-      scanner.get(result);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /**
    * Increments the value for the given row/family/qualifier.
    *
    * This function will always be seen as atomic by other readers
@@ -1380,49 +1295,18 @@ public class Store implements HeapSize {
   public long updateColumnValue(byte [] row, byte [] f,
                                 byte [] qualifier, long newValue)
       throws IOException {
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    KeyComparator keyComparator = this.comparator.getRawComparator();
-
-    KeyValue kv = null;
-    // Setting up the QueryMatcher
-    Get get = new Get(row);
-    NavigableSet<byte[]> qualifiers =
-      new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    qualifiers.add(qualifier);
-    QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
-      keyComparator, 1);
 
-    // lock memstore snapshot for this critical section:
     this.lock.readLock().lock();
-    memstore.readLockLock();
     try {
-      int memstoreCode = this.memstore.getWithCode(matcher, result);
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+
+      return this.memstore.updateColumnValue(row,
+          f,
+          qualifier,
+          newValue,
+          now);
 
-      if (memstoreCode != 0) {
-        // was in memstore (or snapshot)
-        kv = result.get(0).clone();
-        byte [] buffer = kv.getBuffer();
-        int valueOffset = kv.getValueOffset();
-        Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0,
-            Bytes.SIZEOF_LONG);
-        if (memstoreCode == 2) {
-          // from snapshot, assign new TS
-          long currTs = System.currentTimeMillis();
-          if (currTs == kv.getTimestamp()) {
-            currTs++; // unlikely but catastrophic
-          }
-          Bytes.putBytes(buffer, kv.getTimestampOffset(),
-              Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG);
-        }
-      } else {
-        kv = new KeyValue(row, f, qualifier,
-            System.currentTimeMillis(),
-            Bytes.toBytes(newValue));
-      }
-      return add(kv);
-      // end lock
     } finally {
-      memstore.readLockUnlock();
       this.lock.readLock().unlock();
     }
   }
@@ -1436,6 +1320,7 @@ public class Store implements HeapSize {
     private long cacheFlushId;
     private SortedSet<KeyValue> snapshot;
     private StoreFile storeFile;
+    private TimeRangeTracker snapshotTimeRangeTracker;
 
     private StoreFlusherImpl(long cacheFlushId) {
       this.cacheFlushId = cacheFlushId;
@@ -1445,11 +1330,12 @@ public class Store implements HeapSize {
     public void prepare() {
       memstore.snapshot();
       this.snapshot = memstore.getSnapshot();
+      this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
     }
 
     @Override
     public void flushCache() throws IOException {
-      storeFile = Store.this.flushCache(cacheFlushId, snapshot);
+      storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker);
     }
 
     @Override

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Fri Aug 27 05:01:02 2010
@@ -24,9 +24,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -38,7 +38,9 @@ import org.apache.hadoop.hbase.util.Bloo
 import org.apache.hadoop.hbase.util.ByteBloomFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Hash;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Function;
@@ -105,6 +107,8 @@ public class StoreFile {
   public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
   /** Bloom filter Type in FileInfo */
   static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
+  /** Key for TimeRange information in file info metadata */
+  static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
 
   /** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */
   static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
@@ -411,6 +415,17 @@ public class StoreFile {
       this.reader.loadBloomfilter();
     }
 
+    try {
+      byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
+      if (timerangeBytes != null) {
+        this.reader.timeRangeTracker = new TimeRangeTracker();
+        Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
+      }
+    } catch (IllegalArgumentException e) {
+      LOG.error("Error reading timestamp range data from meta -- " +
+          "proceeding without", e);
+      this.reader.timeRangeTracker = null;
+    }
     return this.reader;
   }
 
@@ -647,6 +662,14 @@ public class StoreFile {
     private KVComparator kvComparator;
     private KeyValue lastKv = null;
     private byte[] lastByteArray = null;
+    TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+    /* isTimeRangeTrackerSet records whether the time range has already been
+     * set. When flushing a memstore, we set the TimeRangeTracker up front and
+     * use this flag to indicate that the range need not be recalculated while
+     * appending KeyValues.
+     * It is not set for compactions, where the range is recalculated using
+     * only the appended KeyValues. */
+    boolean isTimeRangeTrackerSet = false;
 
     protected HFile.Writer writer;
     /**
@@ -671,6 +694,13 @@ public class StoreFile {
 
       if (bloomType != BloomType.NONE && conf != null) {
         float err = conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float)0.01);
+        // Since in row+col blooms we have 2 calls to shouldSeek() instead of 1
+        // and the false positives are adding up, we should keep the error rate
+        // twice as low in order to maintain the number of false positives as
+        // desired by the user
+        if (bloomType == BloomType.ROWCOL) {
+          err /= 2;
+        }
         int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD, 7);
 
         this.bloomFilter = new ByteBloomFilter(maxKeys, err,
@@ -693,7 +723,49 @@ public class StoreFile {
     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
     throws IOException {
       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
-      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
+      writer.appendFileInfo(MAJOR_COMPACTION_KEY,
+          Bytes.toBytes(majorCompaction));
+      appendTimeRangeMetadata();
+    }
+
+    /**
+     * Add TimestampRange to Metadata
+     */
+    public void appendTimeRangeMetadata() throws IOException {
+      appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
+    }
+
+    /**
+     * Set TimeRangeTracker
+     * @param trt
+     */
+    public void setTimeRangeTracker(final TimeRangeTracker trt) {
+      this.timeRangeTracker = trt;
+      isTimeRangeTrackerSet = true;
+    }
+
+    /**
+     * Updates the TimeRangeTracker to include the timestamp of this KeyValue,
+     * unless the tracker was already supplied via
+     * {@link #setTimeRangeTracker(TimeRangeTracker)}.
+     * @param kv KeyValue whose timestamp should be included
+     */
+    public void includeInTimeRangeTracker(final KeyValue kv) {
+      if (!isTimeRangeTrackerSet) {
+        timeRangeTracker.includeTimestamp(kv);
+      }
+    }
+
+    /**
+     * Updates the TimeRangeTracker to include the timestamp of this key,
+     * unless the tracker was already supplied via
+     * {@link #setTimeRangeTracker(TimeRangeTracker)}.
+     * @param key serialized key whose timestamp should be included
+     */
+    public void includeInTimeRangeTracker(final byte [] key) {
+      if (!isTimeRangeTrackerSet) {
+        timeRangeTracker.includeTimestamp(key);
+      }
     }
 
     public void append(final KeyValue kv) throws IOException {
@@ -735,7 +807,6 @@ public class StoreFile {
             byte [] result = new byte[rl + ql];
             System.arraycopy(kv.getBuffer(), ro, result, 0,  rl);
             System.arraycopy(kv.getBuffer(), qo, result, rl, ql);
-
             this.bloomFilter.add(result);
             break;
           default:
@@ -744,6 +815,7 @@ public class StoreFile {
         }
       }
       writer.append(kv);
+      includeInTimeRangeTracker(kv);
     }
 
     public Path getPath() {
@@ -759,6 +831,7 @@ public class StoreFile {
         }
       }
       writer.append(key, value);
+      includeInTimeRangeTracker(key);
     }
 
     public void close() throws IOException {
@@ -794,6 +867,7 @@ public class StoreFile {
     protected BloomFilter bloomFilter = null;
     protected BloomType bloomFilterType;
     private final HFile.Reader reader;
+    protected TimeRangeTracker timeRangeTracker = null;
 
     public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory)
         throws IOException {
@@ -834,20 +908,35 @@ public class StoreFile {
       reader.close();
     }
 
-    public boolean shouldSeek(final byte[] row,
-                              final SortedSet<byte[]> columns) {
+    public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
+        return (passesTimerangeFilter(scan) && passesBloomFilter(scan,columns));
+    }
 
-      if (this.bloomFilter == null) {
+    /**
+     * Check if this storeFile may contain keys within the TimeRange
+     * @param scan
+     * @return False if it definitely does not exist in this StoreFile
+     */
+    private boolean passesTimerangeFilter(Scan scan) {
+      if (timeRangeTracker == null) {
         return true;
+      } else {
+        return timeRangeTracker.includesTimeRange(scan.getTimeRange());
       }
+    }
 
+    private boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) {
+      if (this.bloomFilter == null || !scan.isGetScan()) {
+        return true;
+      }
+      byte[] row = scan.getStartRow();
       byte[] key;
       switch (this.bloomFilterType) {
         case ROW:
           key = row;
           break;
         case ROWCOL:
-          if (columns.size() == 1) {
+          if (columns != null && columns.size() == 1) {
             byte[] col = columns.first();
             key = Bytes.add(row, col);
             break;
@@ -860,7 +949,17 @@ public class StoreFile {
       try {
         ByteBuffer bloom = reader.getMetaBlock(BLOOM_FILTER_DATA_KEY, true);
         if (bloom != null) {
-          return this.bloomFilter.contains(key, bloom);
+          if (this.bloomFilterType == BloomType.ROWCOL) {
+            // Since a Row Delete is essentially a DeleteFamily applied to all
+            // columns, a file might be skipped if using row+col Bloom filter.
+            // In order to ensure this file is included an additional check is
+            // required looking only for a row bloom.
+            return this.bloomFilter.contains(key, bloom) ||
+                this.bloomFilter.contains(row, bloom);
+          }
+          else {
+            return this.bloomFilter.contains(key, bloom);
+          }
         }
       } catch (IOException e) {
         LOG.error("Error reading bloom filter data -- proceeding without",

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Fri Aug 27 05:01:02 2010
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionse
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 
 import java.io.IOException;
@@ -104,6 +105,20 @@ class StoreFileScanner implements KeyVal
     }
   }
 
+  public boolean reseek(KeyValue key) throws IOException {
+    try {
+      if (!reseekAtOrAfter(hfs, key)) {
+        close();
+        return false;
+      }
+      cur = hfs.getKeyValue();
+      hfs.next();
+      return true;
+    } catch (IOException ioe) {
+      throw new IOException("Could not seek " + this, ioe);
+    }
+  }
+
   public void close() {
     // Nothing to close on HFileScanner?
     cur = null;
@@ -131,9 +146,21 @@ class StoreFileScanner implements KeyVal
     return true;
   }
 
-  // Bloom filter hook.
-  public boolean shouldSeek(final byte[] row,
-                            final SortedSet<byte[]> columns) {
-    return reader.shouldSeek(row, columns);
+  static boolean reseekAtOrAfter(HFileScanner s, KeyValue k)
+  throws IOException {
+    //This function is similar to seekAtOrAfter function
+    int result = s.reseekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
+    if (result <= 0) {
+      return true;
+    } else {
+      // passed KV is larger than current KV in file, if there is a next
+      // it is after, if not then this scanner is done.
+      return s.next();
+    }
+  }
+
+  // StoreFile filter hook.
+  public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
+    return reader.shouldSeek(scan, columns);
   }
 }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Fri Aug 27 05:01:02 2010
@@ -154,30 +154,24 @@ class StoreScanner implements KeyValueSc
     List<KeyValueScanner> scanners =
       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
 
-    // exclude scan files that have failed file filters
+    // include only those scan files which pass all filters
     for (StoreFileScanner sfs : sfScanners) {
-      if (isGet &&
-          !sfs.shouldSeek(scan.getStartRow(), columns)) {
-        continue; // exclude this hfs
+      if (sfs.shouldSeek(scan, columns)) {
+        scanners.add(sfs);
       }
-      scanners.add(sfs);
     }
 
     // Then the memstore scanners
-    scanners.addAll(this.store.memstore.getScanners());
+    if (this.store.memstore.shouldSeek(scan)) {
+      scanners.addAll(this.store.memstore.getScanners());
+    }
     return scanners;
   }
 
   public synchronized KeyValue peek() {
-    try {
-      checkReseek();
-    } catch (IOException e) {
-      throw new RuntimeException("IOE conversion", e);
-    }
     if (this.heap == null) {
-      return null;
+      return this.lastTop;
     }
-
     return this.heap.peek();
   }
 
@@ -233,11 +227,16 @@ class StoreScanner implements KeyValueSc
       return false;
     }
 
-    matcher.setRow(peeked.getRow());
+    // only call setRow if the row changes; avoids confusing the query matcher
+    // if scanning intra-row
+    if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
+      matcher.setRow(peeked.getRow());
+    }
+
     KeyValue kv;
     List<KeyValue> results = new ArrayList<KeyValue>();
     LOOP: while((kv = this.heap.peek()) != null) {
-      QueryMatcher.MatchCode qcode = matcher.match(kv);
+      ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
       //DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
       switch(qcode) {
         case INCLUDE:
@@ -262,6 +261,10 @@ class StoreScanner implements KeyValueSc
           return false;
 
         case SEEK_NEXT_ROW:
+          if (!matcher.moreRowsMayExistAfter(kv)) {
+            outResult.addAll(results);
+            return false;
+          }
           heap.next();
           break;
 
@@ -276,6 +279,15 @@ class StoreScanner implements KeyValueSc
           this.heap.next();
           break;
 
+        case SEEK_NEXT_USING_HINT:
+          KeyValue nextKV = matcher.getNextKeyHint(kv);
+          if (nextKV != null) {
+            reseek(nextKV);
+          } else {
+            heap.next();
+          }
+          break;
+
         default:
           throw new RuntimeException("UNEXPECTED");
       }
@@ -321,18 +333,20 @@ class StoreScanner implements KeyValueSc
 
   private void checkReseek() throws IOException {
     if (this.heap == null && this.lastTop != null) {
-
-      reseek(this.lastTop);
+      resetScannerStack(this.lastTop);
       this.lastTop = null; // gone!
     }
     // else dont need to reseek
   }
 
-  private void reseek(KeyValue lastTopKey) throws IOException {
+  private void resetScannerStack(KeyValue lastTopKey) throws IOException {
     if (heap != null) {
       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
     }
 
+    /* When we have the scan object, should we not pass it to getScanners()
+     * to get a limited set of scanners? We did so in the constructor and we
+     * could have done it now by storing the scan object from the constructor */
     List<KeyValueScanner> scanners = getScanners();
 
     for(KeyValueScanner scanner : scanners) {
@@ -347,4 +361,11 @@ class StoreScanner implements KeyValueSc
     KeyValue kv = heap.peek();
     matcher.setRow((kv == null ? lastTopKey : kv).getRow());
   }
+
+  @Override
+  public synchronized boolean reseek(KeyValue kv) throws IOException {
+    //Heap cannot be null, because this is only called from next() which
+    //guarantees that heap will never be null before this call.
+    return this.heap.reseek(kv);
+  }
 }

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,147 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Stores the minimum and maximum timestamp values.
+ * Can be used to find out whether a given time range overlaps its own range.
+ * MemStores use this class to track their minimum and maximum timestamps.
+ * When writing StoreFiles, this information is stored in the file's metadata
+ * and used at read time to match against the required TimeRange.
+ */
+public class TimeRangeTracker implements Writable {
+
+  long minimumTimestamp = -1;
+  long maximumTimestamp = -1;
+
+  /**
+   * Default constructor.
+   * Leaves both timestamps at -1, i.e. no timestamp has been included yet.
+   */
+  public TimeRangeTracker() {
+
+  }
+
+  /**
+   * Copy Constructor
+   * @param trt source TimeRangeTracker
+   */
+  public TimeRangeTracker(final TimeRangeTracker trt) {
+    this.minimumTimestamp = trt.getMinimumTimestamp();
+    this.maximumTimestamp = trt.getMaximumTimestamp();
+  }
+
+  public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
+    this.minimumTimestamp = minimumTimestamp;
+    this.maximumTimestamp = maximumTimestamp;
+  }
+
+  /**
+   * Update the current TimestampRange to include the timestamp from KeyValue
+   * If the Key is of type DeleteColumn or DeleteFamily, it includes the
+   * entire time range from 0 to timestamp of the key.
+   * @param kv the KeyValue to include
+   */
+  public void includeTimestamp(final KeyValue kv) {
+    includeTimestamp(kv.getTimestamp());
+    if (kv.isDeleteColumnOrFamily()) {
+      includeTimestamp(0);
+    }
+  }
+
+  /**
+   * Update the current TimestampRange to include the timestamp from Key.
+   * If the Key is of type DeleteColumn or DeleteFamily, it includes the
+   * entire time range from 0 to timestamp of the key.
+   * @param key serialized key; timestamp and type are read from its trailing bytes
+   */
+  public void includeTimestamp(final byte[] key) {
+    includeTimestamp(Bytes.toLong(key,key.length-KeyValue.TIMESTAMP_TYPE_SIZE));
+    int type = key[key.length - 1];
+    if (type == Type.DeleteColumn.getCode() ||
+        type == Type.DeleteFamily.getCode()) {
+      includeTimestamp(0);
+    }
+  }
+
+  /**
+   * If required, update the current TimestampRange to include timestamp
+   * @param timestamp the timestamp value to include
+   */
+  private void includeTimestamp(final long timestamp) {
+    if (maximumTimestamp == -1) {
+      minimumTimestamp = timestamp;
+      maximumTimestamp = timestamp;
+    }
+    else if (minimumTimestamp > timestamp) {
+      minimumTimestamp = timestamp;
+    }
+    else if (maximumTimestamp < timestamp) {
+      maximumTimestamp = timestamp;
+    }
+    return;
+  }
+
+  /**
+   * Check if the range has any overlap with TimeRange
+   * @param tr TimeRange
+   * @return True if there is overlap, false otherwise
+   */
+  public boolean includesTimeRange(final TimeRange tr) {
+    return (this.minimumTimestamp < tr.getMax() &&
+        this.maximumTimestamp >= tr.getMin());
+  }
+
+  /**
+   * @return the minimumTimestamp
+   */
+  public long getMinimumTimestamp() {
+    return minimumTimestamp;
+  }
+
+  /**
+   * @return the maximumTimestamp
+   */
+  public long getMaximumTimestamp() {
+    return maximumTimestamp;
+  }
+
+  public void write(final DataOutput out) throws IOException {
+    out.writeLong(minimumTimestamp);
+    out.writeLong(maximumTimestamp);
+  }
+
+  public void readFields(final DataInput in) throws IOException {
+    this.minimumTimestamp = in.readLong();
+    this.maximumTimestamp = in.readLong();
+  }
+
+}
+

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Aug 27 05:01:02 2010
@@ -28,6 +28,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -36,11 +37,13 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -51,6 +54,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -59,6 +63,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -120,6 +125,15 @@ public class HLog implements Syncable {
   static final Log LOG = LogFactory.getLog(HLog.class);
   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
   static final byte [] METAROW = Bytes.toBytes("METAROW");
+
+  /*
+   * Name of directory that holds recovered edits written by the wal log
+   * splitting code, one per region
+   */
+  private static final String RECOVERED_EDITS_DIR = "recovered.edits";
+  private static final Pattern EDITFILES_NAME_PATTERN =
+    Pattern.compile("-?[0-9]+");
+  
   private final FileSystem fs;
   private final Path dir;
   private final Configuration conf;
@@ -142,11 +156,6 @@ public class HLog implements Syncable {
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
   final static Object [] NO_ARGS = new Object []{};
 
-  /** Name of file that holds recovered edits written by the wal log splitting
-   * code, one per region
-   */
-  public static final String RECOVERED_EDITS = "recovered.edits";
-
   // used to indirectly tell syncFs to force the sync
   private boolean forceSync = false;
 
@@ -220,6 +229,9 @@ public class HLog implements Syncable {
    */
   private final LogSyncer logSyncerThread;
 
+  private final List<LogEntryVisitor> logEntryVisitors =
+      new CopyOnWriteArrayList<LogEntryVisitor>();
+
   /**
    * Pattern used to validate a HLog file name
    */
@@ -388,7 +400,7 @@ public class HLog implements Syncable {
         !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
       // This could spin on occasion but better the occasional spin than locking
       // every increment of sequence number.
-      LOG.debug("Change sequence number from " + logSeqNum + " to " + newvalue);
+      LOG.debug("Changed sequenceid from " + logSeqNum + " to " + newvalue);
     }
   }
 
@@ -434,22 +446,27 @@ public class HLog implements Syncable {
       if (closed) {
         return regionsToFlush;
       }
+      // Do all the preparation outside of the updateLock so that incoming
+      // writes are blocked as little as possible
+      long currentFilenum = this.filenum;
+      this.filenum = System.currentTimeMillis();
+      Path newPath = computeFilename();
+      HLog.Writer nextWriter = createWriter(fs, newPath, HBaseConfiguration.create(conf));
+      int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
+      // Can we get at the dfsclient outputstream?  If an instance of
+      // SFLW, it'll have done the necessary reflection to get at the
+      // protected field name.
+      OutputStream nextHdfsOut = null;
+      if (nextWriter instanceof SequenceFileLogWriter) {
+        nextHdfsOut =
+          ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream();
+      }
       synchronized (updateLock) {
         // Clean up current writer.
-        Path oldFile = cleanupCurrentWriter(this.filenum);
-        this.filenum = System.currentTimeMillis();
-        Path newPath = computeFilename();
-        this.writer = createWriter(fs, newPath, HBaseConfiguration.create(conf));
-        this.initialReplication = fs.getFileStatus(newPath).getReplication();
-
-        // Can we get at the dfsclient outputstream?  If an instance of
-        // SFLW, it'll have done the necessary reflection to get at the
-        // protected field name.
-        this.hdfs_out = null;
-        if (this.writer instanceof SequenceFileLogWriter) {
-          this.hdfs_out =
-            ((SequenceFileLogWriter)this.writer).getDFSCOutputStream();
-        }
+        Path oldFile = cleanupCurrentWriter(currentFilenum);
+        this.writer = nextWriter;
+        this.initialReplication = nextInitialReplication;
+        this.hdfs_out = nextHdfsOut;
 
         LOG.info((oldFile != null?
             "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
@@ -457,28 +474,28 @@ public class HLog implements Syncable {
             ", filesize=" +
             this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
           "New hlog " + FSUtils.getPath(newPath));
-        // Tell our listeners that a new log was created
-        if (!this.actionListeners.isEmpty()) {
-          for (LogActionsListener list : this.actionListeners) {
-            list.logRolled(newPath);
-          }
+        this.numEntries.set(0);
+      }
+      // Tell our listeners that a new log was created
+      if (!this.actionListeners.isEmpty()) {
+        for (LogActionsListener list : this.actionListeners) {
+          list.logRolled(newPath);
         }
-        // Can we delete any of the old log files?
-        if (this.outputfiles.size() > 0) {
-          if (this.lastSeqWritten.size() <= 0) {
-            LOG.debug("Last sequence written is empty. Deleting all old hlogs");
-            // If so, then no new writes have come in since all regions were
-            // flushed (and removed from the lastSeqWritten map). Means can
-            // remove all but currently open log file.
-            for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
-              archiveLogFile(e.getValue(), e.getKey());
-            }
-            this.outputfiles.clear();
-          } else {
-            regionsToFlush = cleanOldLogs();
+      }
+      // Can we delete any of the old log files?
+      if (this.outputfiles.size() > 0) {
+        if (this.lastSeqWritten.size() <= 0) {
+          LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
+          // If so, then no new writes have come in since all regions were
+          // flushed (and removed from the lastSeqWritten map). Means can
+          // remove all but currently open log file.
+          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+            archiveLogFile(e.getValue(), e.getKey());
           }
+          this.outputfiles.clear();
+        } else {
+          regionsToFlush = cleanOldLogs();
         }
-        this.numEntries.set(0);
       }
     } finally {
       this.cacheFlushLock.unlock();
@@ -560,7 +577,7 @@ public class HLog implements Syncable {
         byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
         LOG.debug("Found " + logsToRemove + " hlogs to remove " +
           " out of total " + this.outputfiles.size() + "; " +
-          "oldest outstanding seqnum is " + oldestOutstandingSeqNum +
+          "oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
           " from region " + Bytes.toString(oldestRegion));
       }
       for (Long seq : sequenceNumbers) {
@@ -650,7 +667,7 @@ public class HLog implements Syncable {
         throw e;
       }
       if (currentfilenum >= 0) {
-        oldFile = computeFilename();
+        oldFile = computeFilename(currentfilenum);
         this.outputfiles.put(Long.valueOf(this.logSeqNum.get() - 1), oldFile);
       }
     }
@@ -660,22 +677,27 @@ public class HLog implements Syncable {
   private void archiveLogFile(final Path p, final Long seqno) throws IOException {
     Path newPath = getHLogArchivePath(this.oldLogDir, p);
     LOG.info("moving old hlog file " + FSUtils.getPath(p) +
-      " whose highest sequence/edit id is " + seqno + " to " +
+      " whose highest sequenceid is " + seqno + " to " +
       FSUtils.getPath(newPath));
     this.fs.rename(p, newPath);
-    if (!this.actionListeners.isEmpty()) {
-      for (LogActionsListener list : this.actionListeners) {
-        list.logArchived(p, newPath);
-      }
-    }
   }
 
   /**
    * This is a convenience method that computes a new filename with a given
-   * file-number.
+   * file-number, namely the current HLog file-number.
    * @return Path
    */
   protected Path computeFilename() {
+    return computeFilename(this.filenum);
+  }
+
+  /**
+   * This is a convenience method that computes a new filename with a given
+   * file-number.
+   * @param filenum file-number to use
+   * @return Path
+   */
+  protected Path computeFilename(long filenum) {
     if (filenum < 0) {
       throw new RuntimeException("hlog file number can't be < 0");
     }
@@ -997,10 +1019,13 @@ public class HLog implements Syncable {
    * If the pipeline isn't started yet or is empty, you will get the default
    * replication factor.  Therefore, if this function returns 0, it means you
    * are not properly running with the HDFS-826 patch.
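+   * @throws InvocationTargetException if the reflectively invoked getNumCurrentReplicas method itself throws
+   * @throws IllegalAccessException if the reflected method cannot be accessed
+   * @throws IllegalArgumentException if the reflected method rejects the object or arguments passed to it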
    *
    * @throws Exception
    */
-  int getLogReplication() throws Exception {
+  int getLogReplication() throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
     if(this.getNumCurrentReplicas != null && this.hdfs_out != null) {
       Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
       if (repl instanceof Integer) {
@@ -1030,6 +1055,11 @@ public class HLog implements Syncable {
     if (!this.enabled) {
       return;
     }
+    if (!this.logEntryVisitors.isEmpty()) {
+      for (LogEntryVisitor visitor : this.logEntryVisitors) {
+        visitor.visitLogEntryBeforeWrite(info, logKey, logEdit);
+      }
+    }
     try {
       long now = System.currentTimeMillis();
       this.writer.append(new HLog.Entry(logKey, logEdit));
@@ -1181,8 +1211,16 @@ public class HLog implements Syncable {
       srcDir.toString());
     splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
     try {
-      LOG.info("Spliting is done. Removing old log dir "+srcDir);
-      fs.delete(srcDir, false);
+      FileStatus[] files = fs.listStatus(srcDir);
+      for(FileStatus file : files) {
+        Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
+        LOG.info("Moving " +  FSUtils.getPath(file.getPath()) + " to " +
+                   FSUtils.getPath(newPath));
+        fs.rename(file.getPath(), newPath);
+      }
+      LOG.debug("Moved " + files.length + " log files to " +
+        FSUtils.getPath(oldLogDir));
+      fs.delete(srcDir, true);
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
       IOException io = new IOException("Cannot delete: " + srcDir);
@@ -1442,7 +1480,7 @@ public class HLog implements Syncable {
     NamingThreadFactory f  = new NamingThreadFactory(
             "SplitWriter-%1$d", Executors.defaultThreadFactory());
     ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f);
-    for (final byte[] region : splitLogsMap.keySet()) {
+    for (final byte [] region : splitLogsMap.keySet()) {
       Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
       writeFutureResult.put(region, threadPool.submit(splitter));
     }
@@ -1562,17 +1600,19 @@ public class HLog implements Syncable {
           WriterAndPath wap = logWriters.get(region);
           for (Entry logEntry: entries) {
             if (wap == null) {
-              Path logFile = getRegionLogPath(logEntry, rootDir);
-              if (fs.exists(logFile)) {
-                LOG.warn("Found existing old hlog file. It could be the result of a previous" +
-                        "failed split attempt. Deleting " + logFile +
-                        ", length=" + fs.getFileStatus(logFile).getLen());
-                fs.delete(logFile, false);
+              Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
+              if (fs.exists(regionedits)) {
+                LOG.warn("Found existing old edits file. It could be the " +
+                  "result of a previous failed split attempt. Deleting " +
+                  regionedits + ", length=" + fs.getFileStatus(regionedits).getLen());
+                if (!fs.delete(regionedits, false)) {
+                  LOG.warn("Failed delete of old " + regionedits);
+                }
               }
-              Writer w = createWriter(fs, logFile, conf);
-              wap = new WriterAndPath(logFile, w);
+              Writer w = createWriter(fs, regionedits, conf);
+              wap = new WriterAndPath(regionedits, w);
               logWriters.put(region, wap);
-              LOG.debug("Creating writer path=" + logFile +
+              LOG.debug("Creating writer path=" + regionedits +
                 " region=" + Bytes.toStringBinary(region));
             }
             wap.w.append(logEntry);
@@ -1626,19 +1666,116 @@ public class HLog implements Syncable {
     }
   }
 
-  private static Path getRegionLogPath(Entry logEntry, Path rootDir) {
-    Path tableDir =
-      HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
-    Path regionDir =
-            HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
-    return new Path(regionDir, RECOVERED_EDITS);
+  /*
+   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
+   * <code>logEntry</code> named for the zero-padded sequenceid in the passed <code>logEntry</code>:
+   * e.g. /hbase/some_table/2323432434/recovered.edits/0000000000000002332.
+   * This method also ensures existence of RECOVERED_EDITS_DIR under the region
+   * creating it if necessary; a failed mkdirs is only logged as a warning.
+   * @param fs Filesystem used to check for, and create, the recovered edits directory.
+   * @param logEntry Entry whose key supplies the table name, region name and sequenceid.
+   * @param rootDir HBase root dir.
+   * @return Path to file into which to dump split log edits.
+   * @throws IOException
+   */
+  private static Path getRegionSplitEditsPath(final FileSystem fs,
+      final Entry logEntry, final Path rootDir)
+  throws IOException {
+    Path tableDir = HTableDescriptor.getTableDir(rootDir,
+      logEntry.getKey().getTablename());
+    Path regiondir = HRegion.getRegionDir(tableDir,
+      HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
+    Path dir = getRegionDirRecoveredEditsDir(regiondir);
+    if (!fs.exists(dir)) {
+      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+    }
+    return new Path(dir,
+      formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()));
    }
 
+  static String formatRecoveredEditsFileName(final long seqid) {
+    return String.format("%019d", seqid); // zero-pad to width 19; presumably so names sort by sequenceid
+  }
+
 
+  /**
+   * Returns sorted set of edit files made by wal-log splitter.
+   * @param fs Filesystem used to list the recovered edits directory.
+   * @param regiondir Region directory whose recovered edits directory is listed.
+   * @return Edit files under the recovered edits dir of <code>regiondir</code>, sorted; empty if the listing is null.
+   * @throws IOException
+   */
+  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
+      final Path regiondir)
+  throws IOException {
+    Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
+    FileStatus [] files = fs.listStatus(editsdir, new PathFilter () {
+      @Override
+      public boolean accept(Path p) {
+        boolean result = false;
+        try {
+          // Return files and only files that match the editfile names pattern.
+          // There can be other files in this directory other than edit files.
+          // In particular, on error, we'll move aside the bad edit file giving
+          // it a timestamp suffix.  See moveAsideBadEditsFile.
+          Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
+          result = fs.isFile(p) && m.matches();
+        } catch (IOException e) {
+          LOG.warn("Failed isFile check on " + p);
+        }
+        return result;
+      }
+    });
+    NavigableSet<Path> filesSorted = new TreeSet<Path>();
+    if (files == null) return filesSorted;
+    for (FileStatus status: files) {
+      filesSorted.add(status.getPath());
+    }
+    return filesSorted;
+  }
 
+  /**
+   * Move aside a bad edits file by renaming it to its own name plus "." and the current time in millis.
+   * @param fs Filesystem on which the rename is done.
+   * @param edits Edits file to move aside.
+   * @return The moved aside name; returned even when the rename reports failure (a warning is logged).
+   * @throws IOException
+   */
+  public static Path moveAsideBadEditsFile(final FileSystem fs,
+      final Path edits)
+  throws IOException {
+    Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
+      System.currentTimeMillis());
+    if (!fs.rename(edits, moveAsideName)) {
+      LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
+    }
+    return moveAsideName;
+  }
 
+  /**
+   * @param regiondir This region's directory in the filesystem.
+   * @return The directory that holds recovered edits files for the region
+   * <code>regiondir</code>
+   */
+  public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
+    return new Path(regiondir, RECOVERED_EDITS_DIR);
+  }
 
+  /**
+   * Adds the passed visitor to this log's list of log entry visitors.
+   * @param visitor visitor to add
+   */
+  public void addLogEntryVisitor(LogEntryVisitor visitor) {
+    this.logEntryVisitors.add(visitor);
+  }
 
+  /**
+   * Removes the passed visitor from this log's list of log entry visitors.
+   * @param visitor visitor to remove
+   */
+  public void removeLogEntryVisitor(LogEntryVisitor visitor) {
+    this.logEntryVisitors.remove(visitor);
+  }
 
 
   public void addLogActionsListerner(LogActionsListener list) {

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java Fri Aug 27 05:01:02 2010
@@ -46,7 +46,6 @@ public class HLogKey implements Writable
   private long writeTime;
 
   private byte clusterId;
-  private int scope;
 
   /** Writable Consructor -- Do not use. */
   public HLogKey() {
@@ -70,7 +69,6 @@ public class HLogKey implements Writable
     this.logSeqNum = logSeqNum;
     this.writeTime = now;
     this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
-    this.scope = HConstants.REPLICATION_SCOPE_LOCAL;
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -119,22 +117,6 @@ public class HLogKey implements Writable
     this.clusterId = clusterId;
   }
 
-  /**
-   * Get the replication scope of this key
-   * @return replication scope
-   */
-  public int getScope() {
-    return this.scope;
-  }
-
-  /**
-   * Set the replication scope of this key
-   * @param scope The new scope
-   */
-  public void setScope(int scope) {
-    this.scope = scope;
-  }
-
   @Override
   public String toString() {
     return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
@@ -158,7 +140,6 @@ public class HLogKey implements Writable
     result ^= this.logSeqNum;
     result ^= this.writeTime;
     result ^= this.clusterId;
-    result ^= this.scope;
     return result;
   }
 
@@ -187,7 +168,6 @@ public class HLogKey implements Writable
     out.writeLong(this.logSeqNum);
     out.writeLong(this.writeTime);
     out.writeByte(this.clusterId);
-    out.writeInt(this.scope);
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -197,7 +177,6 @@ public class HLogKey implements Writable
     this.writeTime = in.readLong();
     try {
       this.clusterId = in.readByte();
-      this.scope = in.readInt();
     } catch(EOFException e) {
       // Means it's an old key, just continue
     }

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogActionsListener.java Fri Aug 27 05:01:02 2010
@@ -33,11 +33,4 @@ public interface LogActionsListener {
    * @param newFile the path to the new hlog
    */
   public void logRolled(Path newFile);
-
-  /**
-   * Notify that the following log moved
-   * @param oldPath the old path
-   * @param newPath the new path
-   */
-  public void logArchived(Path oldPath, Path newPath);
 }

Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/wal/LogEntryVisitor.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,15 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+
+public interface LogEntryVisitor {
+
+  /**
+   * Visit a log entry before it is written; HLog calls this ahead of appending the entry to its writer.
+   * @param info region info supplied along with the entry
+   * @param logKey key of the entry about to be written
+   * @param logEdit edit of the entry about to be written
+   */
+  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+                                       WALEdit logEdit);
+}