Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 07:01:07 UTC

svn commit: r990018 [7/10] - in /hbase/branches/0.90_master_rewrite: ./ bin/ bin/replication/ src/assembly/ src/docbkx/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/filter/ s...

Added: hbase/branches/0.90_master_rewrite/src/site/xdoc/replication.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/site/xdoc/replication.xml?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/site/xdoc/replication.xml (added)
+++ hbase/branches/0.90_master_rewrite/src/site/xdoc/replication.xml Fri Aug 27 05:01:02 2010
@@ -0,0 +1,429 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright 2010 The Apache Software Foundation
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN"
+          "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+  <properties>
+    <title>
+      HBase Replication
+    </title>
+  </properties>
+  <body>
+    <section name="Overview">
+      <p>
+        HBase replication is a way to copy data between HBase deployments. It
+        can serve as a disaster recovery solution and can contribute to higher
+        availability at the HBase layer. It can also serve more practical
+        purposes; for example, as a way to easily copy edits from a web-facing
+        cluster to a "MapReduce" cluster which will process old and new data
+        and ship back the results automatically.
+      </p>
+      <p>
+        The basic architecture pattern used for HBase replication is (HBase cluster) master-push;
+        this makes it much easier to keep track of what’s currently being replicated,
+        since each region server has its own write-ahead log (aka WAL or HLog),
+        much like other well-known solutions such as MySQL master/slave replication,
+        where there’s only one binlog to keep track of. One master cluster can
+        replicate to any number of slave clusters, and each region server
+        participates in replicating its own stream of edits.
+      </p>
+      <p>
+        The replication is done asynchronously, meaning that the clusters can
+        be geographically distant, the links between them can be offline for
+        some time, and rows inserted on the master cluster won’t be
+        available at the same time on the slave clusters (eventual consistency).
+      </p>
+      <p>
+        The replication format used in this design is conceptually the same as
+        <a href="http://dev.mysql.com/doc/refman/5.1/en/replication-formats.html">
+        MySQL’s statement-based replication </a>. Instead of SQL statements, whole
+        WALEdits (consisting of multiple cell inserts coming from the clients'
+        Put and Delete) are replicated in order to maintain atomicity.
+      </p>
+      <p>
+        The HLogs from each region server are the basis of HBase replication,
+        and must be kept in HDFS as long as they are needed to replicate data
+        to any slave cluster. Each RS reads from the oldest log it needs to
+        replicate and keeps the current position inside ZooKeeper to simplify
+        failure recovery. That position can be different for every slave
+        cluster, as can the queue of HLogs to process.
+      </p>
+      <p>
+        The clusters participating in replication can be of asymmetric sizes
+        and the master cluster will do its “best effort” to balance the stream
+        of replication on the slave clusters by relying on randomization.
+      </p>
+      <img src="images/replication_overview.png"/>
+    </section>
+    <section name="Life of a log edit">
+      <p>
+        The following sections describe the life of a single edit going from a
+        client that communicates with a master cluster all the way to a single
+        slave cluster.
+      </p>
+      <section name="Normal processing">
+        <p>
+          The client uses an HBase API to send a Put, Delete or ICV to a region
+          server. The region server transforms the key values into a WALEdit,
+          which is inspected by the replication code that, for each family
+          scoped for replication, adds the scope to the edit. The edit is
+          appended to the current WAL and is then applied to the MemStore.
+        </p>
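+        <p>
+          For illustration, here is a minimal sketch of how a family can be
+          scoped for replication from the client API (the table and family
+          names are made up, and this assumes the 0.90-era admin classes):
+        </p>
+        <pre>
+// Hedged sketch: mark a column family for replication.
+Configuration conf = HBaseConfiguration.create();
+HBaseAdmin admin = new HBaseAdmin(conf);
+HTableDescriptor desc = new HTableDescriptor("usertable");
+HColumnDescriptor family = new HColumnDescriptor("d");
+// REPLICATION_SCOPE_GLOBAL is 1; the default scope, 0, is not replicated
+family.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+desc.addFamily(family);
+admin.createTable(desc);
+        </pre>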
+        <p>
+          In a separate thread, the edit is read from the log (as part of a batch)
+          and only the KVs that are replicable are kept (that is, those that
+          belong to a family scoped GLOBAL in the table schema and that are not
+          from a catalog table, so not .META. or -ROOT-). When the buffer is
+          filled, or the reader hits the end of the file, the buffer is sent to
+          a random region server on the slave cluster.
+        </p>
+        <p>
+          Synchronously, the region server that receives the edits reads them
+          sequentially and applies them on its own cluster using the HBase
+          client (HTables managed by an HTablePool). If consecutive rows belong
+          to the same table, they are inserted together in order to leverage
+          parallel insertion.
+        </p>
+        <p>
+          Back in the master cluster's region server, the offset for the current
+          WAL that's being replicated is registered in ZooKeeper.
+        </p>
+      </section>
+      <section name="Non-responding slave clusters">
+        <p>
+          The edit is inserted in the same way.
+        </p>
+        <p>
+          In the separate thread, the region server reads, filters and buffers
+          the log edits the same way as during normal processing. The slave
+          region server that's contacted doesn't answer the RPC, so the master
+          region server will sleep and retry up to a configured number of times.
+          If the slave RS still isn't available, the master cluster RS will
+          select a new subset of RSs to replicate to and will retry sending the
+          buffer of edits.
+        </p>
+        <p>
+          In the meantime, the WALs will be rolled and stored in a queue in
+          ZooKeeper. Logs that are archived by their region server (archiving is
+          basically moving a log from the region server's logs directory to a
+          central logs archive directory) will have their paths updated in the
+          in-memory queue of the replicating thread.
+        </p>
+        <p>
+          When the slave cluster is finally available, the buffer will be applied
+          the same way as during normal processing. The master cluster RS will then
+          replicate the backlog of logs.
+        </p>
+      </section>
+    </section>
+    <section name="Internals">
+      <p>
+        This section describes in depth how each of replication's internal
+        features operate.
+      </p>
+      <section name="Choosing region servers to replicate to">
+        <p>
+          When a master cluster RS initiates a replication source to a slave cluster,
+          it first connects to the slave's ZooKeeper ensemble using the provided
+          cluster key (that key is composed of the value of hbase.zookeeper.quorum,
+          zookeeper.znode.parent and hbase.zookeeper.property.clientPort). It
+          then scans the "rs" directory to discover all the available sinks
+          (region servers that are accepting incoming streams of edits to replicate)
+          and will randomly choose a subset of them using a configured
+          ratio (which has a default value of 10%). For example, if a slave
+          cluster has 150 machines, 15 will be chosen as potential recipients for
+          edits that this master cluster RS will be sending. Since this is done by all
+          master cluster RSs, the probability that all slave RSs are used is very high,
+          and this method works for clusters of any size. For example, a master cluster
+          of 10 machines replicating to a slave cluster of 5 machines with a ratio
+          of 10% means that each master cluster RS will choose one machine at
+          random, so the chances of overlap and of full usage of the slave
+          cluster are high.
+        </p>
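+        <p>
+          As a rough sketch of that selection step (this is illustrative, not
+          the actual ReplicationSource implementation; the sink addresses would
+          come from the slave's "rs" znodes):
+        </p>
+        <pre>
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+public class SinkSelectionSketch {
+  // Pick roughly "ratio" of the slave's region servers at random.
+  public static List chooseSinks(String[] slaveAddresses, float ratio, Random rand) {
+    int nbSinks = (int) Math.ceil(slaveAddresses.length * ratio);
+    List shuffled = new ArrayList(Arrays.asList(slaveAddresses));
+    Collections.shuffle(shuffled, rand);
+    return shuffled.subList(0, nbSinks);
+  }
+}
+        </pre>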
+      </section>
+      <section name="Keeping track of logs">
+        <p>
+          Every master cluster RS has its own znode in the replication znodes hierarchy.
+          It contains one znode per peer cluster (if 5 slave clusters, 5 znodes
+          are created), and each of these contains a queue
+          of HLogs to process. Each of these queues will track the HLogs created
+          by that RS, but they can differ in size. For example, if one slave
+          cluster becomes unavailable for some time then the HLogs cannot be
+          deleted, so they need to stay in the queue (while the others are
+          processed). See the section named "Region server failover" for an
+          example.
+        </p>
+        <p>
+          When a source is instantiated, it contains the current HLog that the
+          region server is writing to. During log rolling, the new file is added
+          to the queue of each slave cluster's znode just before it's made available.
+          This ensures that all the sources are aware that a new log exists
+          before the HLog is able to append edits to it, but this makes log
+          rolling a more expensive operation.
+          The queue items are discarded when the replication thread cannot read
+          more entries from a file (because it reached the end of the last block)
+          and there are other files in the queue.
+          This means that if a source is up-to-date and replicates from the log
+          that the region server writes to, reading up to the "end" of the
+          current file won't delete the item in the queue.
+        </p>
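+        <p>
+          A rough sketch of that enqueue step on log roll might look like the
+          following (the znode layout and helper names are illustrative, not
+          the actual implementation):
+        </p>
+        <pre>
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+
+public class EnqueueNewLogSketch {
+  // Add the newly rolled HLog's name under every peer cluster's queue znode
+  // before the HLog starts taking edits.
+  public static void enqueueNewLog(ZooKeeper zk, String rsZnode,
+      String[] peerIds, String hlogName)
+      throws KeeperException, InterruptedException {
+    for (String peerId : peerIds) {
+      String path = rsZnode + "/peers/" + peerId + "/" + hlogName;
+      zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+    }
+  }
+}
+        </pre>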
+        <p>
+          When a log is archived (because it's not used anymore or because there
+          are too many of them per hbase.regionserver.maxlogs, typically because
+          the insertion rate is faster than region flushing), it will notify the
+          source threads that the path for that log changed. If a particular
+          source was already done with it, it will just ignore the message. If
+          it's in the queue, the path will be updated in memory. If the log is
+          currently being replicated, the change will be done atomically so that
+          the reader doesn't try to open the file after it has been moved. Also,
+          since moving a file is a NameNode operation, if the reader is
+          currently reading the log it won't generate any exception.
+        </p>
+      </section>
+      <section name="Reading, filtering and sending edits">
+        <p>
+          By default, a source will try to read from a log file and ship log
+          entries as fast as possible to a sink. This is first limited by the
+          filtering of log entries; only KeyValues that are scoped GLOBAL and
+          that don't belong to catalog tables will be retained. A second limit
+          is imposed on the total size of the list of edits to replicate per slave,
+          which by default is 64MB. This means that a master cluster RS with 3 slaves
+          will use at most 192MB to store data to replicate. This doesn't
+          account for the filtered data that hasn't yet been garbage collected.
+        </p>
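+        <p>
+          A rough sketch of that filtering step is shown below (illustrative
+          only; the real code reads the scopes carried in the WALEdit itself,
+          and the "globalFamilies" input here is a made-up simplification):
+        </p>
+        <pre>
+import java.util.ArrayList;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class EditFilterSketch {
+  // Keep only the KVs whose family is scoped GLOBAL and whose table is not
+  // a catalog table (.META. or -ROOT-).
+  public static KeyValue[] filter(byte[] tableName, KeyValue[] kvs,
+      byte[][] globalFamilies) {
+    if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)
+        || Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+      return new KeyValue[0];
+    }
+    ArrayList kept = new ArrayList();
+    for (KeyValue kv : kvs) {
+      for (byte[] family : globalFamilies) {
+        if (Bytes.equals(kv.getFamily(), family)) {
+          kept.add(kv);
+          break;
+        }
+      }
+    }
+    return (KeyValue[]) kept.toArray(new KeyValue[kept.size()]);
+  }
+}
+        </pre>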
+        <p>
+          Once the maximum size of edits has been buffered or the reader hits
+          the end of the log file, the source thread will stop reading and will
+          choose at random a sink to replicate to (from the list that was
+          generated by keeping only a subset of slave RSs). It will directly
+          issue an RPC to the chosen machine and will wait for the method to
+          return. If the RPC is successful, the source will determine whether
+          the current file has been emptied or whether it should continue to
+          read from it. If the former, it will delete the znode in the queue.
+          If the latter, it will register the new offset in the log's znode.
+          If the RPC threw an exception, the source will retry up to 10 times
+          before trying a different sink.
+        </p>
+      </section>
+      <section name="Applying edits">
+        <p>
+          The sink synchronously applies the edits to its local cluster using
+          the native client API. This method is also synchronized across all
+          incoming sources, which means that only one batch of log entries can
+          be replicated at a time by each slave region server.
+        </p>
+        <p>
+          The sink applies the edits one by one, unless it's able to batch
+          sequential Puts that belong to the same table in order to use the
+          parallel puts feature of HConnectionManager. The Put and Delete
+          objects are recreated by inspecting the incoming WALEdit objects,
+          using the exact same row, family, qualifier, timestamp, and value
+          (for Puts). Note that if the master and slave clusters' clocks are
+          not synchronized, time-related issues may occur.
+        </p>
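+        <p>
+          A rough sketch of rebuilding and batching Puts on the sink side
+          follows (illustrative only; the table, row and column names are
+          made up):
+        </p>
+        <pre>
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class ApplyEditsSketch {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    HTable table = new HTable(conf, "usertable");
+    ArrayList puts = new ArrayList();
+    // Rebuild a Put with the exact same row/family/qualifier/timestamp/value
+    // as the replicated cell.
+    Put put = new Put(Bytes.toBytes("row1"));
+    put.add(Bytes.toBytes("d"), Bytes.toBytes("q1"), 1234567890L,
+        Bytes.toBytes("value1"));
+    puts.add(put);
+    // Consecutive edits for the same table are applied in one call.
+    table.put(puts);
+  }
+}
+        </pre>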
+      </section>
+      <section name="Cleaning logs">
+        <p>
+          If replication isn't enabled, the master's log cleaning thread will
+          delete old logs using a configured TTL. This doesn't work well with
+          replication since archived logs past their TTL may still be in a
+          queue. Thus, the default behavior is augmented so that if a log is
+          past its TTL, the cleaning thread will look up every queue until it
+          finds the log (while caching the names it finds). If it's not found,
+          the log will be deleted. The next time it has to look for a log,
+          it will first use its cache.
+        </p>
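+        <p>
+          A rough sketch of that check (illustrative only; the real cleaner
+          reads the queues from ZooKeeper):
+        </p>
+        <pre>
+import java.util.HashSet;
+import java.util.Set;
+
+public class LogCleanerSketch {
+  // Names of queued logs seen during previous lookups.
+  private final Set cachedQueuedLogs = new HashSet();
+
+  // A log past its TTL is only deletable if no replication queue still
+  // references it.
+  public boolean isDeletable(String logName, String[][] allQueues) {
+    if (cachedQueuedLogs.contains(logName)) {
+      return false;
+    }
+    for (String[] queue : allQueues) {
+      for (String queuedLog : queue) {
+        cachedQueuedLogs.add(queuedLog);
+        if (queuedLog.equals(logName)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+}
+        </pre>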
+      </section>
+      <section name="Region server failover">
+        <p>
+          As long as region servers don't fail, keeping track of the logs in ZK
+          doesn't add any value. Unfortunately, they do fail, and since ZooKeeper
+          is highly available we can count on it and its semantics to help us
+          manage the transfer of the queues.
+        </p>
+        <p>
+          All the master cluster RSs keep a watcher on every other one of them to be
+          notified when one dies (just like the master does). When it happens,
+          they all race to create a znode called "lock" inside the dead RS' znode
+          that contains its queues. The one that creates it successfully will
+          proceed by transferring all the queues to its own znode (one by one
+          since ZK doesn't support the rename operation) and will delete all the
+          old ones when it's done. The recovered queues' znodes will be named
+          with the id of the slave cluster appended with the name of the dead
+          server. 
+        </p>
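+        <p>
+          A rough sketch of that race (illustrative only; znode paths follow
+          the layout shown below):
+        </p>
+        <pre>
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+
+public class QueueLockSketch {
+  // Try to claim a dead region server's queues by creating a "lock" znode
+  // inside its znode; exactly one surviving RS can win the race.
+  public static boolean tryLockQueues(ZooKeeper zk, String deadRsZnode)
+      throws KeeperException, InterruptedException {
+    try {
+      zk.create(deadRsZnode + "/lock", new byte[0],
+          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      return true;  // we won; proceed to copy the queues to our own znode
+    } catch (KeeperException.NodeExistsException e) {
+      return false; // another region server already claimed them
+    }
+  }
+}
+        </pre>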
+        <p>
+          Once that is done, the master cluster RS will create one new source thread per
+          copied queue, and each of them will follow the read/filter/ship pattern.
+          The main difference is that those queues will never have new data since
+          they don't belong to their new region server, which means that when
+          the reader hits the end of the last log, the queue's znode will be
+          deleted and the master cluster RS will close that replication source.
+        </p>
+        <p>
+          For example, consider a master cluster with 3 region servers that's
+          replicating to a single slave with id '2'. The following hierarchy
+          represents what the znode layout could be at some point in time. We
+          can see the RSs' znodes all contain a "peers" znode that contains a
+          single queue. The znode names in the queues represent the actual file
+          names on HDFS in the form "address,port.timestamp".
+        </p>
+        <pre>
+/hbase/replication/rs/
+                      1.1.1.1,60020,123456780/
+                        peers/
+                              2/
+                                1.1.1.1,60020.1234  (Contains a position)
+                                1.1.1.1,60020.1265
+                      1.1.1.2,60020,123456790/
+                        peers/
+                              2/
+                                1.1.1.2,60020.1214  (Contains a position)
+                                1.1.1.2,60020.1248
+                                1.1.1.2,60020.1312
+                      1.1.1.3,60020,123456630/
+                        peers/
+                              2/
+                                1.1.1.3,60020.1280  (Contains a position)
+
+        </pre>
+        <p>
+          Now let's say that 1.1.1.2 loses its ZK session. The survivors will race
+          to create a lock, and for some reason 1.1.1.3 wins. It will then start
+          transferring all the queues to its local peers znode by appending the
+          name of the dead server. Right before 1.1.1.3 is able to clean up the
+          old znodes, the layout will look like the following:
+        </p>
+        <pre>
+/hbase/replication/rs/
+                      1.1.1.1,60020,123456780/
+                        peers/
+                              2/
+                                1.1.1.1,60020.1234  (Contains a position)
+                                1.1.1.1,60020.1265
+                      1.1.1.2,60020,123456790/
+                        lock
+                        peers/
+                              2/
+                                1.1.1.2,60020.1214  (Contains a position)
+                                1.1.1.2,60020.1248
+                                1.1.1.2,60020.1312
+                      1.1.1.3,60020,123456630/
+                        peers/
+                              2/
+                                1.1.1.3,60020.1280  (Contains a position)
+
+                              2-1.1.1.2,60020,123456790/
+                                1.1.1.2,60020.1214  (Contains a position)
+                                1.1.1.2,60020.1248
+                                1.1.1.2,60020.1312
+        </pre>
+        <p>
+          Some time later, but before 1.1.1.3 is able to finish replicating the
+          last HLog from 1.1.1.2, let's say that it dies too (also some new logs
+          were created in the normal queues). The last RS will then try to lock
+          1.1.1.3's znode and will begin transferring all the queues. The new
+          layout will be:
+        </p>
+        <pre>
+/hbase/replication/rs/
+                      1.1.1.1,60020,123456780/
+                        peers/
+                              2/
+                                1.1.1.1,60020.1378  (Contains a position)
+
+                              2-1.1.1.3,60020,123456630/
+                                1.1.1.3,60020.1325  (Contains a position)
+                                1.1.1.3,60020.1401
+
+                              2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
+                                1.1.1.2,60020.1312  (Contains a position)
+                      1.1.1.3,60020,123456630/
+                        lock
+                        peers/
+                              2/
+                                1.1.1.3,60020.1325  (Contains a position)
+                                1.1.1.3,60020.1401
+
+                              2-1.1.1.2,60020,123456790/
+                                1.1.1.2,60020.1312  (Contains a position)
+        </pre>
+      </section>
+    </section>
+    <section name="FAQ">
+      <section name="Why do all clusters need to be in the same timezone?">
+        <p>
+          Suppose an edit to cell X happens in an EST cluster, and two minutes
+          later a new edit to the same cell happens in a PST cluster, with both
+          clusters set up in master-master replication. Because of the clock
+          offset, the second edit carries an earlier timestamp and is considered
+          older, so the first edit will always hide it even though the second
+          edit is in fact the newer one.
+        </p>
+      </section>
+      <section name="GLOBAL means replicate? Any provision to replicate only to cluster X and not to cluster Y? or is that for later?">
+        <p>
+          Yes, this is for much later.
+        </p>
+      </section>
+      <section name="You need a bulk edit shipper? Something that allows you transfer 64MB of edits in one go?">
+        <p>
+          You can use the HBase-provided utility called CopyTable from the package
+          org.apache.hadoop.hbase.mapreduce in order to have a discp-like tool to
+          bulk copy data.
+        </p>
+      </section>
+      <section name="Is it a mistake that WALEdit doesn't carry Put and Delete objects, that we have to reinstantiate not only replicating but when replaying edits?">
+        <p>
+          Yes, this behavior would help a lot but it's not currently available
+          in HBase (BatchUpdate had that, but it was lost in the new API).
+        </p>
+      </section>
+    </section>
+    <section name="Known bugs/missing features">
+      <p>
+        Here's a list of all the jiras that relate to major issues or missing
+        features in the replication implementation.
+      </p>
+      <ol>
+        <li>
+            HBASE-2688, master-master replication is disabled in the code; we
+            need to enable and test it.
+        </li>
+        <li>
+            HBASE-2611, basically if a region server dies while recovering the
+            queues of another dead RS, we will miss the data from the queues
+            that weren't copied.
+        </li>
+        <li>
+            HBASE-2196, a master cluster can only support a single slave; some
+            refactoring is needed to support multiple slaves.
+        </li>
+        <li>
+            HBASE-2195, edits are applied regardless of their home cluster; they
+            should carry that data and it should be checked.
+        </li>
+        <li>
+            HBASE-2200, currently all the replication operations (adding or removing
+            streams for example) are done only when the clusters are offline. This
+            should be possible at runtime.
+        </li>
+      </ol>
+    </section>
+  </body>
+</document>
\ No newline at end of file

Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,330 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test case that uses multiple threads to read and write multifamily rows
+ * into a table, verifying that reads never see partially-complete writes.
+ * 
+ * This can run as a junit test, or with a main() function which runs against
+ * a real cluster (eg for testing with failures, region movement, etc)
+ */
+public class BROKE_TODO_FIX_TestAcidGuarantees {
+  protected static final Log LOG = LogFactory.getLog(BROKE_TODO_FIX_TestAcidGuarantees.class);
+  public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
+  public static final byte [] FAMILY_A = Bytes.toBytes("A");
+  public static final byte [] FAMILY_B = Bytes.toBytes("B");
+  public static final byte [] FAMILY_C = Bytes.toBytes("C");
+  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
+
+  public static final byte[][] FAMILIES = new byte[][] {
+    FAMILY_A, FAMILY_B, FAMILY_C };
+
+  private HBaseTestingUtility util;
+
+  public static int NUM_COLS_TO_CHECK = 50;
+
+  private void createTableIfMissing()
+    throws IOException {
+    try {
+      util.createTable(TABLE_NAME, FAMILIES);
+    } catch (TableExistsException tee) {
+      // The table already exists; nothing to do.
+    }
+  }
+
+  public BROKE_TODO_FIX_TestAcidGuarantees() {
+    // Set small flush size for minicluster so we exercise reseeking scanners
+    Configuration conf = HBaseConfiguration.create();
+    conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024));
+    util = new HBaseTestingUtility(conf);
+  }
+  
+  /**
+   * Thread that does random full-row writes into a table.
+   */
+  public static class AtomicityWriter extends RepeatingTestThread {
+    Random rand = new Random();
+    byte data[] = new byte[10];
+    byte targetRows[][];
+    byte targetFamilies[][];
+    HTable table;
+    AtomicLong numWritten = new AtomicLong();
+    
+    public AtomicityWriter(TestContext ctx, byte targetRows[][],
+                           byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.targetRows = targetRows;
+      this.targetFamilies = targetFamilies;
+      table = new HTable(ctx.getConf(), TABLE_NAME);
+    }
+    public void doAnAction() throws Exception {
+      // Pick a random row to write into
+      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
+      Put p = new Put(targetRow); 
+      rand.nextBytes(data);
+
+      for (byte[] family : targetFamilies) {
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          p.add(family, qualifier, data);
+        }
+      }
+      table.put(p);
+      numWritten.getAndIncrement();
+    }
+  }
+  
+  /**
+   * Thread that does single-row reads in a table, looking for partially
+   * completed rows.
+   */
+  public static class AtomicGetReader extends RepeatingTestThread {
+    byte targetRow[];
+    byte targetFamilies[][];
+    HTable table;
+    int numVerified = 0;
+    AtomicLong numRead = new AtomicLong();
+
+    public AtomicGetReader(TestContext ctx, byte targetRow[],
+                           byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.targetRow = targetRow;
+      this.targetFamilies = targetFamilies;
+      table = new HTable(ctx.getConf(), TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Get g = new Get(targetRow);
+      Result res = table.get(g);
+      byte[] gotValue = null;
+      if (res.getRow() == null) {
+        // Trying to verify but we didn't find the row - the writing
+        // thread probably just hasn't started writing yet, so we can
+        // ignore this action
+        return;
+      }
+      
+      for (byte[] family : targetFamilies) {
+        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+          byte qualifier[] = Bytes.toBytes("col" + i);
+          byte thisValue[] = res.getValue(family, qualifier);
+          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+            gotFailure(gotValue, res);
+          }
+          numVerified++;
+          gotValue = thisValue;
+        }
+      }
+      numRead.getAndIncrement();
+    }
+
+    private void gotFailure(byte[] expected, Result res) {
+      StringBuilder msg = new StringBuilder();
+      msg.append("Failed after ").append(numVerified).append("!");
+      msg.append("Expected=").append(Bytes.toStringBinary(expected));
+      msg.append("Got:\n");
+      for (KeyValue kv : res.list()) {
+        msg.append(kv.toString());
+        msg.append(" val= ");
+        msg.append(Bytes.toStringBinary(kv.getValue()));
+        msg.append("\n");
+      }
+      throw new RuntimeException(msg.toString());
+    }
+  }
+  
+  /**
+   * Thread that does full scans of the table looking for any partially completed
+   * rows.
+   */
+  public static class AtomicScanReader extends RepeatingTestThread {
+    byte targetFamilies[][];
+    HTable table;
+    AtomicLong numScans = new AtomicLong();
+    AtomicLong numRowsScanned = new AtomicLong();
+
+    public AtomicScanReader(TestContext ctx,
+                           byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.targetFamilies = targetFamilies;
+      table = new HTable(ctx.getConf(), TABLE_NAME);
+    }
+
+    public void doAnAction() throws Exception {
+      Scan s = new Scan();
+      for (byte[] family : targetFamilies) {
+        s.addFamily(family);
+      }
+      ResultScanner scanner = table.getScanner(s);
+      
+      for (Result res : scanner) {
+        byte[] gotValue = null;
+  
+        for (byte[] family : targetFamilies) {
+          for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+            byte qualifier[] = Bytes.toBytes("col" + i);
+            byte thisValue[] = res.getValue(family, qualifier);
+            if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+              gotFailure(gotValue, res);
+            }
+            gotValue = thisValue;
+          }
+        }
+        numRowsScanned.getAndIncrement();
+      }
+      numScans.getAndIncrement();
+    }
+
+    private void gotFailure(byte[] expected, Result res) {
+      StringBuilder msg = new StringBuilder();
+      msg.append("Failed after ").append(numRowsScanned).append("!");
+      msg.append("Expected=").append(Bytes.toStringBinary(expected));
+      msg.append("Got:\n");
+      for (KeyValue kv : res.list()) {
+        msg.append(kv.toString());
+        msg.append(" val= ");
+        msg.append(Bytes.toStringBinary(kv.getValue()));
+        msg.append("\n");
+      }
+      throw new RuntimeException(msg.toString());
+    }
+  }
+
+
+  public void runTestAtomicity(long millisToRun,
+      int numWriters,
+      int numGetters,
+      int numScanners,
+      int numUniqueRows) throws Exception {
+    createTableIfMissing();
+    TestContext ctx = new TestContext(util.getConfiguration());
+    
+    byte rows[][] = new byte[numUniqueRows][];
+    for (int i = 0; i < numUniqueRows; i++) {
+      rows[i] = Bytes.toBytes("test_row_" + i);
+    }
+    
+    List<AtomicityWriter> writers = Lists.newArrayList();
+    for (int i = 0; i < numWriters; i++) {
+      AtomicityWriter writer = new AtomicityWriter(
+          ctx, rows, FAMILIES);
+      writers.add(writer);
+      ctx.addThread(writer);
+    }
+
+    List<AtomicGetReader> getters = Lists.newArrayList();
+    for (int i = 0; i < numGetters; i++) {
+      AtomicGetReader getter = new AtomicGetReader(
+          ctx, rows[i % numUniqueRows], FAMILIES);
+      getters.add(getter);
+      ctx.addThread(getter);
+    }
+    
+    List<AtomicScanReader> scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+    
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+    
+    LOG.info("Finished test. Writers:");
+    for (AtomicityWriter writer : writers) {
+      LOG.info("  wrote " + writer.numWritten.get());
+    }
+    LOG.info("Readers:");
+    for (AtomicGetReader reader : getters) {
+      LOG.info("  read " + reader.numRead.get());
+    }
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info("  scanned " + scanner.numScans.get());
+      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+
+  @Test
+  public void testGetAtomicity() throws Exception {
+    util.startMiniCluster(1);
+    try {
+      runTestAtomicity(20000, 5, 5, 0, 3);
+    } finally {
+      util.shutdownMiniCluster();
+    }    
+  }
+
+  @Test
+  @Ignore("Currently not passing - see HBASE-2670")
+  public void testScanAtomicity() throws Exception {
+    util.startMiniCluster(1);
+    try {
+      runTestAtomicity(20000, 5, 0, 5, 3);
+    } finally {
+      util.shutdownMiniCluster();
+    }    
+  }
+
+  @Test
+  @Ignore("Currently not passing - see HBASE-2670")
+  public void testMixedAtomicity() throws Exception {
+    util.startMiniCluster(1);
+    try {
+      runTestAtomicity(20000, 5, 2, 2, 3);
+    } finally {
+      util.shutdownMiniCluster();
+    }    
+  }
+
+  public static void main(String args[]) throws Exception {
+    Configuration c = HBaseConfiguration.create();
+    BROKE_TODO_FIX_TestAcidGuarantees test = new BROKE_TODO_FIX_TestAcidGuarantees();
+    test.setConf(c);
+    test.runTestAtomicity(5*60*1000, 5, 2, 2, 3);
+  }
+
+  private void setConf(Configuration c) {
+    util = new HBaseTestingUtility(c);
+  }
+}

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Fri Aug 27 05:01:02 2010
@@ -29,6 +29,8 @@ import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
 import java.util.UUID;
 
 import org.apache.commons.logging.Log;
@@ -39,6 +41,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
@@ -47,7 +50,11 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
@@ -384,6 +391,14 @@ public class HBaseTestingUtility {
     this.hbaseCluster.flushcache();
   }
 
+  /**
+   * Flushes all caches of the specified table in the mini hbase cluster
+   * @throws IOException
+   */
+  public void flush(byte [] tableName) throws IOException {
+    this.hbaseCluster.flushcache(tableName);
+  }
+
 
   /**
    * Create a table.
@@ -506,6 +521,7 @@ public class HBaseTestingUtility {
    * @throws IOException
    */
   public int loadTable(final HTable t, final byte[] f) throws IOException {
+    t.setAutoFlush(false);
     byte[] k = new byte[3];
     int rowCount = 0;
     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
@@ -521,6 +537,34 @@ public class HBaseTestingUtility {
         }
       }
     }
+    t.flushCommits();
+    return rowCount;
+  }
+  /**
+   * Load region with rows from 'aaa' to 'zzz'.
+   * @param r Region
+   * @param f Family
+   * @return Count of rows loaded.
+   * @throws IOException
+   */
+  public int loadRegion(final HRegion r, final byte[] f)
+  throws IOException {
+    byte[] k = new byte[3];
+    int rowCount = 0;
+    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+          k[0] = b1;
+          k[1] = b2;
+          k[2] = b3;
+          Put put = new Put(k);
+          put.add(f, null, k);
+          if (r.getLog() == null) put.setWriteToWAL(false);
+          r.put(put);
+          rowCount++;
+        }
+      }
+    }
     return rowCount;
   }
 
@@ -678,6 +722,26 @@ public class HBaseTestingUtility {
   }
 
   /**
+   * Tool to get the reference to the region server object that holds the
+   * region of the specified user table.
+   * It first searches for the meta rows that contain the region of the
+   * specified table, then gets the index of that RS, and finally retrieves
+   * the RS's reference.
+   * @param tableName user table to lookup in .META.
+   * @return region server that holds it, null if the row doesn't exist
+   * @throws IOException
+   */
+  public HRegionServer getRSForFirstRegionInTable(byte[] tableName)
+      throws IOException {
+    List<byte[]> metaRows = getMetaTableRows(tableName);
+    if (metaRows == null || metaRows.size() == 0) {
+      return null;
+    }
+    int index = hbaseCluster.getServerWith(metaRows.get(0));
+    return hbaseCluster.getRegionServerThreads().get(index).getRegionServer();
+  }
+
+  /**
    * Starts a <code>MiniMRCluster</code> with a default number of
    * <code>TaskTracker</code>'s.
    *
@@ -1026,4 +1090,42 @@ public class HBaseTestingUtility {
       Threads.sleep(1000);
     }
   }
-}
\ No newline at end of file
+
+  /**
+   * Do a small get/scan against one store. This is required because store
+   * has no actual methods of querying itself, and relies on StoreScanner.
+   */
+  public static List<KeyValue> getFromStoreFile(Store store,
+                                                Get get) throws IOException {
+    ReadWriteConsistencyControl.resetThreadReadPoint();
+    Scan scan = new Scan(get);
+    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+        scan.getFamilyMap().get(store.getFamily().getName()));
+
+    List<KeyValue> result = new ArrayList<KeyValue>();
+    scanner.next(result);
+    if (!result.isEmpty()) {
+      // verify that we are on the row we want:
+      KeyValue kv = result.get(0);
+      if (!Bytes.equals(kv.getRow(), get.getRow())) {
+        result.clear();
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Do a small get/scan against one store. This is required because store
+   * has no actual methods of querying itself, and relies on StoreScanner.
+   */
+  public static List<KeyValue> getFromStoreFile(Store store,
+                                                byte [] row,
+                                                NavigableSet<byte[]> columns
+                                                ) throws IOException {
+    Get get = new Get(row);
+    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
+    s.put(store.getFamily().getName(), columns);
+
+    return getFromStoreFile(store,get);
+  }
+}

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Fri Aug 27 05:01:02 2010
@@ -350,6 +350,21 @@ public class MiniHBaseCluster {
   }
 
   /**
+   * Call flushCache on all regions of the specified table.
+   * @throws IOException
+   */
+  public void flushcache(byte [] tableName) throws IOException {
+    for (JVMClusterUtil.RegionServerThread t:
+        this.hbaseCluster.getRegionServers()) {
+      for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
+        if(Bytes.equals(r.getTableDesc().getName(), tableName)) {
+          r.flushcache();
+        }
+      }
+    }
+  }
+
+  /**
    * @return List of region server threads.
    */
   public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java Fri Aug 27 05:01:02 2010
@@ -746,7 +746,6 @@ public class PerformanceEvaluation {
       this.admin = new HBaseAdmin(conf);
       this.table = new HTable(conf, tableName);
       this.table.setAutoFlush(false);
-      this.table.setWriteBufferSize(1024*1024*12);
       this.table.setScannerCaching(30);
     }
 

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java Fri Aug 27 05:01:02 2010
@@ -286,6 +286,7 @@ public class TestAcidGuarantees {
   }
 
   @Test
+  @Ignore("Currently not passing - see HBASE-2856")
   public void testGetAtomicity() throws Exception {
     util.startMiniCluster(1);
     try {

Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,516 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Run tests related to {@link TimestampsFilter} using HBase client APIs.
+ * Sets up the HBase mini cluster once at start. Each creates a table
+ * named for the method and does its stuff against that.
+ */
+public class TestMultipleTimestamps {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  @Test
+  public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+    "ColumnMiltipleTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows = new Integer[] {1, 3, 5, 7};
+    Integer[] putColumns = new Integer[] { 1, 3, 5};
+    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+    Integer[] scanRows = new Integer[] {3, 5};
+    Integer[] scanColumns = new Integer[] {3};
+    Long[] scanTimestamps = new Long[] {3L, 4L};
+    int scanMaxVersions = 2;
+
+    put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+    flush(TABLE);
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 3, 3, 4);
+    checkOneCell(kvs[1], FAMILY, 3, 3, 3);
+    kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 4);
+    checkOneCell(kvs[1], FAMILY, 5, 3, 3);
+  }
+
+  @Test
+  public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithMultiple" +
+    "ColumnOneTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows = new Integer[] {1, 3, 5, 7};
+    Integer[] putColumns = new Integer[] { 1, 3, 5};
+    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+    Integer[] scanRows = new Integer[] {3, 5};
+    Integer[] scanColumns = new Integer[] {3,4};
+    Long[] scanTimestamps = new Long[] {3L};
+    int scanMaxVersions = 2;
+
+    put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+    flush(TABLE);
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 3, 3, 3);
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 3);
+  }
+
+  @Test
+  public void testReseeksWithMultipleColumnMultipleTimestamp() throws
+  IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithMultiple" +
+    "ColumnMiltipleTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows = new Integer[] {1, 3, 5, 7};
+    Integer[] putColumns = new Integer[] { 1, 3, 5};
+    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+    Integer[] scanRows = new Integer[] {5, 7};
+    Integer[] scanColumns = new Integer[] {3, 4, 5};
+    Long[] scanTimestamps = new Long[] {2l, 3L};
+    int scanMaxVersions = 2;
+
+    put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+    flush(TABLE);
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(4, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 3);
+    checkOneCell(kvs[1], FAMILY, 5, 3, 2);
+    checkOneCell(kvs[2], FAMILY, 5, 5, 3);
+    checkOneCell(kvs[3], FAMILY, 5, 5, 2);
+    kvs = scanner.next().raw();
+    assertEquals(4, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 7, 3, 3);
+    checkOneCell(kvs[1], FAMILY, 7, 3, 2);
+    checkOneCell(kvs[2], FAMILY, 7, 5, 3);
+    checkOneCell(kvs[3], FAMILY, 7, 5, 2);
+  }
+
+  @Test
+  public void testReseeksWithMultipleFiles() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithMultipleFiles");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows1 = new Integer[] {1, 2, 3};
+    Integer[] putColumns1 = new Integer[] { 2, 5, 6};
+    Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
+
+    Integer[] putRows2 = new Integer[] {6, 7};
+    Integer[] putColumns2 = new Integer[] {3, 6};
+    Long[] putTimestamps2 = new Long[] {4L, 5L};
+
+    Integer[] putRows3 = new Integer[] {2, 3, 5};
+    Integer[] putColumns3 = new Integer[] {1, 2, 3};
+    Long[] putTimestamps3 = new Long[] {4L,8L};
+
+
+    Integer[] scanRows = new Integer[] {3, 5, 7};
+    Integer[] scanColumns = new Integer[] {3, 4, 5};
+    Long[] scanTimestamps = new Long[] {2l, 4L};
+    int scanMaxVersions = 5;
+
+    put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
+    flush(TABLE);
+    put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
+    flush(TABLE);
+    put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 3, 3, 4);
+    checkOneCell(kvs[1], FAMILY, 3, 5, 2);
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 4);
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 6, 3, 4);
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 7, 3, 4);
+  }
+
+  @Test
+  public void testWithVersionDeletes() throws Exception {
+
+    // first test from memstore (without flushing).
+    testWithVersionDeletes(false);
+
+    // run same test against HFiles (by forcing a flush).
+    testWithVersionDeletes(true);
+  }
+
+  public void testWithVersionDeletes(boolean flushTables) throws IOException {
+    byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
+        (flushTables ? "flush" : "noflush"));
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    // For row:0, col:0: insert versions 1 through 5.
+    putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+    if (flushTables) {
+      flush(TABLE);
+    }
+
+    // delete version 4.
+    deleteOneVersion(ht, FAMILY, 0, 0, 4);
+
+    // request a bunch of versions including the deleted version. We should
+    // only get back entries for the versions that exist.
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0,
+        Arrays.asList(2L, 3L, 4L, 5L));
+    assertEquals(3, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 0, 0, 5);
+    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
+    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
+  }
+
+  @Test
+  public void testWithMultipleVersionDeletes() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testWithMultipleVersionDeletes");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    // For row:0, col:0: insert versions 1 through 5.
+    putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+    flush(TABLE);
+
+    // delete all versions before 4.
+    deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
+
+    // request a bunch of versions including the deleted version. We should
+    // only get back entries for the versions that exist.
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
+    assertEquals(0, kvs.length);
+  }
+
+  @Test
+  public void testWithColumnDeletes() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testWithColumnDeletes");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    // For row:0, col:0: insert versions 1 through 5.
+    putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+    flush(TABLE);
+
+    // delete all versions before 4.
+    deleteColumn(ht, FAMILY, 0, 0);
+
+    // request a bunch of versions including the deleted version. We should
+    // only get back entries for the versions that exist.
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
+    assertEquals(0, kvs.length);
+  }
+
+  @Test
+  public void testWithFamilyDeletes() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testWithFamilyDeletes");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    // For row:0, col:0: insert versions 1 through 5.
+    putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+    flush(TABLE);
+
+    // delete all versions before 4.
+    deleteFamily(ht, FAMILY, 0);
+
+    // request a bunch of versions including the deleted version. We should
+    // only get back entries for the versions that exist.
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
+    assertEquals(0, kvs.length);
+  }
+
+  // Flush tables. Since flushing is asynchronous, sleep for a bit.
+  private void flush(byte [] tableName) throws IOException {
+    TEST_UTIL.flush(tableName);
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException i) {
+      // ignore
+    }
+  }
+
+  /**
+   * Assert that the passed in KeyValue has expected contents for the
+   * specified row, column & timestamp.
+   */
+  private void checkOneCell(KeyValue kv, byte[] cf,
+      int rowIdx, int colIdx, long ts) {
+
+    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
+
+    assertEquals("Row mismatch which checking: " + ctx,
+        "row:"+ rowIdx, Bytes.toString(kv.getRow()));
+
+    assertEquals("ColumnFamily mismatch while checking: " + ctx,
+        Bytes.toString(cf), Bytes.toString(kv.getFamily()));
+
+    assertEquals("Column qualifier mismatch while checking: " + ctx,
+        "column:" + colIdx,
+        Bytes.toString(kv.getQualifier()));
+
+    assertEquals("Timestamp mismatch while checking: " + ctx,
+        ts, kv.getTimestamp());
+
+    assertEquals("Value mismatch while checking: " + ctx,
+        "value-version-" + ts, Bytes.toString(kv.getValue()));
+  }
+
+  /**
+   * Issues a Get whose time range (via setTimeRange) spans the requested
+   * versions, for the row/column specified by rowIdx & colIdx.
+   */
+  private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
+      int colIdx, List<Long> versions)
+  throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Get get = new Get(row);
+    get.addColumn(cf, column);
+    get.setMaxVersions();
+    get.setTimeRange(Collections.min(versions), Collections.max(versions)+1);
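+    // setTimeRange is a half-open interval [min, max), hence the +1 on the max.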
+    Result result = ht.get(get);
+
+    return result.raw();
+  }
+
+  private ResultScanner scan(HTable ht, byte[] cf,
+      Integer[] rowIndexes, Integer[] columnIndexes,
+      Long[] versions, int maxVersions)
+  throws IOException {
+    byte startRow[] = Bytes.toBytes("row:" +
+        Collections.min(Arrays.asList(rowIndexes)));
+    // end row is exclusive, so use (max row index + 1)
+    byte endRow[] = Bytes.toBytes("row:" +
+        (Collections.max(Arrays.asList(rowIndexes)) + 1));
+    Scan scan = new Scan(startRow, endRow);
+    for (Integer colIdx: columnIndexes) {
+      byte column[] = Bytes.toBytes("column:" + colIdx);
+      scan.addColumn(cf, column);
+    }
+    scan.setMaxVersions(maxVersions);
+    scan.setTimeRange(Collections.min(Arrays.asList(versions)),
+        Collections.max(Arrays.asList(versions))+1);
+    ResultScanner scanner = ht.getScanner(scan);
+    return scanner;
+  }
+
+  private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
+      Integer[] columnIndexes, Long[] versions)
+  throws IOException {
+    for (int rowIdx: rowIndexes) {
+      byte row[] = Bytes.toBytes("row:" + rowIdx);
+      Put put = new Put(row);
+      for(int colIdx: columnIndexes) {
+        byte column[] = Bytes.toBytes("column:" + colIdx);
+        for (long version: versions) {
+          put.add(cf, column, version, Bytes.toBytes("value-version-" +
+              version));
+        }
+      }
+      ht.put(put);
+    }
+  }
+
+  /**
+   * Insert in specific row/column versions with timestamps
+   * versionStart..versionEnd.
+   */
+  private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
+      long versionStart, long versionEnd)
+  throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Put put = new Put(row);
+
+    for (long idx = versionStart; idx <= versionEnd; idx++) {
+      put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
+    }
+
+    ht.put(put);
+  }
+
+  /**
+   * For row/column specified by rowIdx/colIdx, delete the cell
+   * corresponding to the specified version.
+   */
+  private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
+      int colIdx, long version)
+  throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Delete del = new Delete(row);
+    del.deleteColumn(cf, column, version);
+    ht.delete(del);
+  }
+
+  /**
+   * For row/column specified by rowIdx/colIdx, delete all cells with a
+   * timestamp at or before the specified version.
+   */
+  private void deleteAllVersionsBefore(HTable ht, byte[] cf, int rowIdx,
+      int colIdx, long version)
+  throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Delete del = new Delete(row);
+    del.deleteColumns(cf, column, version);
+    ht.delete(del);
+  }
+
+  private void deleteColumn(HTable ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Delete del = new Delete(row);
+    del.deleteColumns(cf, column);
+    ht.delete(del);
+  }
+
+  private void deleteFamily(HTable ht, byte[] cf, int rowIdx) throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    Delete del = new Delete(row);
+    del.deleteFamily(cf);
+    ht.delete(del);
+  }
+}
+

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java Fri Aug 27 05:01:02 2010
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -114,9 +115,7 @@ public class TestScannerTimeout {
    */
   @Test
   public void test2772() throws Exception {
-    int rs = TEST_UTIL.getHBaseCluster().getServerWith(
-        TEST_UTIL.getHBaseCluster().getRegions(
-            TABLE_NAME).get(0).getRegionName());
+    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME);
     Scan scan = new Scan();
     // Set a very high timeout, we want to test what happens when a RS
     // fails but the region is recovered before the lease times out.
@@ -128,7 +127,7 @@ public class TestScannerTimeout {
     HTable higherScanTimeoutTable = new HTable(conf, TABLE_NAME);
     ResultScanner r = higherScanTimeoutTable.getScanner(scan);
     // This takes way less than SCANNER_TIMEOUT*100
-    TEST_UTIL.getHBaseCluster().getRegionServer(rs).abort("die!");
+    rs.abort("die!");
     Result[] results = r.next(NB_ROWS);
     assertEquals(NB_ROWS, results.length);
     r.close();

Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,342 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Run tests related to {@link TimestampsFilter} using HBase client APIs.
+ * Sets up the HBase mini cluster once at start. Each test creates a table
+ * named after the test method and runs its assertions against that table.
+ */
+public class TestTimestampsFilter {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * Test from client side for TimestampsFilter.
+   *
+   * The TimestampsFilter provides the ability to request cells (KeyValues)
+   * whose timestamp/version is in the specified list of timestamps/versions.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testTimestampsFilter() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testTimestampsFilter");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+    KeyValue kvs[];
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+      for (int colIdx = 0; colIdx < 5; colIdx++) {
+        // insert versions 201..300
+        putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
+        // insert versions 1..100
+        putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
+      }
+    }
+
+    // do some verification before flush
+    verifyInsertedValues(ht, FAMILY);
+
+    flush();
+
+    // do some verification after flush
+    verifyInsertedValues(ht, FAMILY);
+
+    // Insert some more versions after flush. These should be in memstore.
+    // After this we should have data in both memstore & HFiles.
+    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+      for (int colIdx = 0; colIdx < 5; colIdx++) {
+        putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
+        putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
+      }
+    }
+
+    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+      for (int colIdx = 0; colIdx < 5; colIdx++) {
+        kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
+                           Arrays.asList(505L, 5L, 105L, 305L, 205L));
+        assertEquals(4, kvs.length);
+        checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
+        checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
+        checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
+        checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
+      }
+    }
+
+    // Request an empty list of versions using the TimestampsFilter;
+    // this should return no cells.
+    kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
+    assertEquals(0, kvs.length);
+
+    // Test the filter using a Scan operation.
+    // Scan rows 0..4. For each row, get all its columns, but only
+    // those versions of the columns with the specified timestamps.
+    Result[] results = scanNVersions(ht, FAMILY, 0, 4,
+                                     Arrays.asList(6L, 106L, 306L));
+    assertEquals("# of rows returned from scan", 5, results.length);
+    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+      kvs = results[rowIdx].raw();
+      // each row should have 5 columns.
+      // And we have requested 3 versions for each.
+      assertEquals("Number of KeyValues in result for row:" + rowIdx,
+                   3*5, kvs.length);
+      for (int colIdx = 0; colIdx < 5; colIdx++) {
+        int offset = colIdx * 3;
+        checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
+        checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
+        checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
+      }
+    }
+  }
+
+  /**
+   * Test TimestampsFilter in the presence of version deletes.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testWithVersionDeletes() throws Exception {
+
+    // first test from memstore (without flushing).
+    testWithVersionDeletes(false);
+
+    // run same test against HFiles (by forcing a flush).
+    testWithVersionDeletes(true);
+  }
+
+  private void testWithVersionDeletes(boolean flushTables) throws IOException {
+    byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
+                                   (flushTables ? "flush" : "noflush")); 
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    // For row:0, col:0: insert versions 1 through 5.
+    putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+    // delete version 4.
+    deleteOneVersion(ht, FAMILY, 0, 0, 4);
+
+    if (flushTables) {
+      flush();
+    }
+
+    // request a bunch of versions including the deleted version. We should
+    // only get back entries for the versions that exist.
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
+    assertEquals(3, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 0, 0, 5);
+    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
+    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
+  }
+
+  private void verifyInsertedValues(HTable ht, byte[] cf) throws IOException {
+    for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+      for (int colIdx = 0; colIdx < 5; colIdx++) {
+        // ask for versions that exist.
+        KeyValue[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
+                                      Arrays.asList(5L, 300L, 6L, 80L));
+        assertEquals(4, kvs.length);
+        checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
+        checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
+        checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
+        checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
+
+        // ask for versions that do not exist.
+        kvs = getNVersions(ht, cf, rowIdx, colIdx,
+                           Arrays.asList(101L, 102L));
+        assertEquals(0, kvs.length);
+
+        // ask for some versions that exist and some that do not.
+        kvs = getNVersions(ht, cf, rowIdx, colIdx,
+                           Arrays.asList(1L, 300L, 105L, 70L, 115L));
+        assertEquals(3, kvs.length);
+        checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
+        checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
+        checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
+      }
+    }
+  }
+
+  // Flush tables. Since flushing is asynchronous, sleep for a bit.
+  private void flush() throws IOException {
+    TEST_UTIL.flush();
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException i) {
+      // ignore
+    }
+  }
+
+  /**
+   * Assert that the passed in KeyValue has expected contents for the
+   * specified row, column & timestamp.
+   */
+  private void checkOneCell(KeyValue kv, byte[] cf,
+                             int rowIdx, int colIdx, long ts) {
+
+    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
+
+    assertEquals("Row mismatch which checking: " + ctx,
+                 "row:"+ rowIdx, Bytes.toString(kv.getRow()));
+
+    assertEquals("ColumnFamily mismatch while checking: " + ctx,
+                 Bytes.toString(cf), Bytes.toString(kv.getFamily()));
+
+    assertEquals("Column qualifier mismatch while checking: " + ctx,
+                 "column:" + colIdx,
+                  Bytes.toString(kv.getQualifier()));
+
+    assertEquals("Timestamp mismatch while checking: " + ctx,
+                 ts, kv.getTimestamp());
+
+    assertEquals("Value mismatch while checking: " + ctx,
+                 "value-version-" + ts, Bytes.toString(kv.getValue()));
+  }
+
+  /**
+   * Uses the TimestampsFilter on a Get to request a specified list of
+   * versions for the row/column specified by rowIdx & colIdx.
+   */
+  private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
+                                   int colIdx, List<Long> versions)
+    throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Filter filter = new TimestampsFilter(versions);
+    Get get = new Get(row);
+    get.addColumn(cf, column);
+    get.setFilter(filter);
+    get.setMaxVersions();
+    Result result = ht.get(get);
+
+    return result.raw();
+  }
+
+  /**
+   * Uses the TimestampsFilter on a Scan to request a specified list of
+   * versions for the rows from startRowIdx to endRowIdx (both inclusive).
+   */
+  private Result[] scanNVersions(HTable ht, byte[] cf, int startRowIdx,
+                                 int endRowIdx, List<Long> versions)
+    throws IOException {
+    byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
+    byte endRow[] = Bytes.toBytes("row:" + (endRowIdx + 1)); // end row is exclusive
+    Filter filter = new TimestampsFilter(versions);
+    Scan scan = new Scan(startRow, endRow);
+    scan.setFilter(filter);
+    scan.setMaxVersions();
+    ResultScanner scanner = ht.getScanner(scan);
+    return scanner.next(endRowIdx - startRowIdx + 1);
+  }
+
+  /**
+   * Insert in specific row/column versions with timestamps
+   * versionStart..versionEnd.
+   */
+  private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
+                            long versionStart, long versionEnd)
+      throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Put put = new Put(row);
+
+    for (long idx = versionStart; idx <= versionEnd; idx++) {
+      put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
+    }
+
+    ht.put(put);
+  }
+
+  /**
+   * For row/column specified by rowIdx/colIdx, delete the cell
+   * corresponding to the specified version.
+   */
+  private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
+                                int colIdx, long version)
+    throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Delete del = new Delete(row);
+    del.deleteColumn(cf, column, version);
+    ht.delete(del);
+  }
+}
+

Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,104 @@
+package org.apache.hadoop.hbase.filter;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+public class TestColumnPrefixFilter {
+
+  private final static HBaseTestingUtility TEST_UTIL = new
+      HBaseTestingUtility();
+
+  @Test
+  public void testColumnPrefixFilter() throws IOException {
+    String family = "Family";
+    HTableDescriptor htd = new HTableDescriptor("TestColumnPrefixFilter");
+    htd.addFamily(new HColumnDescriptor(family));
+    HRegionInfo info = new HRegionInfo(htd, null, null, false);
+    HRegion region = HRegion.createHRegion(info, HBaseTestingUtility.
+        getTestDir(), TEST_UTIL.getConfiguration());
+
+    List<String> rows = generateRandomWords(100, "row");
+    List<String> columns = generateRandomWords(10000, "column");
+    long maxTimestamp = 2;
+
+    List<KeyValue> kvList = new ArrayList<KeyValue>();
+
+    Map<String, List<KeyValue>> prefixMap = new HashMap<String,
+        List<KeyValue>>();
+
+    prefixMap.put("p", new ArrayList<KeyValue>());
+    prefixMap.put("s", new ArrayList<KeyValue>());
+
+    String valueString = "ValueString";
+
+    for (String row: rows) {
+      Put p = new Put(Bytes.toBytes(row));
+      for (String column: columns) {
+        for (long timestamp = 1; timestamp <= maxTimestamp; timestamp++) {
+          KeyValue kv = KeyValueTestUtil.create(row, family, column, timestamp,
+              valueString);
+          p.add(kv);
+          kvList.add(kv);
+          for (String s: prefixMap.keySet()) {
+            if (column.startsWith(s)) {
+              prefixMap.get(s).add(kv);
+            }
+          }
+        }
+      }
+      region.put(p);
+    }
+
+    ColumnPrefixFilter filter;
+    Scan scan = new Scan();
+    scan.setMaxVersions();
+    for (String s: prefixMap.keySet()) {
+      filter = new ColumnPrefixFilter(Bytes.toBytes(s));
+      scan.setFilter(filter);
+      InternalScanner scanner = region.getScanner(scan);
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      while (scanner.next(results)); // drain the scanner; matching KeyValues accumulate in results
+      assertEquals(prefixMap.get(s).size(), results.size());
+    }
+  }
+
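+  /*
+   * A minimal client-side sketch of the same filter, assuming an HTable named
+   * "table" for the table above. The test drives the region scanner directly;
+   * client code would normally attach the filter to a Scan:
+   *
+   *   Scan scan = new Scan();
+   *   scan.setFilter(new ColumnPrefixFilter(Bytes.toBytes("p")));
+   *   scan.setMaxVersions();
+   *   ResultScanner rs = table.getScanner(scan);
+   *   for (Result r : rs) {
+   *     // only columns whose qualifier starts with "p" come back
+   *   }
+   *   rs.close();
+   */
+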
+  List<String> generateRandomWords(int numberOfWords, String suffix) {
+    Set<String> wordSet = new HashSet<String>();
+    for (int i = 0; i < numberOfWords; i++) {
+      int lengthOfWords = (int) (Math.random()*2) + 1;
+      char[] wordChar = new char[lengthOfWords];
+      for (int j = 0; j < wordChar.length; j++) {
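+        // pick a random lowercase ASCII letter ('a' through 'z')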
+        wordChar[j] = (char) (Math.random() * 26 + 97);
+      }
+      String word;
+      if (suffix == null) {
+        word = new String(wordChar);
+      } else {
+        word = new String(wordChar) + suffix;
+      }
+      wordSet.add(word);
+    }
+    List<String> wordList = new ArrayList<String>(wordSet);
+    return wordList;
+  }
+}

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java Fri Aug 27 05:01:02 2010
@@ -40,6 +40,13 @@ public class TestImmutableBytesWritable 
       new ImmutableBytesWritable(Bytes.toBytes("xxabc"), 2, 2).hashCode());
   }
 
+  public void testSpecificCompare() {
+    ImmutableBytesWritable ibw1 = new ImmutableBytesWritable(new byte[]{0x0f});
+    ImmutableBytesWritable ibw2 = new ImmutableBytesWritable(new byte[]{0x00, 0x00});
+    ImmutableBytesWritable.Comparator c = new ImmutableBytesWritable.Comparator();
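+    // Bytes compare as unsigned values, so {0x0f} must not sort before {0x00, 0x00}.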
+    assertFalse("ibw1 < ibw2", c.compare(ibw1, ibw2) < 0);
+  }
+
   public void testComparison() throws Exception {
     runTests("aa", "b", -1);
     runTests("aa", "aa", 0);

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java Fri Aug 27 05:01:02 2010
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
+
 import junit.framework.TestCase;
 
 public class TestCachedBlockQueue extends TestCase {
@@ -57,15 +59,16 @@ public class TestCachedBlockQueue extend
 
     assertEquals(queue.heapSize(), expectedSize);
 
-    org.apache.hadoop.hbase.io.hfile.CachedBlock [] blocks = queue.get();
-    assertEquals(blocks[0].getName(), "cb1");
-    assertEquals(blocks[1].getName(), "cb2");
-    assertEquals(blocks[2].getName(), "cb3");
-    assertEquals(blocks[3].getName(), "cb4");
-    assertEquals(blocks[4].getName(), "cb5");
-    assertEquals(blocks[5].getName(), "cb6");
-    assertEquals(blocks[6].getName(), "cb7");
-    assertEquals(blocks[7].getName(), "cb8");
+    LinkedList<org.apache.hadoop.hbase.io.hfile.CachedBlock> blocks =
+      queue.get();
+    assertEquals(blocks.poll().getName(), "cb1");
+    assertEquals(blocks.poll().getName(), "cb2");
+    assertEquals(blocks.poll().getName(), "cb3");
+    assertEquals(blocks.poll().getName(), "cb4");
+    assertEquals(blocks.poll().getName(), "cb5");
+    assertEquals(blocks.poll().getName(), "cb6");
+    assertEquals(blocks.poll().getName(), "cb7");
+    assertEquals(blocks.poll().getName(), "cb8");
 
   }
 
@@ -109,16 +112,16 @@ public class TestCachedBlockQueue extend
 
     assertEquals(queue.heapSize(), expectedSize);
 
-    org.apache.hadoop.hbase.io.hfile.CachedBlock [] blocks = queue.get();
-    assertEquals(blocks[0].getName(), "cb0");
-    assertEquals(blocks[1].getName(), "cb1");
-    assertEquals(blocks[2].getName(), "cb2");
-    assertEquals(blocks[3].getName(), "cb3");
-    assertEquals(blocks[4].getName(), "cb4");
-    assertEquals(blocks[5].getName(), "cb5");
-    assertEquals(blocks[6].getName(), "cb6");
-    assertEquals(blocks[7].getName(), "cb7");
-    assertEquals(blocks[8].getName(), "cb8");
+    LinkedList<org.apache.hadoop.hbase.io.hfile.CachedBlock> blocks = queue.get();
+    assertEquals(blocks.poll().getName(), "cb0");
+    assertEquals(blocks.poll().getName(), "cb1");
+    assertEquals(blocks.poll().getName(), "cb2");
+    assertEquals(blocks.poll().getName(), "cb3");
+    assertEquals(blocks.poll().getName(), "cb4");
+    assertEquals(blocks.poll().getName(), "cb5");
+    assertEquals(blocks.poll().getName(), "cb6");
+    assertEquals(blocks.poll().getName(), "cb7");
+    assertEquals(blocks.poll().getName(), "cb8");
 
   }
 
@@ -130,5 +133,4 @@ public class TestCachedBlockQueue extend
           accessTime,false);
     }
   }
-
-}
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java Fri Aug 27 05:01:02 2010
@@ -117,6 +117,9 @@ public class TestLruBlockCache extends T
 
     // Expect no evictions
     assertEquals(0, cache.getEvictionCount());
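+    // Exercise the cache's statistics thread: start it and wait for it to finish.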
+    Thread t = new LruBlockCache.StatisticsThread(cache);
+    t.start();
+    t.join();
   }
 
   public void testCacheEvictionSimple() throws Exception {

Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,88 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test {@link HFileScanner#reseekTo(byte[])}
+ */
+public class TestReseekTo {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Test
+  public void testReseekTo() throws Exception {
+
+    Path ncTFile = new Path(HBaseTestingUtility.getTestDir(), "basic.hfile");
+    FSDataOutputStream fout = TEST_UTIL.getTestFileSystem().create(ncTFile);
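+    // Writer with 4000-byte blocks, no compression ("none") and the default key comparator.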
+    HFile.Writer writer = new HFile.Writer(fout, 4000, "none", null);
+    int numberOfKeys = 1000;
+
+    String valueString = "Value";
+
+    List<Integer> keyList = new ArrayList<Integer>();
+    List<String> valueList = new ArrayList<String>();
+
+    for (int key = 0; key < numberOfKeys; key++) {
+      String value = valueString + key;
+      keyList.add(key);
+      valueList.add(value);
+      writer.append(Bytes.toBytes(key), Bytes.toBytes(value));
+    }
+    writer.close();
+    fout.close();
+
+    HFile.Reader reader = new HFile.Reader(TEST_UTIL.getTestFileSystem(),
+        ncTFile, null, false);
+    reader.loadFileInfo();
+    HFileScanner scanner = reader.getScanner(false, true);
+
+    scanner.seekTo();
+    for (int i = 0; i < keyList.size(); i++) {
+      Integer key = keyList.get(i);
+      String value = valueList.get(i);
+      long start = System.nanoTime();
+      scanner.seekTo(Bytes.toBytes(key));
+      System.out.println("Seek Finished in: " + (System.nanoTime() - start)/1000 + " micro s");
+      assertEquals(value, scanner.getValueString());
+    }
+
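+    // reseekTo() only seeks forward from the scanner's current position, so
+    // reset to the start of the file and request keys in increasing order.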
+    scanner.seekTo();
+    for (int i = 0; i < keyList.size(); i += 10) {
+      Integer key = keyList.get(i);
+      String value = valueList.get(i);
+      long start = System.nanoTime();
+      scanner.reseekTo(Bytes.toBytes(key));
+      System.out.println("Reseek Finished in: " + (System.nanoTime() - start)/1000 + " micro s");
+      assertEquals(value, scanner.getValueString());
+    }
+  }
+
+}
\ No newline at end of file