Posted to commits@hbase.apache.org by st...@apache.org on 2010/08/27 07:01:07 UTC
svn commit: r990018 [7/10] - in /hbase/branches/0.90_master_rewrite: ./ bin/
bin/replication/ src/assembly/ src/docbkx/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/filter/ s...
Added: hbase/branches/0.90_master_rewrite/src/site/xdoc/replication.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/site/xdoc/replication.xml?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/site/xdoc/replication.xml (added)
+++ hbase/branches/0.90_master_rewrite/src/site/xdoc/replication.xml Fri Aug 27 05:01:02 2010
@@ -0,0 +1,429 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2010 The Apache Software Foundation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN"
+ "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document xmlns="http://maven.apache.org/XDOC/2.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/XDOC/2.0 http://maven.apache.org/xsd/xdoc-2.0.xsd">
+ <properties>
+ <title>
+ HBase Replication
+ </title>
+ </properties>
+ <body>
+ <section name="Overview">
+ <p>
+ HBase replication is a way to copy data between HBase deployments. It
+ can serve as a disaster recovery solution and can help provide
+ higher availability at the HBase layer. It can also serve more practical purposes;
+ for example, as a way to easily copy edits from a web-facing cluster to a "MapReduce"
+ cluster that will process old and new data and ship the results back
+ automatically.
+ </p>
+ <p>
+ The basic architecture pattern used for HBase replication is (HBase cluster) master-push;
+ it is much easier to keep track of what's currently being replicated since
+ each region server has its own write-ahead-log (aka WAL or HLog), just like
+ other well-known solutions such as MySQL master/slave replication where
+ there's only one binlog to keep track of. One master cluster can
+ replicate to any number of slave clusters, and each region server
+ participates by replicating its own stream of edits.
+ </p>
+ <p>
+ The replication is done asynchronously, meaning that the clusters can
+ be geographically distant, the links between them can be offline for
+ some time, and rows inserted on the master cluster won't be
+ available at the same time on the slave clusters (eventual consistency).
+ </p>
+ <p>
+ The replication format used in this design is conceptually the same as
+ <a href="http://dev.mysql.com/doc/refman/5.1/en/replication-formats.html">
+ MySQL's statement-based replication </a>. Instead of SQL statements, whole
+ WALEdits (consisting of multiple cell inserts coming from the clients'
+ Put and Delete) are replicated in order to maintain atomicity.
+ </p>
+ <p>
+ The HLogs from each region server are the basis of HBase replication,
+ and must be kept in HDFS as long as they are needed to replicate data
+ to any slave cluster. Each RS reads from the oldest log it needs to
+ replicate and keeps the current position inside ZooKeeper to simplify
+ failure recovery. That position can be different for every slave
+ cluster, same for the queue of HLogs to process.
+ </p>
+ <p>
+ The clusters participating in replication can be of asymmetric sizes
+ and the master cluster will do its "best effort" to balance the stream
+ of replication on the slave clusters by relying on randomization.
+ </p>
+ <img src="images/replication_overview.png"/>
+ </section>
+ <section name="Life of a log edit">
+ <p>
+ The following sections describe the life of a single edit going from a
+ client that communicates with a master cluster all the way to a single
+ slave cluster.
+ </p>
+ <section name="Normal processing">
+ <p>
+ The client uses the HBase API to send a Put, Delete or ICV to a region
+ server. The key values are transformed into a WALEdit by the region
+ server, and the edit is inspected by the replication code which, for each
+ family that is scoped for replication, adds the scope to the edit. The edit
+ is appended to the current WAL and is then applied to the MemStore.
+ </p>
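+ <p>
+ Only families that are scoped for replication are picked up by this code.
+ As a hedged illustration (the table and family names are made up for this
+ example, and imports from org.apache.hadoop.hbase and its client package
+ are omitted), a family is given the GLOBAL replication scope when the
+ table is created:
+ </p>
+ <pre>
+// Mark the family "colfam" of table "usertable" for replication so that
+// its edits carry the GLOBAL scope in the WALEdits described above.
+Configuration conf = HBaseConfiguration.create();
+HBaseAdmin admin = new HBaseAdmin(conf);
+HTableDescriptor htd = new HTableDescriptor("usertable");
+HColumnDescriptor hcd = new HColumnDescriptor("colfam");
+hcd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);  // 1 = GLOBAL, 0 = LOCAL
+htd.addFamily(hcd);
+admin.createTable(htd);
+ </pre>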
+ <p>
+ In a separate thread, the edit is read from the log (as part of a batch)
+ and only the KVs that are replicable are kept (that is, KVs that belong to
+ a family scoped GLOBAL in the table's schema and that aren't from a catalog
+ table, so not .META. or -ROOT-). When the buffer is filled, or the reader hits the
+ end of the file, the buffer is sent to a random region server on the
+ slave cluster.
+ </p>
+ <p>
+ Synchronously, the region server that receives the edits reads them
+ sequentially and applies them on its own cluster using the HBase client
+ (HTables managed by an HTablePool). If consecutive rows
+ belong to the same table, they are inserted together in order to
+ leverage parallel insertions.
+ </p>
+ <p>
+ Back in the master cluster's region server, the offset for the current
+ WAL that's being replicated is registered in ZooKeeper.
+ </p>
+ </section>
+ <section name="Non-responding slave clusters">
+ <p>
+ The edit is inserted in the same way.
+ </p>
+ <p>
+ In the separate thread, the region server reads, filters and buffers
+ the log edits the same way as during normal processing. The slave
+ region server that's contacted doesn't answer the RPC, so the master
+ region server will sleep and retry up to a configured number of times.
+ If the slave RS still isn't available, the master cluster RS will select a
+ new subset of RSs to replicate to and will retry sending the buffer of
+ edits.
+ </p>
+ <p>
+ In the meantime, the WALs will be rolled and stored in a queue in
+ ZooKeeper. Logs that are archived by their region server (archiving is
+ basically moving a log from the region server's logs directory to a
+ central logs archive directory) will update their paths in the in-memory
+ queue of the replicating thread.
+ </p>
+ <p>
+ When the slave cluster is finally available, the buffer will be applied
+ the same way as during normal processing. The master cluster RS will then
+ replicate the backlog of logs.
+ </p>
+ </section>
+ </section>
+ <section name="Internals">
+ <p>
+ This section describes in depth how each of replication's internal
+ features operate.
+ </p>
+ <section name="Choosing region servers to replicate to">
+ <p>
+ When a master cluster RS initiates a replication source to a slave cluster,
+ it first connects to the slave's ZooKeeper ensemble using the provided
+ cluster key (that key is composed of the value of hbase.zookeeper.quorum,
+ zookeeper.znode.parent and hbase.zookeeper.property.clientPort). It
+ then scans the "rs" directory to discover all the available sinks
+ (region servers that are accepting incoming streams of edits to replicate)
+ and will randomly choose a subset of them using a configured
+ ratio (which has a default value of 10%). For example, if a slave
+ cluster has 150 machines, 15 will be chosen as potential recipients for
+ edits that this master cluster RS will be sending. Since this is done by all
+ master cluster RSs, the probability that all slave RSs are used is very high,
+ and this method works for clusters of any size. For example, a master cluster
+ of 10 machines replicating to a slave cluster of 5 machines with a ratio
+ of 10% means that each master cluster RS will choose one machine at
+ random, so the chance of overlap, and thus full usage of the slave
+ cluster, is higher.
+ </p>
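+ <p>
+ As a rough sketch of the selection step (the method and variable names are
+ illustrative, not the actual class members), the subset of sinks could be
+ chosen like this:
+ </p>
+ <pre>
+// Pick roughly "ratio" of the slave region servers discovered under the
+// slave cluster's "rs" znode; the result is the pool of sinks this source
+// will ship edits to.
+String[] chooseSinks(String[] slaveRegionServers, float ratio) {
+  String[] shuffled = slaveRegionServers.clone();
+  Collections.shuffle(Arrays.asList(shuffled));  // shuffles the backing array
+  int nbSinks = (int) Math.ceil(shuffled.length * ratio);
+  return Arrays.copyOfRange(shuffled, 0, nbSinks);
+}
+ </pre>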
+ </section>
+ <section name="Keeping track of logs">
+ <p>
+ Every master cluster RS has its own znode in the replication znodes hierarchy.
+ It contains one znode per peer cluster (if 5 slave clusters, 5 znodes
+ are created), and each of these contain a queue
+ of HLogs to process. Each of these queues will track the HLogs created
+ by that RS, but they can differ in size. For example, if one slave
+ cluster becomes unavailable for some time then its HLogs cannot be
+ replicated, so they need to stay in the queue (while the others are processed).
+ See the section named "Region server failover" for an example.
+ </p>
+ <p>
+ When a source is instantiated, it contains the current HLog that the
+ region server is writing to. During log rolling, the new file is added
+ to the queue of each slave cluster's znode just before it's made available.
+ This ensures that all the sources are aware that a new log exists
+ before the HLog is able to append edits into it, but this operation is
+ now more expensive.
+ A queue item is discarded when the replication thread cannot read
+ more entries from a file (because it reached the end of the last block)
+ and there are other files in the queue.
+ This means that if a source is up-to-date and replicates from the log
+ that the region server writes to, reading up to the "end" of the
+ current file won't delete the item in the queue.
+ </p>
+ <p>
+ When a log is archived (because it's not used anymore or because there are
+ too many of them per hbase.regionserver.maxlogs, typically because the insertion
+ rate is faster than region flushing), it will notify the source threads that the path
+ for that log changed. If a particular source was already done with
+ it, it will just ignore the message. If it's in the queue, the path
+ will be updated in memory. If the log is currently being replicated,
+ the change will be done atomically so that the reader doesn't try to
+ open the file when it's already moved. Also, moving a file is a NameNode
+ operation so, if the reader is currently reading the log, it won't
+ generate any exception.
+ </p>
+ </section>
+ <section name="Reading, filtering and sending edits">
+ <p>
+ By default, a source will try to read from a log file and ship log
+ entries as fast as possible to a sink. This is first limited by the
+ filtering of log entries; only KeyValues that are scoped GLOBAL and
+ that don't belong to catalog tables will be retained. A second limit
+ is imposed on the total size of the list of edits to replicate per slave,
+ which by default is 64MB. This means that a master cluster RS with 3 slaves
+ will use at most 192MB to store data to replicate. This doesn't account for
+ the filtered data that hasn't been garbage collected yet.
+ </p>
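+ <p>
+ The filtering rule itself can be sketched as follows (an illustration of
+ the idea, not the actual replication source code; the family scope is
+ assumed to have been read from the WALEdit):
+ </p>
+ <pre>
+// A KeyValue is shipped only if its table is not a catalog table and the
+// scope recorded for its family is GLOBAL.
+boolean isReplicable(byte[] tableName, int familyScope) {
+  if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) return false;
+  if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) return false;
+  return familyScope == HConstants.REPLICATION_SCOPE_GLOBAL;
+}
+ </pre>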
+ <p>
+ Once the maximum size of edits has been buffered or the reader hits the end
+ of the log file, the source thread will stop reading and will choose
+ at random a sink to replicate to (from the list that was generated by
+ keeping only a subset of slave RSs). It will directly issue an RPC to
+ the chosen machine and will wait for the method to return. If it's
+ successful, the source will determine if the current file has been emptied
+ or if it should continue to read from it. If the former, it will delete
+ the znode in the queue. If the latter, it will register the new offset
+ in the log's znode. If the RPC threw an exception, the source will retry
+ up to 10 times before trying to find a different sink.
+ </p>
+ </section>
+ <section name="Applying edits">
+ <p>
+ The sink synchronously applies the edits to its local cluster using
+ the native client API. This method is also synchronized between all
+ incoming sources, which means that only one batch of log entries can be
+ replicated at a time by each slave region server.
+ </p>
+ <p>
+ The sink applies the edits one by one, unless it's able to batch
+ sequential Puts that belong to the same table in order to use the
+ parallel puts feature of HConnectionManager. The Put and Delete objects
+ are recreated by inspecting the incoming WALEdit objects and are built
+ with the exact same row, family, qualifier, timestamp, and value (for a
+ Put). Note that if the master and slave clusters' clocks aren't in sync,
+ time-related issues may occur.
+ </p>
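+ <p>
+ As a hedged sketch of that re-creation step (names are indicative only;
+ the real sink pools its HTables and also rebuilds Delete objects, which is
+ omitted here), a batch of Puts for one table could be applied like this:
+ </p>
+ <pre>
+// Rebuild Put objects from incoming KeyValues and apply them as one batch,
+// letting the client parallelize the inserts for that table.
+void applyBatch(Configuration conf, byte[] tableName, KeyValue[] kvs)
+    throws IOException {
+  Put[] puts = new Put[kvs.length];
+  int i = 0;
+  for (KeyValue kv : kvs) {
+    Put put = new Put(kv.getRow(), kv.getTimestamp());
+    put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
+    puts[i++] = put;
+  }
+  HTable table = new HTable(conf, tableName);
+  table.put(Arrays.asList(puts));  // one batched call for the whole table
+}
+ </pre>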
+ </section>
+ <section name="Cleaning logs">
+ <p>
+ If replication isn't enabled, the master's logs cleaning thread will
+ delete old logs using a configured TTL. This doesn't work well with
+ replication since archived logs past their TTL may still be in a
+ queue. Thus, the default behavior is augmented so that if a log is
+ past its TTL, the cleaning thread will look up every queue until it
+ finds the log (while caching the ones it finds). If it's not found,
+ the log will be deleted. The next time it has to look for a log,
+ it will first use its cache.
+ </p>
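+ <p>
+ A sketch of that augmented decision, with invented method and field names
+ (anyReplicationQueueContains stands for a scan of every RS/peer queue in
+ ZooKeeper), is shown below:
+ </p>
+ <pre>
+// Cache the logs already found in a queue so that the queues don't have to
+// be rescanned on every cleaning pass.
+java.util.Set queuedLogsCache = new java.util.HashSet();
+
+boolean isLogDeletable(String logName) {
+  if (queuedLogsCache.contains(logName)) {
+    return false;                   // known to still be referenced by a queue
+  }
+  if (anyReplicationQueueContains(logName)) {
+    queuedLogsCache.add(logName);   // remember the hit for the next pass
+    return false;
+  }
+  return true;                      // past its TTL and not queued: delete it
+}
+ </pre>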
+ </section>
+ <section name="Region server failover">
+ <p>
+ As long as region servers don't fail, keeping track of the logs in ZK
+ doesn't add any value. Unfortunately, they do fail, so since ZooKeeper
+ is highly available, we can count on it and its semantics to help us
+ manage the transfer of the queues.
+ </p>
+ <p>
+ All the master cluster RSs keep a watcher on every other one of them to be
+ notified when one dies (just like the master does). When it happens,
+ they all race to create a znode called "lock" inside the dead RS' znode
+ that contains its queues. The one that creates it successfully will
+ proceed by transferring all the queues to its own znode (one by one
+ since ZK doesn't support the rename operation) and will delete all the
+ old ones when it's done. The recovered queues' znodes will be named
+ with the id of the slave cluster appended with the name of the dead
+ server.
+ </p>
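+ <p>
+ The race itself relies on ZooKeeper's create semantics. A minimal sketch
+ using the raw ZooKeeper API (error handling and the actual queue copying
+ omitted) is shown below:
+ </p>
+ <pre>
+// Only the region server that manages to create the "lock" znode takes over
+// the dead server's queues; everyone else backs off.
+boolean tryToLockQueues(ZooKeeper zk, String deadRsZnode) {
+  try {
+    zk.create(deadRsZnode + "/lock", new byte[0],
+        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    return true;   // we won the race, start copying the queues
+  } catch (KeeperException.NodeExistsException e) {
+    return false;  // another RS got the lock first
+  } catch (KeeperException e) {
+    return false;
+  } catch (InterruptedException e) {
+    Thread.currentThread().interrupt();
+    return false;
+  }
+}
+ </pre>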
+ <p>
+ Once that is done, the master cluster RS will create one new source thread per
+ copied queue, and each of them will follow the read/filter/ship pattern.
+ The main difference is that those queues will never have new data since
+ they don't belong to their new region server, which means that when
+ the reader hits the end of the last log, the queue's znode will be
+ deleted and the master cluster RS will close that replication source.
+ </p>
+ <p>
+ For example, consider a master cluster with 3 region servers that's
+ replicating to a single slave with id '2'. The following hierarchy
+ represents what the znodes layout could be at some point in time. We
+ can see the RSs' znodes all contain a "peers" znode that contains a
+ single queue. The znode names in the queues represent the actual file
+ names on HDFS in the form "address,port.timestamp".
+ </p>
+ <pre>
+/hbase/replication/rs/
+  1.1.1.1,60020,123456780/
+    peers/
+      2/
+        1.1.1.1,60020.1234 (Contains a position)
+        1.1.1.1,60020.1265
+  1.1.1.2,60020,123456790/
+    peers/
+      2/
+        1.1.1.2,60020.1214 (Contains a position)
+        1.1.1.2,60020.1248
+        1.1.1.2,60020.1312
+  1.1.1.3,60020,123456630/
+    peers/
+      2/
+        1.1.1.3,60020.1280 (Contains a position)
+
+ </pre>
+ <p>
+ Now let's say that 1.1.1.2 loses its ZK session. The survivors will race
+ to create a lock, and for some reason 1.1.1.3 wins. It will then start
+ transferring all the queues to its local peers znode by appending the
+ name of the dead server. Right before 1.1.1.3 is able to clean up the
+ old znodes, the layout will look like the following:
+ </p>
+ <pre>
+/hbase/replication/rs/
+  1.1.1.1,60020,123456780/
+    peers/
+      2/
+        1.1.1.1,60020.1234 (Contains a position)
+        1.1.1.1,60020.1265
+  1.1.1.2,60020,123456790/
+    lock
+    peers/
+      2/
+        1.1.1.2,60020.1214 (Contains a position)
+        1.1.1.2,60020.1248
+        1.1.1.2,60020.1312
+  1.1.1.3,60020,123456630/
+    peers/
+      2/
+        1.1.1.3,60020.1280 (Contains a position)
+
+      2-1.1.1.2,60020,123456790/
+        1.1.1.2,60020.1214 (Contains a position)
+        1.1.1.2,60020.1248
+        1.1.1.2,60020.1312
+ </pre>
+ <p>
+ Some time later, but before 1.1.1.3 is able to finish replicating the
+ last HLog from 1.1.1.2, let's say that it dies too (also some new logs
+ were created in the normal queues). The last RS will then try to lock
+ 1.1.1.3's znode and will begin transferring all the queues. The new
+ layout will be:
+ </p>
+ <pre>
+/hbase/replication/rs/
+  1.1.1.1,60020,123456780/
+    peers/
+      2/
+        1.1.1.1,60020.1378 (Contains a position)
+
+      2-1.1.1.3,60020,123456630/
+        1.1.1.3,60020.1325 (Contains a position)
+        1.1.1.3,60020.1401
+
+      2-1.1.1.2,60020,123456790-1.1.1.3,60020,123456630/
+        1.1.1.2,60020.1312 (Contains a position)
+  1.1.1.3,60020,123456630/
+    lock
+    peers/
+      2/
+        1.1.1.3,60020.1325 (Contains a position)
+        1.1.1.3,60020.1401
+
+      2-1.1.1.2,60020,123456790/
+        1.1.1.2,60020.1312 (Contains a position)
+ </pre>
+ </section>
+ </section>
+ <section name="FAQ">
+ <section name="Why do all clusters need to be in the same timezone?">
+ <p>
+ Suppose an edit to cell X happens in an EST cluster, then 2 minutes
+ later a new edit happens to the same cell in a PST cluster, and that
+ both clusters are in a master-master replication. Because the PST clock
+ is three hours behind, the second edit gets an older timestamp, so the
+ first edit will always hide it even though the second edit is in fact
+ the more recent one.
+ </p>
+ </section>
+ <section name="GLOBAL means replicate? Any provision to replicate only to cluster X and not to cluster Y? or is that for later?">
+ <p>
+ Yes, this is for much later.
+ </p>
+ </section>
+ <section name="You need a bulk edit shipper? Something that allows you transfer 64MB of edits in one go?">
+ <p>
+ You can use the HBase-provided utility called CopyTable from the package
+ org.apache.hadoop.hbase.mapreduce in order to have a distcp-like tool to
+ bulk copy data.
+ </p>
+ </section>
+ <section name="Is it a mistake that WALEdit doesn't carry Put and Delete objects, that we have to reinstantiate not only replicating but when replaying edits?">
+ <p>
+ Yes, this behavior would help a lot but it's not currently available
+ in HBase (BatchUpdate had that, but it was lost in the new API).
+ </p>
+ </section>
+ </section>
+ <section name="Known bugs/missing features">
+ <p>
+ Here's a list of all the jiras that relate to major issues or missing
+ features in the replication implementation.
+ </p>
+ <ol>
+ <li>
+ HBASE-2688: master-master replication is disabled in the code; we need
+ to enable and test it.
+ </li>
+ <li>
+ HBASE-2611: if a region server dies while recovering the
+ queues of another dead RS, we will miss the data from the queues
+ that weren't copied.
+ </li>
+ <li>
+ HBASE-2196: a master cluster can currently only support a single slave; some
+ refactoring is needed to support multiple slaves.
+ </li>
+ <li>
+ HBASE-2195: edits are applied regardless of their home cluster; each edit
+ should carry that information so it can be checked.
+ </li>
+ <li>
+ HBASE-2200: currently all the replication operations (adding or removing
+ streams, for example) can be done only when the clusters are offline. This
+ should be possible at runtime.
+ </li>
+ </ol>
+ </section>
+ </body>
+</document>
\ No newline at end of file
Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/BROKE_TODO_FIX_TestAcidGuarantees.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,330 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test case that uses multiple threads to read and write multifamily rows
+ * into a table, verifying that reads never see partially-complete writes.
+ *
+ * This can run as a junit test, or with a main() function which runs against
+ * a real cluster (eg for testing with failures, region movement, etc)
+ */
+public class BROKE_TODO_FIX_TestAcidGuarantees {
+ protected static final Log LOG = LogFactory.getLog(BROKE_TODO_FIX_TestAcidGuarantees.class);
+ public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
+ public static final byte [] FAMILY_A = Bytes.toBytes("A");
+ public static final byte [] FAMILY_B = Bytes.toBytes("B");
+ public static final byte [] FAMILY_C = Bytes.toBytes("C");
+ public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
+
+ public static final byte[][] FAMILIES = new byte[][] {
+ FAMILY_A, FAMILY_B, FAMILY_C };
+
+ private HBaseTestingUtility util;
+
+ public static int NUM_COLS_TO_CHECK = 50;
+
+ private void createTableIfMissing()
+ throws IOException {
+ try {
+ util.createTable(TABLE_NAME, FAMILIES);
+ } catch (TableExistsException tee) {
+ }
+ }
+
+ public BROKE_TODO_FIX_TestAcidGuarantees() {
+ // Set small flush size for minicluster so we exercise reseeking scanners
+ Configuration conf = HBaseConfiguration.create();
+ conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024));
+ util = new HBaseTestingUtility(conf);
+ }
+
+ /**
+ * Thread that does random full-row writes into a table.
+ */
+ public static class AtomicityWriter extends RepeatingTestThread {
+ Random rand = new Random();
+ byte data[] = new byte[10];
+ byte targetRows[][];
+ byte targetFamilies[][];
+ HTable table;
+ AtomicLong numWritten = new AtomicLong();
+
+ public AtomicityWriter(TestContext ctx, byte targetRows[][],
+ byte targetFamilies[][]) throws IOException {
+ super(ctx);
+ this.targetRows = targetRows;
+ this.targetFamilies = targetFamilies;
+ table = new HTable(ctx.getConf(), TABLE_NAME);
+ }
+ public void doAnAction() throws Exception {
+ // Pick a random row to write into
+ byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
+ Put p = new Put(targetRow);
+ rand.nextBytes(data);
+
+ for (byte[] family : targetFamilies) {
+ for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+ byte qualifier[] = Bytes.toBytes("col" + i);
+ p.add(family, qualifier, data);
+ }
+ }
+ table.put(p);
+ numWritten.getAndIncrement();
+ }
+ }
+
+ /**
+ * Thread that does single-row reads in a table, looking for partially
+ * completed rows.
+ */
+ public static class AtomicGetReader extends RepeatingTestThread {
+ byte targetRow[];
+ byte targetFamilies[][];
+ HTable table;
+ int numVerified = 0;
+ AtomicLong numRead = new AtomicLong();
+
+ public AtomicGetReader(TestContext ctx, byte targetRow[],
+ byte targetFamilies[][]) throws IOException {
+ super(ctx);
+ this.targetRow = targetRow;
+ this.targetFamilies = targetFamilies;
+ table = new HTable(ctx.getConf(), TABLE_NAME);
+ }
+
+ public void doAnAction() throws Exception {
+ Get g = new Get(targetRow);
+ Result res = table.get(g);
+ byte[] gotValue = null;
+ if (res.getRow() == null) {
+ // Trying to verify but we didn't find the row - the writing
+ // thread probably just hasn't started writing yet, so we can
+ // ignore this action
+ return;
+ }
+
+ for (byte[] family : targetFamilies) {
+ for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+ byte qualifier[] = Bytes.toBytes("col" + i);
+ byte thisValue[] = res.getValue(family, qualifier);
+ if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+ gotFailure(gotValue, res);
+ }
+ numVerified++;
+ gotValue = thisValue;
+ }
+ }
+ numRead.getAndIncrement();
+ }
+
+ private void gotFailure(byte[] expected, Result res) {
+ StringBuilder msg = new StringBuilder();
+ msg.append("Failed after ").append(numVerified).append("!");
+ msg.append("Expected=").append(Bytes.toStringBinary(expected));
+ msg.append("Got:\n");
+ for (KeyValue kv : res.list()) {
+ msg.append(kv.toString());
+ msg.append(" val= ");
+ msg.append(Bytes.toStringBinary(kv.getValue()));
+ msg.append("\n");
+ }
+ throw new RuntimeException(msg.toString());
+ }
+ }
+
+ /**
+ * Thread that does full scans of the table looking for any partially completed
+ * rows.
+ */
+ public static class AtomicScanReader extends RepeatingTestThread {
+ byte targetFamilies[][];
+ HTable table;
+ AtomicLong numScans = new AtomicLong();
+ AtomicLong numRowsScanned = new AtomicLong();
+
+ public AtomicScanReader(TestContext ctx,
+ byte targetFamilies[][]) throws IOException {
+ super(ctx);
+ this.targetFamilies = targetFamilies;
+ table = new HTable(ctx.getConf(), TABLE_NAME);
+ }
+
+ public void doAnAction() throws Exception {
+ Scan s = new Scan();
+ for (byte[] family : targetFamilies) {
+ s.addFamily(family);
+ }
+ ResultScanner scanner = table.getScanner(s);
+
+ for (Result res : scanner) {
+ byte[] gotValue = null;
+
+ for (byte[] family : targetFamilies) {
+ for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
+ byte qualifier[] = Bytes.toBytes("col" + i);
+ byte thisValue[] = res.getValue(family, qualifier);
+ if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
+ gotFailure(gotValue, res);
+ }
+ gotValue = thisValue;
+ }
+ }
+ numRowsScanned.getAndIncrement();
+ }
+ numScans.getAndIncrement();
+ }
+
+ private void gotFailure(byte[] expected, Result res) {
+ StringBuilder msg = new StringBuilder();
+ msg.append("Failed after ").append(numRowsScanned).append("!");
+ msg.append("Expected=").append(Bytes.toStringBinary(expected));
+ msg.append("Got:\n");
+ for (KeyValue kv : res.list()) {
+ msg.append(kv.toString());
+ msg.append(" val= ");
+ msg.append(Bytes.toStringBinary(kv.getValue()));
+ msg.append("\n");
+ }
+ throw new RuntimeException(msg.toString());
+ }
+ }
+
+
+ public void runTestAtomicity(long millisToRun,
+ int numWriters,
+ int numGetters,
+ int numScanners,
+ int numUniqueRows) throws Exception {
+ createTableIfMissing();
+ TestContext ctx = new TestContext(util.getConfiguration());
+
+ byte rows[][] = new byte[numUniqueRows][];
+ for (int i = 0; i < numUniqueRows; i++) {
+ rows[i] = Bytes.toBytes("test_row_" + i);
+ }
+
+ List<AtomicityWriter> writers = Lists.newArrayList();
+ for (int i = 0; i < numWriters; i++) {
+ AtomicityWriter writer = new AtomicityWriter(
+ ctx, rows, FAMILIES);
+ writers.add(writer);
+ ctx.addThread(writer);
+ }
+
+ List<AtomicGetReader> getters = Lists.newArrayList();
+ for (int i = 0; i < numGetters; i++) {
+ AtomicGetReader getter = new AtomicGetReader(
+ ctx, rows[i % numUniqueRows], FAMILIES);
+ getters.add(getter);
+ ctx.addThread(getter);
+ }
+
+ List<AtomicScanReader> scanners = Lists.newArrayList();
+ for (int i = 0; i < numScanners; i++) {
+ AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
+ scanners.add(scanner);
+ ctx.addThread(scanner);
+ }
+
+ ctx.startThreads();
+ ctx.waitFor(millisToRun);
+ ctx.stop();
+
+ LOG.info("Finished test. Writers:");
+ for (AtomicityWriter writer : writers) {
+ LOG.info(" wrote " + writer.numWritten.get());
+ }
+ LOG.info("Readers:");
+ for (AtomicGetReader reader : getters) {
+ LOG.info(" read " + reader.numRead.get());
+ }
+ LOG.info("Scanners:");
+ for (AtomicScanReader scanner : scanners) {
+ LOG.info(" scanned " + scanner.numScans.get());
+ LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
+ }
+ }
+
+ @Test
+ public void testGetAtomicity() throws Exception {
+ util.startMiniCluster(1);
+ try {
+ runTestAtomicity(20000, 5, 5, 0, 3);
+ } finally {
+ util.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ @Ignore("Currently not passing - see HBASE-2670")
+ public void testScanAtomicity() throws Exception {
+ util.startMiniCluster(1);
+ try {
+ runTestAtomicity(20000, 5, 0, 5, 3);
+ } finally {
+ util.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ @Ignore("Currently not passing - see HBASE-2670")
+ public void testMixedAtomicity() throws Exception {
+ util.startMiniCluster(1);
+ try {
+ runTestAtomicity(20000, 5, 2, 2, 3);
+ } finally {
+ util.shutdownMiniCluster();
+ }
+ }
+
+ public static void main(String args[]) throws Exception {
+ Configuration c = HBaseConfiguration.create();
+ BROKE_TODO_FIX_TestAcidGuarantees test = new BROKE_TODO_FIX_TestAcidGuarantees();
+ test.setConf(c);
+ test.runTestAtomicity(5*60*1000, 5, 2, 2, 3);
+ }
+
+ private void setConf(Configuration c) {
+ util = new HBaseTestingUtility(c);
+ }
+}
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Fri Aug 27 05:01:02 2010
@@ -29,6 +29,8 @@ import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
import java.util.UUID;
import org.apache.commons.logging.Log;
@@ -39,6 +41,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
@@ -47,7 +50,11 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
@@ -384,6 +391,14 @@ public class HBaseTestingUtility {
this.hbaseCluster.flushcache();
}
+ /**
+ * Flushes all caches in the mini hbase cluster
+ * @throws IOException
+ */
+ public void flush(byte [] tableName) throws IOException {
+ this.hbaseCluster.flushcache(tableName);
+ }
+
/**
* Create a table.
@@ -506,6 +521,7 @@ public class HBaseTestingUtility {
* @throws IOException
*/
public int loadTable(final HTable t, final byte[] f) throws IOException {
+ t.setAutoFlush(false);
byte[] k = new byte[3];
int rowCount = 0;
for (byte b1 = 'a'; b1 <= 'z'; b1++) {
@@ -521,6 +537,34 @@ public class HBaseTestingUtility {
}
}
}
+ t.flushCommits();
+ return rowCount;
+ }
+ /**
+ * Load region with rows from 'aaa' to 'zzz'.
+ * @param r Region
+ * @param f Family
+ * @return Count of rows loaded.
+ * @throws IOException
+ */
+ public int loadRegion(final HRegion r, final byte[] f)
+ throws IOException {
+ byte[] k = new byte[3];
+ int rowCount = 0;
+ for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+ for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+ for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+ k[0] = b1;
+ k[1] = b2;
+ k[2] = b3;
+ Put put = new Put(k);
+ put.add(f, null, k);
+ if (r.getLog() == null) put.setWriteToWAL(false);
+ r.put(put);
+ rowCount++;
+ }
+ }
+ }
return rowCount;
}
@@ -678,6 +722,26 @@ public class HBaseTestingUtility {
}
/**
+ * Tool to get the reference to the region server object that holds the
+ * region of the specified user table.
+ * It first searches for the meta rows that contain the region of the
+ * specified table, then gets the index of that RS, and finally retrieves
+ * the RS's reference.
+ * @param tableName user table to lookup in .META.
+ * @return region server that holds it, null if the row doesn't exist
+ * @throws IOException
+ */
+ public HRegionServer getRSForFirstRegionInTable(byte[] tableName)
+ throws IOException {
+ List<byte[]> metaRows = getMetaTableRows(tableName);
+ if (metaRows == null || metaRows.size() == 0) {
+ return null;
+ }
+ int index = hbaseCluster.getServerWith(metaRows.get(0));
+ return hbaseCluster.getRegionServerThreads().get(index).getRegionServer();
+ }
+
+ /**
* Starts a <code>MiniMRCluster</code> with a default number of
* <code>TaskTracker</code>'s.
*
@@ -1026,4 +1090,42 @@ public class HBaseTestingUtility {
Threads.sleep(1000);
}
}
-}
\ No newline at end of file
+
+ /**
+ * Do a small get/scan against one store. This is required because store
+ * has no actual methods of querying itself, and relies on StoreScanner.
+ */
+ public static List<KeyValue> getFromStoreFile(Store store,
+ Get get) throws IOException {
+ ReadWriteConsistencyControl.resetThreadReadPoint();
+ Scan scan = new Scan(get);
+ InternalScanner scanner = (InternalScanner) store.getScanner(scan,
+ scan.getFamilyMap().get(store.getFamily().getName()));
+
+ List<KeyValue> result = new ArrayList<KeyValue>();
+ scanner.next(result);
+ if (!result.isEmpty()) {
+ // verify that we are on the row we want:
+ KeyValue kv = result.get(0);
+ if (!Bytes.equals(kv.getRow(), get.getRow())) {
+ result.clear();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Do a small get/scan against one store. This is required because store
+ * has no actual methods of querying itself, and relies on StoreScanner.
+ */
+ public static List<KeyValue> getFromStoreFile(Store store,
+ byte [] row,
+ NavigableSet<byte[]> columns
+ ) throws IOException {
+ Get get = new Get(row);
+ Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
+ s.put(store.getFamily().getName(), columns);
+
+ return getFromStoreFile(store,get);
+ }
+}
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Fri Aug 27 05:01:02 2010
@@ -350,6 +350,21 @@ public class MiniHBaseCluster {
}
/**
+ * Call flushCache on all regions of the specified table.
+ * @throws IOException
+ */
+ public void flushcache(byte [] tableName) throws IOException {
+ for (JVMClusterUtil.RegionServerThread t:
+ this.hbaseCluster.getRegionServers()) {
+ for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
+ if(Bytes.equals(r.getTableDesc().getName(), tableName)) {
+ r.flushcache();
+ }
+ }
+ }
+ }
+
+ /**
* @return List of region server threads.
*/
public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java Fri Aug 27 05:01:02 2010
@@ -746,7 +746,6 @@ public class PerformanceEvaluation {
this.admin = new HBaseAdmin(conf);
this.table = new HTable(conf, tableName);
this.table.setAutoFlush(false);
- this.table.setWriteBufferSize(1024*1024*12);
this.table.setScannerCaching(30);
}
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java Fri Aug 27 05:01:02 2010
@@ -286,6 +286,7 @@ public class TestAcidGuarantees {
}
@Test
+ @Ignore("Currently not passing - see HBASE-2856")
public void testGetAtomicity() throws Exception {
util.startMiniCluster(1);
try {
Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,516 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Run tests related to reading cells with multiple timestamps/versions using
+ * HBase client APIs.
+ * Sets up the HBase mini cluster once at start. Each test creates a table
+ * named for the method and does its stuff against that.
+ */
+public class TestMultipleTimestamps {
+ final Log LOG = LogFactory.getLog(getClass());
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ // Nothing to do.
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() throws Exception {
+ // Nothing to do.
+ }
+
+ @Test
+ public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
+ byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+ "ColumnMiltipleTimestamps");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ Integer[] putRows = new Integer[] {1, 3, 5, 7};
+ Integer[] putColumns = new Integer[] { 1, 3, 5};
+ Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+ Integer[] scanRows = new Integer[] {3, 5};
+ Integer[] scanColumns = new Integer[] {3};
+ Long[] scanTimestamps = new Long[] {3L, 4L};
+ int scanMaxVersions = 2;
+
+ put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+ flush(TABLE);
+
+ ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+ scanTimestamps, scanMaxVersions);
+
+ KeyValue[] kvs;
+
+ kvs = scanner.next().raw();
+ assertEquals(2, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 3, 3, 4);
+ checkOneCell(kvs[1], FAMILY, 3, 3, 3);
+ kvs = scanner.next().raw();
+ assertEquals(2, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 5, 3, 4);
+ checkOneCell(kvs[1], FAMILY, 5, 3, 3);
+ }
+
+ @Test
+ public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
+ byte [] TABLE = Bytes.toBytes("testReseeksWithMultiple" +
+ "ColumnOneTimestamps");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ Integer[] putRows = new Integer[] {1, 3, 5, 7};
+ Integer[] putColumns = new Integer[] { 1, 3, 5};
+ Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+ Integer[] scanRows = new Integer[] {3, 5};
+ Integer[] scanColumns = new Integer[] {3,4};
+ Long[] scanTimestamps = new Long[] {3L};
+ int scanMaxVersions = 2;
+
+ put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+ flush(TABLE);
+
+ ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+ scanTimestamps, scanMaxVersions);
+
+ KeyValue[] kvs;
+
+ kvs = scanner.next().raw();
+ assertEquals(1, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 3, 3, 3);
+ kvs = scanner.next().raw();
+ assertEquals(1, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 5, 3, 3);
+ }
+
+ @Test
+ public void testReseeksWithMultipleColumnMultipleTimestamp() throws
+ IOException {
+ byte [] TABLE = Bytes.toBytes("testReseeksWithMultiple" +
+ "ColumnMiltipleTimestamps");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ Integer[] putRows = new Integer[] {1, 3, 5, 7};
+ Integer[] putColumns = new Integer[] { 1, 3, 5};
+ Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+ Integer[] scanRows = new Integer[] {5, 7};
+ Integer[] scanColumns = new Integer[] {3, 4, 5};
+ Long[] scanTimestamps = new Long[] {2l, 3L};
+ int scanMaxVersions = 2;
+
+ put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+ flush(TABLE);
+
+ ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+ scanTimestamps, scanMaxVersions);
+
+ KeyValue[] kvs;
+
+ kvs = scanner.next().raw();
+ assertEquals(4, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 5, 3, 3);
+ checkOneCell(kvs[1], FAMILY, 5, 3, 2);
+ checkOneCell(kvs[2], FAMILY, 5, 5, 3);
+ checkOneCell(kvs[3], FAMILY, 5, 5, 2);
+ kvs = scanner.next().raw();
+ assertEquals(4, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 7, 3, 3);
+ checkOneCell(kvs[1], FAMILY, 7, 3, 2);
+ checkOneCell(kvs[2], FAMILY, 7, 5, 3);
+ checkOneCell(kvs[3], FAMILY, 7, 5, 2);
+ }
+
+ @Test
+ public void testReseeksWithMultipleFiles() throws IOException {
+ byte [] TABLE = Bytes.toBytes("testReseeksWithMultipleFiles");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ Integer[] putRows1 = new Integer[] {1, 2, 3};
+ Integer[] putColumns1 = new Integer[] { 2, 5, 6};
+ Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
+
+ Integer[] putRows2 = new Integer[] {6, 7};
+ Integer[] putColumns2 = new Integer[] {3, 6};
+ Long[] putTimestamps2 = new Long[] {4L, 5L};
+
+ Integer[] putRows3 = new Integer[] {2, 3, 5};
+ Integer[] putColumns3 = new Integer[] {1, 2, 3};
+ Long[] putTimestamps3 = new Long[] {4L,8L};
+
+
+ Integer[] scanRows = new Integer[] {3, 5, 7};
+ Integer[] scanColumns = new Integer[] {3, 4, 5};
+ Long[] scanTimestamps = new Long[] {2l, 4L};
+ int scanMaxVersions = 5;
+
+ put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
+ flush(TABLE);
+ put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
+ flush(TABLE);
+ put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
+
+ ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+ scanTimestamps, scanMaxVersions);
+
+ KeyValue[] kvs;
+
+ kvs = scanner.next().raw();
+ assertEquals(2, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 3, 3, 4);
+ checkOneCell(kvs[1], FAMILY, 3, 5, 2);
+
+ kvs = scanner.next().raw();
+ assertEquals(1, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 5, 3, 4);
+
+ kvs = scanner.next().raw();
+ assertEquals(1, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 6, 3, 4);
+
+ kvs = scanner.next().raw();
+ assertEquals(1, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 7, 3, 4);
+ }
+
+ @Test
+ public void testWithVersionDeletes() throws Exception {
+
+ // first test from memstore (without flushing).
+ testWithVersionDeletes(false);
+
+ // run same test against HFiles (by forcing a flush).
+ testWithVersionDeletes(true);
+ }
+
+ public void testWithVersionDeletes(boolean flushTables) throws IOException {
+ byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
+ (flushTables ? "flush" : "noflush"));
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ // For row:0, col:0: insert versions 1 through 5.
+ putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+ if (flushTables) {
+ flush(TABLE);
+ }
+
+ // delete version 4.
+ deleteOneVersion(ht, FAMILY, 0, 0, 4);
+
+ // request a bunch of versions including the deleted version. We should
+ // only get back entries for the versions that exist.
+ KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0,
+ Arrays.asList(2L, 3L, 4L, 5L));
+ assertEquals(3, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 0, 0, 5);
+ checkOneCell(kvs[1], FAMILY, 0, 0, 3);
+ checkOneCell(kvs[2], FAMILY, 0, 0, 2);
+ }
+
+ @Test
+ public void testWithMultipleVersionDeletes() throws IOException {
+ byte [] TABLE = Bytes.toBytes("testWithMultipleVersionDeletes");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ // For row:0, col:0: insert versions 1 through 5.
+ putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+ flush(TABLE);
+
+ // delete all versions before 4.
+ deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
+
+ // request a bunch of versions including the deleted version. We should
+ // only get back entries for the versions that exist.
+ KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
+ assertEquals(0, kvs.length);
+ }
+
+ @Test
+ public void testWithColumnDeletes() throws IOException {
+ byte [] TABLE = Bytes.toBytes("testWithColumnDeletes");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ // For row:0, col:0: insert versions 1 through 5.
+ putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+ flush(TABLE);
+
+ // delete all versions before 4.
+ deleteColumn(ht, FAMILY, 0, 0);
+
+ // request a bunch of versions including the deleted version. We should
+ // only get back entries for the versions that exist.
+ KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
+ assertEquals(0, kvs.length);
+ }
+
+ @Test
+ public void testWithFamilyDeletes() throws IOException {
+ byte [] TABLE = Bytes.toBytes("testWithFamilyDeletes");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ // For row:0, col:0: insert versions 1 through 5.
+ putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+ flush(TABLE);
+
+ // delete all versions before 4.
+ deleteFamily(ht, FAMILY, 0);
+
+ // request a bunch of versions including the deleted version. We should
+ // only get back entries for the versions that exist.
+ KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
+ assertEquals(0, kvs.length);
+ }
+
+ // Flush tables. Since flushing is asynchronous, sleep for a bit.
+ private void flush(byte [] tableName) throws IOException {
+ TEST_UTIL.flush(tableName);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException i) {
+ // ignore
+ }
+ }
+
+ /**
+ * Assert that the passed in KeyValue has expected contents for the
+ * specified row, column & timestamp.
+ */
+ private void checkOneCell(KeyValue kv, byte[] cf,
+ int rowIdx, int colIdx, long ts) {
+
+ String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
+
+ assertEquals("Row mismatch which checking: " + ctx,
+ "row:"+ rowIdx, Bytes.toString(kv.getRow()));
+
+ assertEquals("ColumnFamily mismatch while checking: " + ctx,
+ Bytes.toString(cf), Bytes.toString(kv.getFamily()));
+
+ assertEquals("Column qualifier mismatch while checking: " + ctx,
+ "column:" + colIdx,
+ Bytes.toString(kv.getQualifier()));
+
+ assertEquals("Timestamp mismatch while checking: " + ctx,
+ ts, kv.getTimestamp());
+
+ assertEquals("Value mismatch while checking: " + ctx,
+ "value-version-" + ts, Bytes.toString(kv.getValue()));
+ }
+
+ /**
+ * Uses the TimestampFilter on a Get to request a specified list of
+ * versions for the row/column specified by rowIdx & colIdx.
+ *
+ */
+ private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
+ int colIdx, List<Long> versions)
+ throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Get get = new Get(row);
+ get.addColumn(cf, column);
+ get.setMaxVersions();
+ get.setTimeRange(Collections.min(versions), Collections.max(versions)+1);
+ Result result = ht.get(get);
+
+ return result.raw();
+ }
+
+ private ResultScanner scan(HTable ht, byte[] cf,
+ Integer[] rowIndexes, Integer[] columnIndexes,
+ Long[] versions, int maxVersions)
+ throws IOException {
+ Arrays.asList(rowIndexes);
+ byte startRow[] = Bytes.toBytes("row:" +
+ Collections.min( Arrays.asList(rowIndexes)));
+ byte endRow[] = Bytes.toBytes("row:" +
+ Collections.max( Arrays.asList(rowIndexes))+1);
+ Scan scan = new Scan(startRow, endRow);
+ for (Integer colIdx: columnIndexes) {
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ scan.addColumn(cf, column);
+ }
+ scan.setMaxVersions(maxVersions);
+ scan.setTimeRange(Collections.min(Arrays.asList(versions)),
+ Collections.max(Arrays.asList(versions))+1);
+ ResultScanner scanner = ht.getScanner(scan);
+ return scanner;
+ }
+
+ private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
+ Integer[] columnIndexes, Long[] versions)
+ throws IOException {
+ for (int rowIdx: rowIndexes) {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ Put put = new Put(row);
+ for(int colIdx: columnIndexes) {
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ for (long version: versions) {
+ put.add(cf, column, version, Bytes.toBytes("value-version-" +
+ version));
+ }
+ }
+ ht.put(put);
+ }
+ }
+
+ /**
+ * Insert in specific row/column versions with timestamps
+ * versionStart..versionEnd.
+ */
+ private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
+ long versionStart, long versionEnd)
+ throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Put put = new Put(row);
+
+ for (long idx = versionStart; idx <= versionEnd; idx++) {
+ put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
+ }
+
+ ht.put(put);
+ }
+
+ /**
+ * For row/column specified by rowIdx/colIdx, delete the cell
+ * corresponding to the specified version.
+ */
+ private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
+ int colIdx, long version)
+ throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Delete del = new Delete(row);
+ del.deleteColumn(cf, column, version);
+ ht.delete(del);
+ }
+
+ /**
+ * For row/column specified by rowIdx/colIdx, delete all cells
+ * preceding the specified version.
+ */
+ private void deleteAllVersionsBefore(HTable ht, byte[] cf, int rowIdx,
+ int colIdx, long version)
+ throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Delete del = new Delete(row);
+ del.deleteColumns(cf, column, version);
+ ht.delete(del);
+ }
+
+ private void deleteColumn(HTable ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Delete del = new Delete(row);
+ del.deleteColumns(cf, column);
+ ht.delete(del);
+ }
+
+ private void deleteFamily(HTable ht, byte[] cf, int rowIdx) throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ Delete del = new Delete(row);
+ del.deleteFamily(cf);
+ ht.delete(del);
+ }
+}
+
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestScannerTimeout.java Fri Aug 27 05:01:02 2010
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
@@ -114,9 +115,7 @@ public class TestScannerTimeout {
*/
@Test
public void test2772() throws Exception {
- int rs = TEST_UTIL.getHBaseCluster().getServerWith(
- TEST_UTIL.getHBaseCluster().getRegions(
- TABLE_NAME).get(0).getRegionName());
+ HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME);
Scan scan = new Scan();
// Set a very high timeout, we want to test what happens when a RS
// fails but the region is recovered before the lease times out.
@@ -128,7 +127,7 @@ public class TestScannerTimeout {
HTable higherScanTimeoutTable = new HTable(conf, TABLE_NAME);
ResultScanner r = higherScanTimeoutTable.getScanner(scan);
// This takes way less than SCANNER_TIMEOUT*100
- TEST_UTIL.getHBaseCluster().getRegionServer(rs).abort("die!");
+ rs.abort("die!");
Result[] results = r.next(NB_ROWS);
assertEquals(NB_ROWS, results.length);
r.close();
Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,342 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Run tests related to {@link TimestampsFilter} using HBase client APIs.
+ * Sets up the HBase mini cluster once at start. Each test creates a table
+ * named for the test method and runs its operations against that table.
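+ *
+ * Typical usage, as exercised by the helpers below: build a TimestampsFilter
+ * from the list of timestamps of interest, set it on a Get or Scan together
+ * with setMaxVersions(), and only cells whose timestamps appear in the list
+ * are returned.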
+ */
+public class TestTimestampsFilter {
+ final Log LOG = LogFactory.getLog(getClass());
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ // Nothing to do.
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() throws Exception {
+ // Nothing to do.
+ }
+
+ /**
+ * Test from client side for TimestampsFilter.
+ *
+ * The TimestampsFilter provides the ability to request cells (KeyValues)
+ * whose timestamp/version is in the specified list of timestamps/versions.
+ *
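+ * Versions are inserted both before and after a flush, so the filter is
+ * exercised against data in the memstore as well as in HFiles.
+ *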
+ * @throws Exception
+ */
+ @Test
+ public void testTimestampsFilter() throws Exception {
+ byte [] TABLE = Bytes.toBytes("testTimestampsFilter");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+ KeyValue kvs[];
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+ for (int colIdx = 0; colIdx < 5; colIdx++) {
+ // insert versions 201..300
+ putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
+ // insert versions 1..100
+ putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
+ }
+ }
+
+ // do some verification before flush
+ verifyInsertedValues(ht, FAMILY);
+
+ flush();
+
+ // do some verification after flush
+ verifyInsertedValues(ht, FAMILY);
+
+ // Insert some more versions after flush. These should be in memstore.
+ // After this we should have data in both memstore & HFiles.
+ for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+ for (int colIdx = 0; colIdx < 5; colIdx++) {
+ putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
+ putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
+ }
+ }
+
+ for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+ for (int colIdx = 0; colIdx < 5; colIdx++) {
+ kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
+ Arrays.asList(505L, 5L, 105L, 305L, 205L));
+ assertEquals(4, kvs.length);
+ checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
+ checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
+ checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
+ checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
+ }
+ }
+
+ // Request an empty list of versions using the TimestampsFilter;
+ // should return no cells.
+ kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
+ assertEquals(0, kvs.length);
+
+ //
+ // Test the filter using a Scan operation
+ // Scan rows 0..4. For each row, get all its columns, but only
+ // those versions of the columns with the specified timestamps.
+ Result[] results = scanNVersions(ht, FAMILY, 0, 4,
+ Arrays.asList(6L, 106L, 306L));
+ assertEquals("# of rows returned from scan", 5, results.length);
+ for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+ kvs = results[rowIdx].raw();
+ // each row should have 5 columns.
+ // And we have requested 3 versions for each.
+ assertEquals("Number of KeyValues in result for row:" + rowIdx,
+ 3*5, kvs.length);
+ for (int colIdx = 0; colIdx < 5; colIdx++) {
+ int offset = colIdx * 3;
+ checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
+ checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
+ checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
+ }
+ }
+ }
+
+ /**
+ * Test TimestampsFilter in the presence of version deletes.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testWithVersionDeletes() throws Exception {
+
+ // first test from memstore (without flushing).
+ testWithVersionDeletes(false);
+
+ // run same test against HFiles (by forcing a flush).
+ testWithVersionDeletes(true);
+ }
+
+ private void testWithVersionDeletes(boolean flushTables) throws IOException {
+ byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
+ (flushTables ? "flush" : "noflush"));
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ // For row:0, col:0: insert versions 1 through 5.
+ putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+ // delete version 4.
+ deleteOneVersion(ht, FAMILY, 0, 0, 4);
+
+ if (flushTables) {
+ flush();
+ }
+
+ // request a bunch of versions including the deleted version. We should
+ // only get back entries for the versions that exist.
+ KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
+ assertEquals(3, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 0, 0, 5);
+ checkOneCell(kvs[1], FAMILY, 0, 0, 3);
+ checkOneCell(kvs[2], FAMILY, 0, 0, 2);
+ }
+
+ private void verifyInsertedValues(HTable ht, byte[] cf) throws IOException {
+ for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+ for (int colIdx = 0; colIdx < 5; colIdx++) {
+ // ask for versions that exist.
+ KeyValue[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
+ Arrays.asList(5L, 300L, 6L, 80L));
+ assertEquals(4, kvs.length);
+ checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
+ checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
+ checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
+ checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
+
+ // ask for versions that do not exist.
+ kvs = getNVersions(ht, cf, rowIdx, colIdx,
+ Arrays.asList(101L, 102L));
+ assertEquals(0, kvs.length);
+
+ // ask for some versions that exist and some that do not.
+ kvs = getNVersions(ht, cf, rowIdx, colIdx,
+ Arrays.asList(1L, 300L, 105L, 70L, 115L));
+ assertEquals(3, kvs.length);
+ checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
+ checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
+ checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
+ }
+ }
+ }
+
+ // Flush tables. Since flushing is asynchronous, sleep for a bit.
+ private void flush() throws IOException {
+ TEST_UTIL.flush();
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException i) {
+ // ignore
+ }
+ }
+
+ /**
+ * Assert that the passed in KeyValue has expected contents for the
+ * specified row, column & timestamp.
+ */
+ private void checkOneCell(KeyValue kv, byte[] cf,
+ int rowIdx, int colIdx, long ts) {
+
+ String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
+
+ assertEquals("Row mismatch which checking: " + ctx,
+ "row:"+ rowIdx, Bytes.toString(kv.getRow()));
+
+ assertEquals("ColumnFamily mismatch while checking: " + ctx,
+ Bytes.toString(cf), Bytes.toString(kv.getFamily()));
+
+ assertEquals("Column qualifier mismatch while checking: " + ctx,
+ "column:" + colIdx,
+ Bytes.toString(kv.getQualifier()));
+
+ assertEquals("Timestamp mismatch while checking: " + ctx,
+ ts, kv.getTimestamp());
+
+ assertEquals("Value mismatch while checking: " + ctx,
+ "value-version-" + ts, Bytes.toString(kv.getValue()));
+ }
+
+ /**
+ * Uses the TimestampsFilter on a Get to request a specified list of
+ * versions for the row/column specified by rowIdx & colIdx.
+ *
+ */
+ private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
+ int colIdx, List<Long> versions)
+ throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Filter filter = new TimestampsFilter(versions);
+ Get get = new Get(row);
+ get.addColumn(cf, column);
+ get.setFilter(filter);
+ get.setMaxVersions();
+ Result result = ht.get(get);
+
+ return result.raw();
+ }
+
+ /**
+ * Uses the TimestampsFilter on a Scan to request a specified list of
+ * versions for the rows from startRowIdx to endRowIdx (both inclusive).
+ */
+ private Result[] scanNVersions(HTable ht, byte[] cf, int startRowIdx,
+ int endRowIdx, List<Long> versions)
+ throws IOException {
+ byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
+ byte endRow[] = Bytes.toBytes("row:" + (endRowIdx + 1)); // exclusive upper bound
+ Filter filter = new TimestampsFilter(versions);
+ Scan scan = new Scan(startRow, endRow);
+ scan.setFilter(filter);
+ scan.setMaxVersions();
+ ResultScanner scanner = ht.getScanner(scan);
+ return scanner.next(endRowIdx - startRowIdx + 1);
+ }
+
+ /**
+ * Insert in specific row/column versions with timestamps
+ * versionStart..versionEnd.
+ */
+ private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
+ long versionStart, long versionEnd)
+ throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Put put = new Put(row);
+
+ for (long idx = versionStart; idx <= versionEnd; idx++) {
+ put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
+ }
+
+ ht.put(put);
+ }
+
+ /**
+ * For row/column specified by rowIdx/colIdx, delete the cell
+ * corresponding to the specified version.
+ */
+ private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
+ int colIdx, long version)
+ throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Delete del = new Delete(row);
+ del.deleteColumn(cf, column, version);
+ ht.delete(del);
+ }
+}
+
Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/filter/TestColumnPrefixFilter.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,104 @@
+package org.apache.hadoop.hbase.filter;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
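+/**
+ * Test ColumnPrefixFilter directly against an HRegion: cells are grouped by
+ * column-qualifier prefix at insert time, and a scan with the filter should
+ * return exactly the cells whose qualifiers start with that prefix.
+ */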
+public class TestColumnPrefixFilter {
+
+ private final static HBaseTestingUtility TEST_UTIL = new
+ HBaseTestingUtility();
+
+ @Test
+ public void testColumnPrefixFilter() throws IOException {
+ String family = "Family";
+ HTableDescriptor htd = new HTableDescriptor("TestColumnPrefixFilter");
+ htd.addFamily(new HColumnDescriptor(family));
+ HRegionInfo info = new HRegionInfo(htd, null, null, false);
+ HRegion region = HRegion.createHRegion(info, HBaseTestingUtility.
+ getTestDir(), TEST_UTIL.getConfiguration());
+
+ List<String> rows = generateRandomWords(100, "row");
+ List<String> columns = generateRandomWords(10000, "column");
+ long maxTimestamp = 2;
+
+ List<KeyValue> kvList = new ArrayList<KeyValue>();
+
+ Map<String, List<KeyValue>> prefixMap = new HashMap<String,
+ List<KeyValue>>();
+
+ prefixMap.put("p", new ArrayList<KeyValue>());
+ prefixMap.put("s", new ArrayList<KeyValue>());
+
+ String valueString = "ValueString";
+
+ for (String row: rows) {
+ Put p = new Put(Bytes.toBytes(row));
+ for (String column: columns) {
+ for (long timestamp = 1; timestamp <= maxTimestamp; timestamp++) {
+ KeyValue kv = KeyValueTestUtil.create(row, family, column, timestamp,
+ valueString);
+ p.add(kv);
+ kvList.add(kv);
+ for (String s: prefixMap.keySet()) {
+ if (column.startsWith(s)) {
+ prefixMap.get(s).add(kv);
+ }
+ }
+ }
+ }
+ region.put(p);
+ }
+
+ ColumnPrefixFilter filter;
+ Scan scan = new Scan();
+ scan.setMaxVersions();
+ for (String s: prefixMap.keySet()) {
+ filter = new ColumnPrefixFilter(Bytes.toBytes(s));
+ scan.setFilter(filter);
+ InternalScanner scanner = region.getScanner(scan);
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ while (scanner.next(results)); // drain the region scanner into results
+ assertEquals(prefixMap.get(s).size(), results.size());
+ }
+ }
+
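+ /**
+ * Generate up to numberOfWords distinct short random strings (one or two
+ * lowercase letters), each followed by the given suffix when one is supplied.
+ */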
+ List<String> generateRandomWords(int numberOfWords, String suffix) {
+ Set<String> wordSet = new HashSet<String>();
+ for (int i = 0; i < numberOfWords; i++) {
+ int lengthOfWords = (int) (Math.random()*2) + 1;
+ char[] wordChar = new char[lengthOfWords];
+ for (int j = 0; j < wordChar.length; j++) {
+ wordChar[j] = (char) (Math.random() * 26 + 97);
+ }
+ String word;
+ if (suffix == null) {
+ word = new String(wordChar);
+ } else {
+ word = new String(wordChar) + suffix;
+ }
+ wordSet.add(word);
+ }
+ List<String> wordList = new ArrayList<String>(wordSet);
+ return wordList;
+ }
+}
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/TestImmutableBytesWritable.java Fri Aug 27 05:01:02 2010
@@ -40,6 +40,13 @@ public class TestImmutableBytesWritable
new ImmutableBytesWritable(Bytes.toBytes("xxabc"), 2, 2).hashCode());
}
+ public void testSpecificCompare() {
+ ImmutableBytesWritable ibw1 = new ImmutableBytesWritable(new byte[]{0x0f});
+ ImmutableBytesWritable ibw2 = new ImmutableBytesWritable(new byte[]{0x00, 0x00});
+ ImmutableBytesWritable.Comparator c = new ImmutableBytesWritable.Comparator();
+ assertFalse("ibw1 < ibw2", c.compare( ibw1, ibw2 ) < 0 );
+ }
+
public void testComparison() throws Exception {
runTests("aa", "b", -1);
runTests("aa", "aa", 0);
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java Fri Aug 27 05:01:02 2010
@@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.io.hfile;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
+
import junit.framework.TestCase;
public class TestCachedBlockQueue extends TestCase {
@@ -57,15 +59,16 @@ public class TestCachedBlockQueue extend
assertEquals(queue.heapSize(), expectedSize);
- org.apache.hadoop.hbase.io.hfile.CachedBlock [] blocks = queue.get();
- assertEquals(blocks[0].getName(), "cb1");
- assertEquals(blocks[1].getName(), "cb2");
- assertEquals(blocks[2].getName(), "cb3");
- assertEquals(blocks[3].getName(), "cb4");
- assertEquals(blocks[4].getName(), "cb5");
- assertEquals(blocks[5].getName(), "cb6");
- assertEquals(blocks[6].getName(), "cb7");
- assertEquals(blocks[7].getName(), "cb8");
+ LinkedList<org.apache.hadoop.hbase.io.hfile.CachedBlock> blocks =
+ queue.get();
+ assertEquals(blocks.poll().getName(), "cb1");
+ assertEquals(blocks.poll().getName(), "cb2");
+ assertEquals(blocks.poll().getName(), "cb3");
+ assertEquals(blocks.poll().getName(), "cb4");
+ assertEquals(blocks.poll().getName(), "cb5");
+ assertEquals(blocks.poll().getName(), "cb6");
+ assertEquals(blocks.poll().getName(), "cb7");
+ assertEquals(blocks.poll().getName(), "cb8");
}
@@ -109,16 +112,16 @@ public class TestCachedBlockQueue extend
assertEquals(queue.heapSize(), expectedSize);
- org.apache.hadoop.hbase.io.hfile.CachedBlock [] blocks = queue.get();
- assertEquals(blocks[0].getName(), "cb0");
- assertEquals(blocks[1].getName(), "cb1");
- assertEquals(blocks[2].getName(), "cb2");
- assertEquals(blocks[3].getName(), "cb3");
- assertEquals(blocks[4].getName(), "cb4");
- assertEquals(blocks[5].getName(), "cb5");
- assertEquals(blocks[6].getName(), "cb6");
- assertEquals(blocks[7].getName(), "cb7");
- assertEquals(blocks[8].getName(), "cb8");
+ LinkedList<org.apache.hadoop.hbase.io.hfile.CachedBlock> blocks = queue.get();
+ assertEquals(blocks.poll().getName(), "cb0");
+ assertEquals(blocks.poll().getName(), "cb1");
+ assertEquals(blocks.poll().getName(), "cb2");
+ assertEquals(blocks.poll().getName(), "cb3");
+ assertEquals(blocks.poll().getName(), "cb4");
+ assertEquals(blocks.poll().getName(), "cb5");
+ assertEquals(blocks.poll().getName(), "cb6");
+ assertEquals(blocks.poll().getName(), "cb7");
+ assertEquals(blocks.poll().getName(), "cb8");
}
@@ -130,5 +133,4 @@ public class TestCachedBlockQueue extend
accessTime,false);
}
}
-
-}
+}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java?rev=990018&r1=990017&r2=990018&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java (original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java Fri Aug 27 05:01:02 2010
@@ -117,6 +117,9 @@ public class TestLruBlockCache extends T
// Expect no evictions
assertEquals(0, cache.getEvictionCount());
+ Thread t = new LruBlockCache.StatisticsThread(cache);
+ t.start();
+ t.join();
}
public void testCacheEvictionSimple() throws Exception {
Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java?rev=990018&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java (added)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/io/hfile/TestReseekTo.java Fri Aug 27 05:01:02 2010
@@ -0,0 +1,88 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test {@link HFileScanner#reseekTo(byte[])}
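+ * Unlike a plain seekTo, reseekTo assumes the requested key is at or after
+ * the scanner's current position, so it can continue forward from where the
+ * scanner already is rather than seeking again from the start.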
+ */
+public class TestReseekTo {
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @Test
+ public void testReseekTo() throws Exception {
+
+ Path ncTFile = new Path(HBaseTestingUtility.getTestDir(), "basic.hfile");
+ FSDataOutputStream fout = TEST_UTIL.getTestFileSystem().create(ncTFile);
+ HFile.Writer writer = new HFile.Writer(fout, 4000, "none", null);
+ int numberOfKeys = 1000;
+
+ String valueString = "Value";
+
+ List<Integer> keyList = new ArrayList<Integer>();
+ List<String> valueList = new ArrayList<String>();
+
+ for (int key = 0; key < numberOfKeys; key++) {
+ String value = valueString + key;
+ keyList.add(key);
+ valueList.add(value);
+ writer.append(Bytes.toBytes(key), Bytes.toBytes(value));
+ }
+ writer.close();
+ fout.close();
+
+ HFile.Reader reader = new HFile.Reader(TEST_UTIL.getTestFileSystem(),
+ ncTFile, null, false);
+ reader.loadFileInfo();
+ HFileScanner scanner = reader.getScanner(false, true);
+
+ scanner.seekTo();
+ for (int i = 0; i < keyList.size(); i++) {
+ Integer key = keyList.get(i);
+ String value = valueList.get(i);
+ long start = System.nanoTime();
+ scanner.seekTo(Bytes.toBytes(key));
+ System.out.println("Seek Finished in: " + (System.nanoTime() - start)/1000 + " micro s");
+ assertEquals(value, scanner.getValueString());
+ }
+
+ scanner.seekTo();
+ for (int i = 0; i < keyList.size(); i += 10) {
+ Integer key = keyList.get(i);
+ String value = valueList.get(i);
+ long start = System.nanoTime();
+ scanner.reseekTo(Bytes.toBytes(key));
+ System.out.println("Reseek Finished in: " + (System.nanoTime() - start)/1000 + " micro s");
+ assertEquals(value, scanner.getValueString());
+ }
+ }
+
+}
\ No newline at end of file