Posted to commits@hbase.apache.org by st...@apache.org on 2008/08/18 00:03:43 UTC

svn commit: r686650 [2/3] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/client/transactional/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hb...

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,296 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Holds the state of a transaction.
+ */
+class TransactionState {
+
+  private static final Log LOG = LogFactory.getLog(TransactionState.class);
+
+  /** Current status. */
+  public enum Status {
+    /** Initial status, still performing operations. */
+    PENDING,
+    /**
+     * Checked if we can commit, and said yes. Still need to determine the
+     * global decision.
+     */
+    COMMIT_PENDING,
+    /** Committed. */
+    COMMITED,
+    /** Aborted. */
+    ABORTED
+  }
+
+  private final long hLogStartSequenceId;
+  private final long transactionId;
+  private Status status;
+  private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
+      Bytes.BYTES_COMPARATOR);
+  private List<BatchUpdate> writeSet = new LinkedList<BatchUpdate>();
+  private Set<TransactionState> transactionsToCheck = new HashSet<TransactionState>();
+  private int startSequenceNumber;
+  private Integer sequenceNumber;
+  boolean hasScan = false;
+
+  public TransactionState(final long transactionId,
+      final long hLogStartSequenceId) {
+    this.transactionId = transactionId;
+    this.hLogStartSequenceId = hLogStartSequenceId;
+    this.status = Status.PENDING;
+  }
+
+  public void addRead(final byte[] rowKey) {
+    readSet.add(rowKey);
+  }
+
+  public Set<byte[]> getReadSet() {
+    return readSet;
+  }
+
+  public void addWrite(final BatchUpdate write) {
+    writeSet.add(write);
+  }
+
+  public List<BatchUpdate> getWriteSet() {
+    return writeSet;
+  }
+
+  /**
+   * Simulate a getFull over the write set (read-your-own-writes).
+   * 
+   * @param row
+   * @param columns
+   * @param timestamp
+   * @return map from column name to the latest value put for it in the write
+   * set, or null if the write set holds nothing for this row
+   */
+  public Map<byte[], Cell> localGetFull(final byte[] row,
+      final Set<byte[]> columns, final long timestamp) {
+    // Must use Bytes.BYTES_COMPARATOR: byte[] keys have no meaningful
+    // equals()/hashCode() for use as map keys.
+    Map<byte[], Cell> results = new TreeMap<byte[], Cell>(
+        Bytes.BYTES_COMPARATOR);
+    for (BatchUpdate b : writeSet) {
+      if (!Bytes.equals(row, b.getRow())) {
+        continue;
+      }
+      if (b.getTimestamp() > timestamp) {
+        continue;
+      }
+      for (BatchOperation op : b) {
+        if (!op.isPut()
+            || (columns != null && !columns.contains(op.getColumn()))) {
+          continue;
+        }
+        results.put(op.getColumn(), new Cell(op.getValue(), b.getTimestamp()));
+      }
+    }
+    return results.size() == 0 ? null : results;
+  }
+
+  /**
+   * Get from the write set.
+   * 
+   * @param row
+   * @param column
+   * @param timestamp
+   * @return values put for this row/column in the write set, newest first, or
+   * null if there are none
+   */
+  public Cell[] localGet(final byte[] row, final byte[] column,
+      final long timestamp) {
+    ArrayList<Cell> results = new ArrayList<Cell>();
+
+    // Go in reverse order to put newest updates first in list
+    for (int i = writeSet.size() - 1; i >= 0; i--) {
+      BatchUpdate b = writeSet.get(i);
+
+      if (!Bytes.equals(row, b.getRow())) {
+        continue;
+      }
+      if (b.getTimestamp() > timestamp) {
+        continue;
+      }
+      for (BatchOperation op : b) {
+        if (!op.isPut() || !Bytes.equals(column, op.getColumn())) {
+          continue;
+        }
+        results.add(new Cell(op.getValue(), b.getTimestamp()));
+      }
+    }
+    return results.size() == 0 ? null : results
+        .toArray(new Cell[results.size()]);
+  }
+
+  public void addTransactionToCheck(final TransactionState transaction) {
+    transactionsToCheck.add(transaction);
+  }
+
+  public boolean hasConflict() {
+    for (TransactionState transactionState : transactionsToCheck) {
+      if (hasConflict(transactionState)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean hasConflict(final TransactionState checkAgainst) {
+    if (checkAgainst.getStatus().equals(TransactionState.Status.ABORTED)) {
+      return false; // Cannot conflict with aborted transactions
+    }
+
+    for (BatchUpdate otherUpdate : checkAgainst.getWriteSet()) {
+      if (this.hasScan) {
+        LOG.info("Transaction" + this.toString()
+            + " has a scan read. Meanwile a write occured. "
+            + "Conservitivly reporting conflict");
+        return true;
+      }
+
+      if (this.getReadSet().contains(otherUpdate.getRow())) {
+        LOG.trace("Transaction " + this.toString() + " conflicts with "
+            + checkAgainst.toString());
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the status.
+   * 
+   * @return Return the status.
+   */
+  public Status getStatus() {
+    return status;
+  }
+
+  /**
+   * Set the status.
+   * 
+   * @param status The status to set.
+   */
+  public void setStatus(final Status status) {
+    this.status = status;
+  }
+
+  /**
+   * Get the startSequenceNumber.
+   * 
+   * @return Return the startSequenceNumber.
+   */
+  public int getStartSequenceNumber() {
+    return startSequenceNumber;
+  }
+
+  /**
+   * Set the startSequenceNumber.
+   * 
+   * @param startSequenceNumber The startSequenceNumber to set.
+   */
+  public void setStartSequenceNumber(final int startSequenceNumber) {
+    this.startSequenceNumber = startSequenceNumber;
+  }
+
+  /**
+   * Get the sequenceNumber.
+   * 
+   * @return Return the sequenceNumber.
+   */
+  public Integer getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  /**
+   * Set the sequenceNumber.
+   * 
+   * @param sequenceNumber The sequenceNumber to set.
+   */
+  public void setSequenceNumber(final Integer sequenceNumber) {
+    this.sequenceNumber = sequenceNumber;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    result.append("[transactionId: ");
+    result.append(transactionId);
+    result.append(" status: ");
+    result.append(status.name());
+    result.append(" read Size: ");
+    result.append(readSet.size());
+    result.append(" write Size: ");
+    result.append(writeSet.size());
+    result.append(" startSQ: ");
+    result.append(startSequenceNumber);
+    if (sequenceNumber != null) {
+      result.append(" commitedSQ:");
+      result.append(sequenceNumber);
+    }
+    result.append("]");
+
+    return result.toString();
+  }
+
+  /**
+   * Get the transactionId.
+   * 
+   * @return Return the transactionId.
+   */
+  public long getTransactionId() {
+    return transactionId;
+  }
+
+  /**
+   * Get the hLogStartSequenceId.
+   * 
+   * @return Return the hLogStartSequenceId.
+   */
+  public long getHLogStartSequenceId() {
+    return hLogStartSequenceId;
+  }
+
+  /**
+   * Set the hasScan.
+   * 
+   * @param hasScan The hasScan to set.
+   */
+  public void setHasScan(final boolean hasScan) {
+    this.hasScan = hasScan;
+  }
+
+}
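
A minimal sketch of how the read/write sets above drive conflict detection
(hypothetical standalone usage; in the commit itself TransactionState is
package-private and only driven by TransactionalRegion, below):

    TransactionState a = new TransactionState(1, 0); // reader
    TransactionState b = new TransactionState(2, 0); // concurrent writer

    a.addRead(Bytes.toBytes("row1"));
    b.addWrite(new BatchUpdate(Bytes.toBytes("row1")));

    // b entered COMMIT_PENDING while a was running, so a must check it.
    a.addTransactionToCheck(b);
    boolean conflict = a.hasConflict(); // true: a read a row that b wrote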

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,260 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HLogEdit;
+import org.apache.hadoop.hbase.regionserver.HLogKey;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Responsible for writing and reading (recovering) transactional information
+ * to/from the HLog.
+ */
+class TransactionalHLogManager {
+
+  private static final Log LOG = LogFactory
+      .getLog(TransactionalHLogManager.class);
+
+  private final HLog hlog;
+  private final FileSystem fileSystem;
+  private final HRegionInfo regionInfo;
+  private final HBaseConfiguration conf;
+
+  public TransactionalHLogManager(final TransactionalRegion region) {
+    this.hlog = region.getLog();
+    this.fileSystem = region.getFilesystem();
+    this.regionInfo = region.getRegionInfo();
+    this.conf = region.getConf();
+  }
+
+  // For Testing
+  TransactionalHLogManager(final HLog hlog, final FileSystem fileSystem,
+      final HRegionInfo regionInfo, final HBaseConfiguration conf) {
+    this.hlog = hlog;
+    this.fileSystem = fileSystem;
+    this.regionInfo = regionInfo;
+    this.conf = conf;
+  }
+
+  public void writeStartToLog(final long transactionId) throws IOException {
+    HLogEdit logEdit = new HLogEdit(transactionId,
+        HLogEdit.TransactionalOperation.START);
+
+    hlog.append(regionInfo, logEdit);
+  }
+
+  public void writeUpdateToLog(final long transactionId,
+      final BatchUpdate update) throws IOException {
+
+    long commitTime = update.getTimestamp() == HConstants.LATEST_TIMESTAMP
+        ? System.currentTimeMillis() : update.getTimestamp();
+
+    for (BatchOperation op : update) {
+      HLogEdit logEdit = new HLogEdit(transactionId, op, commitTime);
+      hlog.append(regionInfo, update.getRow(), logEdit);
+    }
+  }
+
+  public void writeCommitToLog(final long transactionId) throws IOException {
+    HLogEdit logEdit = new HLogEdit(transactionId,
+        HLogEdit.TransactionalOperation.COMMIT);
+
+    hlog.append(regionInfo, logEdit);
+  }
+
+  public void writeAbortToLog(final long transactionId) throws IOException {
+    HLogEdit logEdit = new HLogEdit(transactionId,
+        HLogEdit.TransactionalOperation.ABORT);
+
+    hlog.append(regionInfo, logEdit);
+  }
+
+  public Map<Long, List<BatchUpdate>> getCommitsFromLog(
+      final Path reconstructionLog, final long maxSeqID,
+      final Progressable reporter) throws UnsupportedEncodingException,
+      IOException {
+    if (reconstructionLog == null || !fileSystem.exists(reconstructionLog)) {
+      // Nothing to do.
+      return null;
+    }
+    // Check it's not empty.
+    FileStatus[] stats = fileSystem.listStatus(reconstructionLog);
+    if (stats == null || stats.length == 0) {
+      LOG.warn("Passed reconstruction log " + reconstructionLog
+          + " is zero-length");
+      return null;
+    }
+
+    SortedMap<Long, List<BatchUpdate>> pendingTransactionsById = new TreeMap<Long, List<BatchUpdate>>();
+    SortedMap<Long, List<BatchUpdate>> commitedTransactionsById = new TreeMap<Long, List<BatchUpdate>>();
+    Set<Long> abortedTransactions = new HashSet<Long>();
+
+    SequenceFile.Reader logReader = new SequenceFile.Reader(fileSystem,
+        reconstructionLog, conf);
+
+    try {
+      HLogKey key = new HLogKey();
+      HLogEdit val = new HLogEdit();
+      long skippedEdits = 0;
+      long totalEdits = 0;
+      long startCount = 0;
+      long writeCount = 0;
+      long abortCount = 0;
+      long commitCount = 0;
+      // How many edits to apply before we send a progress report.
+      int reportInterval = conf.getInt("hbase.hstore.report.interval.edits",
+          2000);
+      while (logReader.next(key, val)) {
+        LOG.debug("Processing edit: key: " + key.toString() + " val: "
+            + val.toString());
+        if (key.getLogSeqNum() < maxSeqID) {
+          skippedEdits++;
+          continue;
+        }
+
+        // Check this edit is for me.
+        byte[] column = val.getColumn();
+        Long transactionId = val.getTransactionId();
+        if (!val.isTransactionEntry() || HLog.isMetaColumn(column)
+            || !Bytes.equals(key.getRegionName(), regionInfo.getRegionName())) {
+          continue;
+        }
+
+        List<BatchUpdate> updates = pendingTransactionsById.get(transactionId);
+        switch (val.getOperation()) {
+        case START:
+          if (updates != null || abortedTransactions.contains(transactionId)
+              || commitedTransactionsById.containsKey(transactionId)) {
+            LOG.error("Processing start for transaction: " + transactionId
+                + ", but have already seen start message");
+            throw new IOException("Corrupted transaction log");
+          }
+          updates = new LinkedList<BatchUpdate>();
+          pendingTransactionsById.put(transactionId, updates);
+          startCount++;
+          break;
+
+        case WRITE:
+          if (updates == null) {
+            LOG.error("Processing edit for transaction: " + transactionId
+                + ", but have not seen start message");
+            throw new IOException("Corrupted transaction log");
+          }
+
+          BatchUpdate tranUpdate = new BatchUpdate(key.getRow());
+          if (val.getVal() != null) {
+            tranUpdate.put(val.getColumn(), val.getVal());
+          } else {
+            tranUpdate.delete(val.getColumn());
+          }
+          updates.add(tranUpdate);
+          writeCount++;
+          break;
+
+        case ABORT:
+          if (updates == null) {
+            LOG.error("Processing abort for transaction: " + transactionId
+                + ", but have not seen start message");
+            throw new IOException("Corrupted transaction log");
+          }
+          abortedTransactions.add(transactionId);
+          pendingTransactionsById.remove(transactionId);
+          abortCount++;
+          break;
+
+        case COMMIT:
+          if (updates == null) {
+            LOG.error("Processing commit for transaction: " + transactionId
+                + ", but have not seen start message");
+            throw new IOException("Corrupted transaction log");
+          }
+          if (abortedTransactions.contains(transactionId)) {
+            LOG.error("Processing commit for transaction: " + transactionId
+                + ", but also have abort message");
+            throw new IOException("Corrupted transaction log");
+          }
+          if (updates.size() == 0) {
+            LOG.warn("Transaction " + transactionId + " has no writes in log");
+          }
+          if (commitedTransactionsById.containsKey(transactionId)) {
+            LOG.error("Processing commit for transaction: " + transactionId
+                + ", but have already commited transaction with that id");
+            throw new IOException("Corrupted transaction log");
+          }
+          pendingTransactionsById.remove(transactionId);
+          commitedTransactionsById.put(transactionId, updates);
+          commitCount++;
+        }
+        totalEdits++;
+
+        if (reporter != null && (totalEdits % reportInterval) == 0) {
+          reporter.progress();
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Read " + totalEdits + " tranasctional operations (skipped "
+            + skippedEdits + " because sequence id <= " + maxSeqID + "): "
+            + startCount + " starts, " + writeCount + " writes, " + abortCount
+            + " aborts, and " + commitCount + " commits.");
+      }
+    } finally {
+      logReader.close();
+    }
+
+    if (pendingTransactionsById.size() > 0) {
+      LOG.info("Region log has " + pendingTransactionsById.size()
+          + " unfinished transactions. Going to the transaction log to resolve");
+      throw new RuntimeException("Transaction log not yet implemented");
+    }
+
+    return commitedTransactionsById;
+  }
+}
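
The recovery method above reduces each transaction to a small state machine
over its log records. A minimal sketch of the write-side sequence a committing
transaction produces (assuming a manager built against a live transactional
region; error handling omitted):

    TransactionalHLogManager logManager = new TransactionalHLogManager(region);
    long txId = 42L;

    logManager.writeStartToLog(txId);          // START record
    BatchUpdate update = new BatchUpdate(Bytes.toBytes("row1"));
    update.put(Bytes.toBytes("family:qualifier"), Bytes.toBytes("value"));
    logManager.writeUpdateToLog(txId, update); // one WRITE record per operation
    logManager.writeCommitToLog(txId);         // COMMIT (an abort writes ABORT)

On region reopen, getCommitsFromLog() replays these records and returns only
the transactions whose records end in COMMIT, so their writes can be re-applied.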

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,673 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.LeaseException;
+import org.apache.hadoop.hbase.LeaseListener;
+import org.apache.hadoop.hbase.Leases;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Region which provides support for atomic transactions. This is achieved
+ * with optimistic concurrency control (see
+ * http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf). We keep track of
+ * the read and write sets of each transaction and hold off on applying the
+ * writes. To decide whether a transaction can commit, we check its read set
+ * for overlaps with the write sets of all transactions that committed while
+ * it was running.
+ * <p>
+ * Because transactions can span multiple regions, all regions must agree to
+ * commit a transaction. The client side of this commit protocol is encoded in
+ * org.apache.hadoop.hbase.client.transactional.TransactionManager
+ * <p>
+ * In the event of a failure of the client mid-commit (after we voted yes), we
+ * will have to consult the transaction log to determine the final decision of
+ * the transaction. This is not yet implemented.
+ */
+class TransactionalRegion extends HRegion {
+
+  private static final String LEASE_TIME = "hbase.transaction.leaseTime";
+  private static final int DEFAULT_LEASE_TIME = 60 * 1000;
+  private static final int LEASE_CHECK_FREQUENCY = 1000;
+  
+  private static final String OLD_TRANSACTION_FLUSH = "hbase.transaction.flush";
+  private static final int DEFAULT_OLD_TRANSACTION_FLUSH = 100; // Flush if we have this many old transactions.
+  
+
+  private static final Log LOG = LogFactory.getLog(TransactionalRegion.class);
+
+  // Collection of active transactions (PENDING) keyed by id.
+  private Map<String, TransactionState> transactionsById = new HashMap<String, TransactionState>();
+
+  // Map of recent transactions that are COMMIT_PENDING or COMMITED keyed by
+  // their sequence number
+  private SortedMap<Integer, TransactionState> commitedTransactionsBySequenceNumber = Collections
+      .synchronizedSortedMap(new TreeMap<Integer, TransactionState>());
+
+  // Collection of transactions that are COMMIT_PENDING
+  private Set<TransactionState> commitPendingTransactions = Collections
+      .synchronizedSet(new HashSet<TransactionState>());
+
+  private final Leases transactionLeases;
+  private AtomicInteger nextSequenceId = new AtomicInteger(0);
+  private Object commitCheckLock = new Object();
+  private TransactionalHLogManager logManager;
+  private final int oldTransactionFlushTrigger;
+
+  public TransactionalRegion(final Path basedir, final HLog log,
+      final FileSystem fs, final HBaseConfiguration conf,
+      final HRegionInfo regionInfo, final FlushRequester flushListener) {
+    super(basedir, log, fs, conf, regionInfo, flushListener);
+    transactionLeases = new Leases(conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME),
+        LEASE_CHECK_FREQUENCY);
+    logManager = new TransactionalHLogManager(this);
+    oldTransactionFlushTrigger = conf.getInt(OLD_TRANSACTION_FLUSH, DEFAULT_OLD_TRANSACTION_FLUSH);
+  }
+
+  @Override
+  protected void doReconstructionLog(final Path oldLogFile,
+      final long minSeqId, final long maxSeqId, final Progressable reporter)
+      throws UnsupportedEncodingException, IOException {
+    super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
+
+    Map<Long, List<BatchUpdate>> commitedTransactionsById = logManager
+        .getCommitsFromLog(oldLogFile, minSeqId, reporter);
+
+    if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
+      LOG.debug("found " + commitedTransactionsById.size()
+          + " COMMITED transactions");
+
+      for (Entry<Long, List<BatchUpdate>> entry : commitedTransactionsById
+          .entrySet()) {
+        LOG.debug("Writing " + entry.getValue().size()
+            + " updates for transaction " + entry.getKey());
+        for (BatchUpdate b : entry.getValue()) {
+          super.batchUpdate(b, true); // These are WALed so they live forever
+        }
+      }
+
+      // LOG.debug("Flushing cache"); // We must trigger a cache flush,
+      // otherwise
+      // we will would ignore the log on subsequent failure
+      // if (!super.flushcache()) {
+      // LOG.warn("Did not flush cache");
+      // }
+    }
+  }
+
+  /**
+   * We need to make sure that we don't complete a cache flush between running
+   * transactions. If we did, then we would not find all log messages needed to
+   * restore the transaction, as some of them would be before the last
+   * "complete" flush id.
+   */
+  @Override
+  protected long getCompleteCacheFlushSequenceId(final long currentSequenceId) {
+    long minPendingStartSequenceId = currentSequenceId;
+    for (TransactionState transactionState : transactionsById.values()) {
+      minPendingStartSequenceId = Math.min(minPendingStartSequenceId,
+          transactionState.getHLogStartSequenceId());
+    }
+    return minPendingStartSequenceId;
+  }
+
+  public void beginTransaction(final long transactionId) throws IOException {
+    String key = String.valueOf(transactionId);
+    if (transactionsById.get(key) != null) {
+      TransactionState alias = getTransactionState(transactionId);
+      if (alias != null) {
+        alias.setStatus(Status.ABORTED);
+        retireTransaction(alias);
+      }
+      throw new IOException("Already exiting transaction id: " + key);
+    }
+
+    TransactionState state = new TransactionState(transactionId, super.getLog()
+        .getSequenceNumber());
+
+    // Order is important here
+    for (TransactionState commitPending : commitPendingTransactions) {
+      state.addTransactionToCheck(commitPending);
+    }
+    state.setStartSequenceNumber(nextSequenceId.get());
+
+    transactionsById.put(key, state);
+    try {
+      transactionLeases.createLease(key, new TransactionLeaseListener(key));
+    } catch (LeaseStillHeldException e) {
+      throw new RuntimeException(e);
+    }
+    LOG.debug("Begining transaction " + key + " in region "
+        + super.getRegionInfo().getRegionNameAsString());
+    logManager.writeStartToLog(transactionId);
+    
+    maybeTriggerOldTransactionFlush();
+  }
+
+  /**
+   * Fetch a single data item.
+   * 
+   * @param transactionId
+   * @param row
+   * @param column
+   * @return column value
+   * @throws IOException
+   */
+  public Cell get(final long transactionId, final byte[] row,
+      final byte[] column) throws IOException {
+    Cell[] results = get(transactionId, row, column, 1);
+    return (results == null || results.length == 0) ? null : results[0];
+  }
+
+  /**
+   * Fetch multiple versions of a single data item
+   * 
+   * @param transactionId
+   * @param row
+   * @param column
+   * @param numVersions
+   * @return array of values one element per version
+   * @throws IOException
+   */
+  public Cell[] get(final long transactionId, final byte[] row,
+      final byte[] column, final int numVersions) throws IOException {
+    return get(transactionId, row, column, Long.MAX_VALUE, numVersions);
+  }
+
+  /**
+   * Fetch multiple versions of a single data item, with timestamp.
+   * 
+   * @param transactionId
+   * @param row
+   * @param column
+   * @param timestamp
+   * @param numVersions
+   * @return array of values one element per version that matches the timestamp
+   * @throws IOException
+   */
+  public Cell[] get(final long transactionId, final byte[] row,
+      final byte[] column, final long timestamp, final int numVersions)
+      throws IOException {
+    TransactionState state = getTransactionState(transactionId);
+
+    state.addRead(row);
+
+    Cell[] localCells = state.localGet(row, column, timestamp);
+
+    if (localCells != null && localCells.length > 0) {
+      LOG.trace("Transactional get of something we've written in the same "
+          + "transaction " + transactionId);
+      LOG.trace("row: " + Bytes.toString(row));
+      LOG.trace("col: " + Bytes.toString(column));
+      LOG.trace("numVersions: " + numVersions);
+      for (Cell cell : localCells) {
+        LOG.trace("cell: " + Bytes.toString(cell.getValue()));
+      }
+
+      if (numVersions > 1) {
+        Cell[] globalCells = get(row, column, timestamp, numVersions - 1);
+        Cell[] result = new Cell[globalCells.length + localCells.length];
+        System.arraycopy(localCells, 0, result, 0, localCells.length);
+        System.arraycopy(globalCells, 0, result, localCells.length,
+            globalCells.length);
+        return result;
+      }
+      return localCells;
+    }
+
+    return get(row, column, timestamp, numVersions);
+  }
+
+  /**
+   * Fetch all the columns for the indicated row at a specified timestamp.
+   * Returns a TreeMap that maps column names to values.
+   * 
+   * @param transactionId
+   * @param row
+   * @param columns Array of columns you'd like to retrieve. When null, get all.
+   * @param ts
+   * @return Map<columnName, Cell> values
+   * @throws IOException
+   */
+  public Map<byte[], Cell> getFull(final long transactionId, final byte[] row,
+      final Set<byte[]> columns, final long ts) throws IOException {
+    TransactionState state = getTransactionState(transactionId);
+
+    state.addRead(row);
+
+    Map<byte[], Cell> localCells = state.localGetFull(row, columns, ts);
+
+    if (localCells != null && localCells.size() > 0) {
+      LOG.trace("Transactional get of something we've written in the same "
+          + "transaction " + transactionId);
+      LOG.trace("row: " + Bytes.toString(row));
+      for (Entry<byte[], Cell> entry : localCells.entrySet()) {
+        LOG.trace("col: " + Bytes.toString(entry.getKey()));
+        LOG.trace("cell: " + Bytes.toString(entry.getValue().getValue()));
+      }
+
+      Map<byte[], Cell> internalResults = getFull(row, columns, ts, null);
+      internalResults.putAll(localCells);
+      return internalResults;
+    }
+
+    return getFull(row, columns, ts, null);
+  }
+
+  /**
+   * Return an iterator that scans over the HRegion, returning the indicated
+   * columns for only the rows that match the data filter. This Iterator must be
+   * closed by the caller.
+   * 
+   * @param transactionId
+   * @param cols columns to scan. If column name is a column family, all columns
+   * of the specified column family are returned. It's also possible to pass a
+   * regex in the column qualifier. A column qualifier is judged to be a regex
+   * if it contains at least one of the following characters:
+   * <code>\+|^&*$[]]}{)(</code>.
+   * @param firstRow row which is the starting point of the scan
+   * @param timestamp only return rows whose timestamp is <= this value
+   * @param filter row filter
+   * @return InternalScanner
+   * @throws IOException
+   */
+  public InternalScanner getScanner(final long transactionId,
+      final byte[][] cols, final byte[] firstRow, final long timestamp,
+      final RowFilterInterface filter) throws IOException {
+    return new ScannerWrapper(transactionId, super.getScanner(cols, firstRow,
+        timestamp, filter));
+  }
+
+  /**
+   * Add a write to the transaction. Does not get applied until the commit
+   * process.
+   * 
+   * @param transactionId
+   * @param b
+   * @throws IOException
+   */
+  public void batchUpdate(final long transactionId, final BatchUpdate b)
+      throws IOException {
+    TransactionState state = getTransactionState(transactionId);
+    state.addWrite(b);
+    logManager.writeUpdateToLog(transactionId, b);
+  }
+
+  /**
+   * Add a delete to the transaction. Does not get applied until the commit
+   * process. FIXME, not sure about this approach.
+   * 
+   * @param transactionId
+   * @param row
+   * @param timestamp
+   * @throws IOException
+   */
+  public void deleteAll(final long transactionId, final byte[] row,
+      final long timestamp) throws IOException {
+    TransactionState state = getTransactionState(transactionId);
+    long now = System.currentTimeMillis();
+
+    for (HStore store : super.stores.values()) {
+      List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp),
+          ALL_VERSIONS, now);
+      BatchUpdate deleteUpdate = new BatchUpdate(row, timestamp);
+
+      for (HStoreKey key : keys) {
+        deleteUpdate.delete(key.getColumn());
+      }
+      
+      state.addWrite(deleteUpdate);
+      logManager.writeUpdateToLog(transactionId, deleteUpdate);
+
+    }
+
+  }
+
+  public boolean commitRequest(final long transactionId) throws IOException {
+    synchronized (commitCheckLock) {
+      TransactionState state = getTransactionState(transactionId);
+      if (state == null) {
+        return false;
+      }
+
+      if (hasConflict(state)) {
+        state.setStatus(Status.ABORTED);
+        retireTransaction(state);
+        return false;
+      }
+
+      // No conflicts, we can commit.
+      LOG.trace("No conflicts for transaction " + transactionId
+          + " found in region " + super.getRegionInfo().getRegionNameAsString()
+          + ". Voting for commit");
+      state.setStatus(Status.COMMIT_PENDING);
+
+      // If there are writes we must keep a record of the transaction
+      if (state.getWriteSet().size() > 0) {
+        // Order is important
+        commitPendingTransactions.add(state);
+        state.setSequenceNumber(nextSequenceId.getAndIncrement());
+        commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(),
+            state);
+      }
+
+      return true;
+    }
+  }
+
+  private boolean hasConflict(final TransactionState state) {
+    // Check transactions that were committed while we were running
+    for (int i = state.getStartSequenceNumber(); i < nextSequenceId.get(); i++) {
+      TransactionState other = commitedTransactionsBySequenceNumber.get(i);
+      if (other == null) {
+        continue;
+      }
+      state.addTransactionToCheck(other);
+    }
+
+    return state.hasConflict();
+  }
+
+  /**
+   * Commit the transaction.
+   * 
+   * @param transactionId
+   * @throws IOException
+   */
+  public void commit(final long transactionId) throws IOException {
+    TransactionState state;
+    try {
+      state = getTransactionState(transactionId);
+    } catch (UnknownTransactionException e) {
+      LOG.fatal("Asked to commit unknown transaction: " + transactionId
+          + " in region " + super.getRegionInfo().getRegionNameAsString());
+      // FIXME Write to the transaction log that this transaction was corrupted
+      throw e;
+    }
+
+    if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
+      LOG.fatal("Asked to commit a non pending transaction");
+      // FIXME Write to the transaction log that this transaction was corrupted
+      throw new IOException("commit failure");
+    }
+
+    commit(state);
+  }
+
+  /**
+   * Abort the transaction.
+   * 
+   * @param transactionId
+   * @throws IOException
+   */
+  public void abort(final long transactionId) throws IOException {
+    TransactionState state;
+    try {
+      state = getTransactionState(transactionId);
+    } catch (UnknownTransactionException e) {
+      LOG.error("Asked to abort unknown transaction: " + transactionId);
+      return;
+    }
+
+    state.setStatus(Status.ABORTED);
+
+    if (state.getWriteSet().size() > 0) {
+      logManager.writeAbortToLog(state.getTransactionId());
+    }
+
+    // The following removes are only needed if we have already voted
+    // (i.e., been assigned a sequence number).
+    if (state.getSequenceNumber() != null) {
+      commitedTransactionsBySequenceNumber.remove(state.getSequenceNumber());
+    }
+    commitPendingTransactions.remove(state);
+
+    retireTransaction(state);
+  }
+
+  private void commit(final TransactionState state) throws IOException {
+
+    LOG.debug("Commiting transaction: " + state.toString() + " to "
+        + super.getRegionInfo().getRegionNameAsString());
+
+    if (state.getWriteSet().size() > 0) {
+      logManager.writeCommitToLog(state.getTransactionId());
+    }
+
+    for (BatchUpdate update : state.getWriteSet()) {
+      super.batchUpdate(update, false); // Don't need to WAL these
+      // FIXME: maybe these should be WALed so we don't need to look so far back.
+    }
+
+    state.setStatus(Status.COMMITED);
+    if (state.getWriteSet().size() > 0
+        && !commitPendingTransactions.remove(state)) {
+      LOG.fatal("Committing a non-query transaction that is not in "
+          + "commitPendingTransactions");
+      throw new IOException("commit failure"); // FIXME, how to handle?
+    }
+    retireTransaction(state);
+  }
+
+  // Cancel the lease and remove it from the lease lookup. This transaction may still
+  // live in commitedTransactionsBySequenceNumber and commitPendingTransactions
+  private void retireTransaction(final TransactionState state) {
+    String key = String.valueOf(state.getTransactionId());
+    try {
+      transactionLeases.cancelLease(key);
+    } catch (LeaseException e) {
+      // Ignore
+    }
+
+    transactionsById.remove(key);
+  }
+
+  private TransactionState getTransactionState(final long transactionId)
+      throws UnknownTransactionException {
+    String key = String.valueOf(transactionId);
+    TransactionState state = null;
+
+    state = transactionsById.get(key);
+
+    if (state == null) {
+      LOG.trace("Unknown transaction: " + key);
+      throw new UnknownTransactionException(key);
+    }
+
+    try {
+      transactionLeases.renewLease(key);
+    } catch (LeaseException e) {
+      throw new RuntimeException(e);
+    }
+
+    return state;
+  }
+
+  private void maybeTriggerOldTransactionFlush() {
+    if (commitedTransactionsBySequenceNumber.size() > oldTransactionFlushTrigger) {
+      removeUnNeededCommitedTransactions();
+    }
+  }
+  
+  /**
+   * Cleanup references to committed transactions that are no longer needed.
+   * 
+   */
+  synchronized void removeUnNeededCommitedTransactions() {
+    Integer minStartSeqNumber = getMinStartSequenceNumber();
+    if (minStartSeqNumber == null) {
+      minStartSeqNumber = Integer.MAX_VALUE; // Remove all
+    }
+
+    int numRemoved = 0;
+    // Copy the entry set to avoid a ConcurrentModificationException
+    for (Entry<Integer, TransactionState> entry : new LinkedList<Entry<Integer, TransactionState>>(
+        commitedTransactionsBySequenceNumber.entrySet())) {
+      if (entry.getKey() >= minStartSeqNumber) {
+        break;
+      }
+      if (commitedTransactionsBySequenceNumber.remove(entry.getKey()) != null) {
+        numRemoved++;
+      }
+    }
+
+    if (numRemoved > 0) {
+      LOG.debug("Removed " + numRemoved
+          + " commited transactions with sequence lower than "
+          + minStartSeqNumber + ". Still have "
+          + commitedTransactionsBySequenceNumber.size() + " left");
+    } else if (commitedTransactionsBySequenceNumber.size() > 0) {
+      LOG.debug("Could not remove any transactions, and still have "
+          + commitedTransactionsBySequenceNumber.size() + " left");
+    }
+  }
+
+  private Integer getMinStartSequenceNumber() {
+    Integer min = null;
+    for (TransactionState transactionState : transactionsById.values()) {
+      if (min == null || transactionState.getStartSequenceNumber() < min) {
+        min = transactionState.getStartSequenceNumber();
+      }
+    }
+    return min;
+  }
+
+  // TODO, resolve from the global transaction log
+  @SuppressWarnings("unused")
+  private void resolveTransactionFromLog(final long transactionId) {
+    throw new RuntimeException("Globaql transaction log is not Implemented");
+  }
+
+  private class TransactionLeaseListener implements LeaseListener {
+    private final String transactionName;
+
+    TransactionLeaseListener(final String n) {
+      this.transactionName = n;
+    }
+
+    /** {@inheritDoc} */
+    public void leaseExpired() {
+      LOG.info("Transaction " + this.transactionName + " lease expired");
+      TransactionState s = null;
+      synchronized (transactionsById) {
+        s = transactionsById.remove(transactionName);
+      }
+      if (s == null) {
+        LOG.warn("Unknown transaction expired " + this.transactionName);
+        return;
+      }
+
+      switch (s.getStatus()) {
+      case PENDING:
+        s.setStatus(Status.ABORTED); // Other transactions may have a ref
+        break;
+      case COMMIT_PENDING:
+        LOG.info("Transaction " + s.getTransactionId()
+            + " expired in COMMIT_PENDING state");
+        LOG.info("Checking transaction status in transaction log");
+        resolveTransactionFromLog(s.getTransactionId());
+        break;
+      default:
+        LOG.warn("Unexpected status on expired lease");
+      }
+    }
+  }
+
+  /** Wrapper which keeps track of rows returned by scanner. */
+  private class ScannerWrapper implements InternalScanner {
+    private long transactionId;
+    private InternalScanner scanner;
+
+    public ScannerWrapper(final long transactionId,
+        final InternalScanner scanner) {
+      this.transactionId = transactionId;
+      this.scanner = scanner;
+    }
+
+    public void close() throws IOException {
+      scanner.close();
+    }
+
+    public boolean isMultipleMatchScanner() {
+      return scanner.isMultipleMatchScanner();
+    }
+
+    public boolean isWildcardScanner() {
+      return scanner.isWildcardScanner();
+    }
+
+    public boolean next(final HStoreKey key,
+        final SortedMap<byte[], Cell> results) throws IOException {
+      boolean result = scanner.next(key, results);
+      TransactionState state = getTransactionState(transactionId);
+      state.setHasScan(true);
+      // FIXME: not tracking the individual rows, just claiming a read over the
+      // whole region (via hasScan). We are being very conservative on scans to
+      // avoid phantom reads.
+      state.addRead(key.getRow());
+
+      if (result) {
+        Map<byte[], Cell> localWrites = state.localGetFull(key.getRow(), null,
+            Long.MAX_VALUE); // Long, so default-timestamp writes are not skipped
+        if (localWrites != null) {
+          LOG.info("Scanning over row that has been written to by transaction "
+              + transactionId);
+          for (Entry<byte[], Cell> entry : localWrites.entrySet()) {
+            results.put(entry.getKey(), entry.getValue());
+          }
+        }
+      }
+
+      return result;
+    }
+  }
+}
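
The two-phase protocol described in the class comment maps directly onto the
methods above. A minimal single-region sketch (in practice a client drives
this through the TransactionManager and the RPC layer; the transaction id
here is hypothetical, normally allocated on the client side):

    long txId = 7L;
    region.beginTransaction(txId);

    BatchUpdate b = new BatchUpdate(Bytes.toBytes("row1"));
    b.put(Bytes.toBytes("family:qualifier"), Bytes.toBytes("value"));
    region.batchUpdate(txId, b);       // buffered in the write set, not applied

    if (region.commitRequest(txId)) {  // phase one: vote
      region.commit(txId);             // phase two: global decision was commit
    }
    // On a false vote, commitRequest has already aborted and retired the
    // transaction, so no explicit abort call is needed.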

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,296 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * RegionServer with support for transactions. Transactional logic is at the
+ * region level, so we mostly just delegate to the appropriate
+ * TransactionalRegion.
+ */
+public class TransactionalRegionServer extends HRegionServer implements
+    TransactionalRegionInterface {
+  static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class);
+
+  private final CleanOldTransactionsChore cleanOldTransactionsThread;
+
+  public TransactionalRegionServer(final HBaseConfiguration conf)
+      throws IOException {
+    this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+        DEFAULT_REGIONSERVER_ADDRESS)), conf);
+  }
+
+  public TransactionalRegionServer(final HServerAddress address,
+      final HBaseConfiguration conf) throws IOException {
+    super(address, conf);
+    cleanOldTransactionsThread = new CleanOldTransactionsChore(this,
+        super.stopRequested);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getProtocolVersion(final String protocol, final long clientVersion)
+      throws IOException {
+    if (protocol.equals(TransactionalRegionInterface.class.getName())) {
+      return TransactionalRegionInterface.versionID;
+    }
+    return super.getProtocolVersion(protocol, clientVersion);
+  }
+
+  @Override
+  protected void init(final MapWritable c) throws IOException {
+    super.init(c);
+    String n = Thread.currentThread().getName();
+    UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
+      public void uncaughtException(final Thread t, final Throwable e) {
+        abort();
+        LOG.fatal("Set stop flag in " + t.getName(), e);
+      }
+    };
+    Threads.setDaemonThreadRunning(this.cleanOldTransactionsThread, n
+        + ".oldTransactionCleaner", handler);
+
+  }
+
+  @Override
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super
+        .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+        .getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
+    r.initialize(null, new Progressable() {
+      public void progress() {
+        addProcessingMessage(regionInfo);
+      }
+    });
+    return r;
+  }
+
+  protected TransactionalRegion getTransactionalRegion(final byte[] regionName)
+      throws NotServingRegionException {
+    return (TransactionalRegion) super.getRegion(regionName);
+  }
+
+  public void abort(final byte[] regionName, final long transactionId)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      getTransactionalRegion(regionName).abort(transactionId);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public void batchUpdate(final long transactionId, final byte[] regionName,
+      final BatchUpdate b) throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      getTransactionalRegion(regionName).batchUpdate(transactionId, b);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public void commit(final byte[] regionName, final long transactionId)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      getTransactionalRegion(regionName).commit(transactionId);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public boolean commitRequest(final byte[] regionName, final long transactionId)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).commitRequest(transactionId);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public Cell get(final long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column) throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).get(transactionId, row, column);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public Cell[] get(final long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column, final int numVersions)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).get(transactionId, row, column,
+          numVersions);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public Cell[] get(final long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column, final long timestamp,
+      final int numVersions) throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      return getTransactionalRegion(regionName).get(transactionId, row, column,
+          timestamp, numVersions);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public RowResult getRow(final long transactionId, final byte[] regionName,
+      final byte[] row, final long ts) throws IOException {
+    return getRow(transactionId, regionName, row, null, ts);
+  }
+
+  public RowResult getRow(final long transactionId, final byte[] regionName,
+      final byte[] row, final byte[][] columns) throws IOException {
+    return getRow(transactionId, regionName, row, columns,
+        HConstants.LATEST_TIMESTAMP);
+  }
+
+  public RowResult getRow(final long transactionId, final byte[] regionName,
+      final byte[] row, final byte[][] columns, final long ts)
+      throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      // convert the columns array into a set so it's easy to check later.
+      Set<byte[]> columnSet = null;
+      if (columns != null) {
+        columnSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+        columnSet.addAll(Arrays.asList(columns));
+      }
+
+      TransactionalRegion region = getTransactionalRegion(regionName);
+      Map<byte[], Cell> map = region.getFull(transactionId, row, columnSet, ts);
+      HbaseMapWritable<byte[], Cell> result = new HbaseMapWritable<byte[], Cell>();
+      result.putAll(map);
+      return new RowResult(row, result);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+
+  }
+
+  /** {@inheritDoc} */
+  public void deleteAll(final long transactionId, final byte[] regionName,
+      final byte[] row, final long timestamp) throws IOException {
+    checkOpen();
+    super.getRequestCount().incrementAndGet();
+    try {
+      TransactionalRegion region = getTransactionalRegion(regionName);
+      region.deleteAll(transactionId, row, timestamp);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public long openScanner(final long transactionId, final byte[] regionName,
+      final byte[][] cols, final byte[] firstRow, final long timestamp,
+      final RowFilterInterface filter) throws IOException {
+    checkOpen();
+    NullPointerException npe = null;
+    if (regionName == null) {
+      npe = new NullPointerException("regionName is null");
+    } else if (cols == null) {
+      npe = new NullPointerException("columns to scan is null");
+    } else if (firstRow == null) {
+      npe = new NullPointerException("firstRow for scanner is null");
+    }
+    if (npe != null) {
+      IOException io = new IOException("Invalid arguments to openScanner");
+      io.initCause(npe);
+      throw io;
+    }
+    super.getRequestCount().incrementAndGet();
+    try {
+      TransactionalRegion r = getTransactionalRegion(regionName);
+      long scannerId = -1L;
+      InternalScanner s = r.getScanner(transactionId, cols, firstRow,
+          timestamp, filter);
+      scannerId = super.addScanner(s);
+      return scannerId;
+    } catch (IOException e) {
+      LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
+          RemoteExceptionHandler.checkIOException(e));
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  public void beginTransaction(final long transactionId, final byte[] regionName)
+      throws IOException {
+    getTransactionalRegion(regionName).beginTransaction(transactionId);
+  }
+
+}
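
To exercise this server, a cluster must be started with the transactional
implementations swapped in for the stock ones. A sketch of the wiring,
assuming the usual HBase mechanism of overriding the region server class and
its client-visible interface via configuration (the key names are an
assumption, not confirmed by this diff):

    HBaseConfiguration conf = new HBaseConfiguration();
    // Assumed keys; verify against HConstants before relying on them.
    conf.set("hbase.regionserver.impl",
        TransactionalRegionServer.class.getName());
    conf.set("hbase.regionserver.class",
        TransactionalRegionInterface.class.getName());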

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Sun Aug 17 15:03:42 2008
@@ -167,9 +167,11 @@
   
   protected HRegion openClosedRegion(final HRegion closedRegion)
   throws IOException {
-    return new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(),
-      closedRegion.getFilesystem(), closedRegion.getConf(),
-      closedRegion.getRegionInfo(), null, null);
+    HRegion r = new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(),
+        closedRegion.getFilesystem(), closedRegion.getConf(),
+        closedRegion.getRegionInfo(), null);
+    r.initialize(null, null);
+    return r;
   }
   
   /**

Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,421 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Stress test of the transaction functionality. This requires running a
+ * {@link TransactionalRegionServer}. We run many threads doing reads/writes
+ * which may conflict with each other. There are two types of transactions:
+ * those which operate on rows of a single table, and those which operate on
+ * rows across multiple tables. Each transaction type has a modification
+ * operation which changes two values while maintaining their sum, and a
+ * consistency-check operation which sums all rows and verifies that the sum
+ * is as expected.
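+ * For example, a single-table modification might subtract an amount from
+ * row st0 and add the same amount to row st1, leaving the sum over all
+ * single-table rows at INITIAL_VALUE * NUM_ST_ROWS.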
+ */
+public class StressTestTransactions extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory
+      .getLog(StressTestTransactions.class);
+
+  private static final int NUM_TABLES = 3;
+  private static final int NUM_ST_ROWS = 3;
+  private static final int NUM_MT_ROWS = 3;
+  private static final int NUM_TRANSACTIONS_PER_THREAD = 100;
+  private static final int NUM_SINGLE_TABLE_THREADS = 6;
+  private static final int NUM_MULTI_TABLE_THREADS = 6;
+  private static final int PRE_COMMIT_SLEEP = 10;
+  private static final Random RAND = new Random();
+
+  private static final byte[] FAMILY = Bytes.toBytes("family:");
+  private static final byte[] COL = Bytes.toBytes("family:a");
+
+  private HBaseAdmin admin;
+  private TransactionalTable[] tables;
+  private TransactionManager transactionManager;
+
+  /** constructor */
+  public StressTestTransactions() {
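+    // Run the mini-cluster with the transactional region server
+    // implementation and expose its RPC interface to clients.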
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    tables = new TransactionalTable[NUM_TABLES];
+
+    admin = new HBaseAdmin(conf);
+    for (int i = 0; i < tables.length; i++) {
+      HTableDescriptor desc = new HTableDescriptor(makeTableName(i));
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      admin.createTable(desc);
+      tables[i] = new TransactionalTable(conf, desc.getName());
+    }
+
+    transactionManager = new TransactionManager(conf);
+  }
+
+  private String makeTableName(final int i) {
+    return "table" + i;
+  }
+
+  private void writeInitialValues() throws IOException {
+    for (TransactionalTable table : tables) {
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        byte[] row = makeSTRow(i);
+        BatchUpdate b = new BatchUpdate(row);
+        b.put(COL, Bytes.toBytes(SingleTableTransactionThread.INITIAL_VALUE));
+        table.commit(b);
+      }
+      for (int i = 0; i < NUM_MT_ROWS; i++) {
+        byte[] row = makeMTRow(i);
+        BatchUpdate b = new BatchUpdate(row);
+        b.put(COL, Bytes.toBytes(MultiTableTransactionThread.INITIAL_VALUE));
+        table.commit(b);
+      }
+    }
+  }
+
+  private byte[] makeSTRow(final int i) {
+    return Bytes.toBytes("st" + i);
+  }
+
+  private byte[] makeMTRow(final int i) {
+    return Bytes.toBytes("mt" + i);
+  }
+
+  private static int nextThreadNum = 1;
+  private static final AtomicBoolean stopRequest = new AtomicBoolean(false);
+  private static final AtomicBoolean consistencyFailure = new AtomicBoolean(
+      false);
+
+  // Thread which runs transactions
+  abstract class TransactionThread extends Thread {
+    private int numRuns = 0;
+    private int numAborts = 0;
+    private int numUnknowns = 0;
+
+    public TransactionThread(final String namePrefix) {
+      super.setName(namePrefix + "transaction " + nextThreadNum++);
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < NUM_TRANSACTIONS_PER_THREAD; i++) {
+        if (stopRequest.get()) {
+          return;
+        }
+        try {
+          numRuns++;
+          transaction();
+        } catch (UnknownTransactionException e) {
+          numUnknowns++;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        } catch (CommitUnsuccessfulException e) {
+          numAborts++;
+        }
+      }
+    }
+
+    protected abstract void transaction() throws IOException,
+        CommitUnsuccessfulException;
+
+    public int getNumAborts() {
+      return numAborts;
+    }
+
+    public int getNumUnknowns() {
+      return numUnknowns;
+    }
+
+    protected void preCommitSleep() {
+      try {
+        Thread.sleep(PRE_COMMIT_SLEEP);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    protected void consistencyFailure() {
+      LOG.fatal("Consistency failure");
+      stopRequest.set(true);
+      consistencyFailure.set(true);
+    }
+
+    /**
+     * @return the number of transaction runs attempted by this thread.
+     */
+    public int getNumRuns() {
+      return numRuns;
+    }
+
+  }
+
+  // Atomically change the values of two rows while maintaining the sum.
+  // This should preserve the global sum of the rows, which is also checked
+  // with a transaction.
+  private class SingleTableTransactionThread extends TransactionThread {
+    private static final int INITIAL_VALUE = 10;
+    public static final int TOTAL_SUM = INITIAL_VALUE * NUM_ST_ROWS;
+    private static final int MAX_TRANSFER_AMT = 100;
+
+    private TransactionalTable table;
+    boolean doCheck = false;
+
+    public SingleTableTransactionThread() {
+      super("single table ");
+    }
+
+    @Override
+    protected void transaction() throws IOException,
+        CommitUnsuccessfulException {
+      if (doCheck) {
+        checkTotalSum();
+      } else {
+        doSingleRowChange();
+      }
+      doCheck = !doCheck;
+    }
+
+    private void doSingleRowChange() throws IOException,
+        CommitUnsuccessfulException {
+      table = tables[RAND.nextInt(NUM_TABLES)];
+      int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+          - MAX_TRANSFER_AMT;
+      int row1Index = RAND.nextInt(NUM_ST_ROWS);
+      int row2Index;
+      do {
+        row2Index = RAND.nextInt(NUM_ST_ROWS);
+      } while (row2Index == row1Index);
+      byte[] row1 = makeSTRow(row1Index);
+      byte[] row2 = makeSTRow(row2Index);
+
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int row1Amount = Bytes.toInt(table.get(transactionState, row1, COL)
+          .getValue());
+      int row2Amount = Bytes.toInt(table.get(transactionState, row2, COL)
+          .getValue());
+
+      row1Amount -= transferAmount;
+      row2Amount += transferAmount;
+
+      BatchUpdate update = new BatchUpdate(row1);
+      update.put(COL, Bytes.toBytes(row1Amount));
+      table.commit(transactionState, update);
+      update = new BatchUpdate(row2);
+      update.put(COL, Bytes.toBytes(row2Amount));
+      table.commit(transactionState, update);
+
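+      // Sleeping between the transactional writes and the commit attempt
+      // widens the window in which concurrent transactions can conflict.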
+      super.preCommitSleep();
+
+      transactionManager.tryCommit(transactionState);
+      LOG.debug("Commited");
+    }
+
+    // Check the table we last mutated
+    private void checkTotalSum() throws IOException,
+        CommitUnsuccessfulException {
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int totalSum = 0;
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        totalSum += Bytes.toInt(table.get(transactionState, makeSTRow(i), COL)
+            .getValue());
+      }
+
+      transactionManager.tryCommit(transactionState);
+      if (TOTAL_SUM != totalSum) {
+        super.consistencyFailure();
+      }
+    }
+
+  }
+
+  // Similar to SingleTable, but this time we maintain consistency across tables
+  // rather than across rows.
+  private class MultiTableTransactionThread extends TransactionThread {
+    private static final int INITIAL_VALUE = 1000;
+    public static final int TOTAL_SUM = INITIAL_VALUE * NUM_TABLES;
+    private static final int MAX_TRANSFER_AMT = 100;
+
+    private byte[] row;
+    boolean doCheck = false;
+
+    public MultiTableTransactionThread() {
+      super("multi table");
+    }
+
+    @Override
+    protected void transaction() throws IOException,
+        CommitUnsuccessfulException {
+      if (doCheck) {
+        checkTotalSum();
+      } else {
+        doSingleRowChange();
+      }
+      doCheck = !doCheck;
+    }
+
+    private void doSingleRowChange() throws IOException,
+        CommitUnsuccessfulException {
+      row = makeMTRow(RAND.nextInt(NUM_MT_ROWS));
+      int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+          - MAX_TRANSFER_AMT;
+      int table1Index = RAND.nextInt(tables.length);
+      int table2Index;
+      do {
+        table2Index = RAND.nextInt(tables.length);
+      } while (table2Index == table1Index);
+
+      TransactionalTable table1 = tables[table1Index];
+      TransactionalTable table2 = tables[table2Index];
+
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int table1Amount = Bytes.toInt(table1.get(transactionState, row, COL)
+          .getValue());
+      int table2Amount = Bytes.toInt(table2.get(transactionState, row, COL)
+          .getValue());
+
+      table1Amount -= transferAmount;
+      table2Amount += transferAmount;
+
+      BatchUpdate update = new BatchUpdate(row);
+      update.put(COL, Bytes.toBytes(table1Amount));
+      table1.commit(transactionState, update);
+
+      update = new BatchUpdate(row);
+      update.put(COL, Bytes.toBytes(table2Amount));
+      table2.commit(transactionState, update);
+
+      super.preCommitSleep();
+
+      transactionManager.tryCommit(transactionState);
+
+      LOG.trace(Bytes.toString(table1.getTableName()) + ": " + table1Amount);
+      LOG.trace(Bytes.toString(table2.getTableName()) + ": " + table2Amount);
+
+    }
+
+    private void checkTotalSum() throws IOException,
+        CommitUnsuccessfulException {
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int totalSum = 0;
+      int[] amounts = new int[tables.length];
+      for (int i = 0; i < tables.length; i++) {
+        int amount = Bytes.toInt(tables[i].get(transactionState, row, COL)
+            .getValue());
+        amounts[i] = amount;
+        totalSum += amount;
+      }
+
+      transactionManager.tryCommit(transactionState);
+
+      for (int i = 0; i < tables.length; i++) {
+        LOG.trace(Bytes.toString(tables[i].getTableName()) + ": " + amounts[i]);
+      }
+
+      if (TOTAL_SUM != totalSum) {
+        super.consistencyFailure();
+      }
+    }
+
+  }
+
+  public void testStressTransactions() throws IOException, InterruptedException {
+    writeInitialValues();
+
+    List<TransactionThread> transactionThreads = new LinkedList<TransactionThread>();
+
+    for (int i = 0; i < NUM_SINGLE_TABLE_THREADS; i++) {
+      TransactionThread transactionThread = new SingleTableTransactionThread();
+      transactionThread.start();
+      transactionThreads.add(transactionThread);
+    }
+
+    for (int i = 0; i < NUM_MULTI_TABLE_THREADS; i++) {
+      TransactionThread transactionThread = new MultiTableTransactionThread();
+      transactionThread.start();
+      transactionThreads.add(transactionThread);
+    }
+
+    for (TransactionThread transactionThread : transactionThreads) {
+      transactionThread.join();
+    }
+
+    for (TransactionThread transactionThread : transactionThreads) {
+      LOG.info(transactionThread.getName() + " done with "
+          + transactionThread.getNumAborts() + " aborts, and "
+          + transactionThread.getNumUnknowns() + " unknown transactions of "
+          + transactionThread.getNumRuns());
+    }
+
+    doFinalConsistencyChecks();
+  }
+
+  private void doFinalConsistencyChecks() throws IOException {
+    int[] mtSums = new int[NUM_MT_ROWS];
+
+    for (TransactionalTable table : tables) {
+      int thisTableSum = 0;
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        byte[] row = makeSTRow(i);
+        thisTableSum += Bytes.toInt(table.get(row, COL).getValue());
+      }
+      Assert.assertEquals(SingleTableTransactionThread.TOTAL_SUM, thisTableSum);
+
+      for (int i = 0; i < NUM_MT_ROWS; i++) {
+        byte[] row = makeMTRow(i);
+        mtSums[i] += Bytes.toInt(table.get(row, COL).getValue());
+      }
+    }
+
+    for (int mtSum : mtSums) {
+      Assert.assertEquals(MultiTableTransactionThread.TOTAL_SUM, mtSum);
+    }
+  }
+}

Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,143 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test the transaction functionality. This requires running a
+ * {@link TransactionalRegionServer}.
+ */
+public class TestTransactions extends HBaseClusterTestCase {
+
+  private static final String TABLE_NAME = "table1";
+
+  private static final byte[] FAMILY = Bytes.toBytes("family:");
+  private static final byte[] COL_A = Bytes.toBytes("family:a");
+
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW3 = Bytes.toBytes("row3");
+
+  private HBaseAdmin admin;
+  private TransactionalTable table;
+  private TransactionManager transactionManager;
+
+  /** constructor */
+  public TestTransactions() {
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    table = new TransactionalTable(conf, desc.getName());
+
+    transactionManager = new TransactionManager(conf);
+    writeInitialRow();
+  }
+
+  private void writeInitialRow() throws IOException {
+    BatchUpdate update = new BatchUpdate(ROW1);
+    update.put(COL_A, Bytes.toBytes(1));
+    table.commit(update);
+  }
+
+  public void testSimpleTransaction() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState = makeTransaction1();
+    transactionManager.tryCommit(transactionState);
+  }
+
+  public void testTwoTransactionsWithoutConflict() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState1 = makeTransaction1();
+    TransactionState transactionState2 = makeTransaction2();
+
+    transactionManager.tryCommit(transactionState1);
+    transactionManager.tryCommit(transactionState2);
+  }
+
+  public void testTwoTransactionsWithConflict() throws IOException,
+      CommitUnsuccessfulException {
+    TransactionState transactionState1 = makeTransaction1();
+    TransactionState transactionState2 = makeTransaction2();
+
+    transactionManager.tryCommit(transactionState2);
+
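+    // Transaction 2 wrote ROW1:COL_A, which transaction 1 read, so once
+    // transaction 2 commits, transaction 1's read is stale and its commit
+    // must fail.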
+    try {
+      transactionManager.tryCommit(transactionState1);
+      fail();
+    } catch (CommitUnsuccessfulException e) {
+      // Good
+    }
+  }
+
+  // Read the value of (ROW1, COL_A) and write it to (ROW2, COL_A) and (ROW3, COL_A)
+  private TransactionState makeTransaction1() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    Cell row1_A = table.get(transactionState, ROW1, COL_A);
+
+    BatchUpdate write1 = new BatchUpdate(ROW2);
+    write1.put(COL_A, row1_A.getValue());
+    table.commit(transactionState, write1);
+
+    BatchUpdate write2 = new BatchUpdate(ROW3);
+    write2.put(COL_A, row1_A.getValue());
+    table.commit(transactionState, write2);
+
+    return transactionState;
+  }
+
+  // Read (ROW1, COL_A), increment its (integer) value, and write it back
+  private TransactionState makeTransaction2() throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    Cell row1_A = table.get(transactionState, ROW1, COL_A);
+
+    int value = Bytes.toInt(row1_A.getValue());
+
+    BatchUpdate write = new BatchUpdate(ROW1);
+    write.put(COL_A, Bytes.toBytes(value + 1));
+    table.commit(transactionState, write);
+
+    return transactionState;
+  }
+}

Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,285 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
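+/**
+ * Test that committed transactional writes survive a region server failure
+ * by being recovered from the HLog. Runs a two-server cluster so the table's
+ * region can be reassigned after its server is killed.
+ */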
+public class TestHLogRecovery extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory.getLog(TestHLogRecovery.class);
+
+  private static final String TABLE_NAME = "table1";
+
+  private static final byte[] FAMILY = Bytes.toBytes("family:");
+  private static final byte[] COL_A = Bytes.toBytes("family:a");
+
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW3 = Bytes.toBytes("row3");
+  private static final int TOTAL_VALUE = 10;
+
+  private HBaseAdmin admin;
+  private TransactionManager transactionManager;
+  private TransactionalTable table;
+
+  /** constructor */
+  public TestHLogRecovery() {
+    super(2, false);
+
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+
+    // Set flush params so we don't get any flushes.
+    // FIXME (defaults are probably fine)
+
+    // Copied from TestRegionServerExit
+    conf.setInt("ipc.client.connect.max.retries", 5); // reduce ipc retries
+    conf.setInt("ipc.client.timeout", 10000); // and ipc timeout
+    conf.setInt("hbase.client.pause", 10000); // increase client timeout
+    conf.setInt("hbase.client.retries.number", 10); // increase HBase retries
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    FileSystem.getLocal(conf).delete(new Path(conf.get(HConstants.HBASE_DIR)), true);
+    super.setUp();
+
+    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    table = new TransactionalTable(conf, desc.getName());
+
+    transactionManager = new TransactionManager(conf);
+    writeInitialRows();
+  }
+
+  private void writeInitialRows() throws IOException {
+    BatchUpdate update = new BatchUpdate(ROW1);
+    update.put(COL_A, Bytes.toBytes(TOTAL_VALUE));
+    table.commit(update);
+    update = new BatchUpdate(ROW2);
+    update.put(COL_A, Bytes.toBytes(0));
+    table.commit(update);
+    update = new BatchUpdate(ROW3);
+    update.put(COL_A, Bytes.toBytes(0));
+    table.commit(update);
+  }
+
+  public void testWithoutFlush() throws IOException,
+      CommitUnsuccessfulException {
+    writeInitialRows();
+    TransactionState state1 = makeTransaction(false);
+    transactionManager.tryCommit(state1);
+    stopOrAbortRegionServer(true);
+
+    Thread t = startVerificationThread(1);
+    t.start();
+    threadDumpingJoin(t);
+  }
+
+  public void testWithFlushBeforeCommit() throws IOException,
+      CommitUnsuccessfulException {
+    writeInitialRows();
+    TransactionState state1 = makeTransaction(false);
+    flushRegionServer();
+    transactionManager.tryCommit(state1);
+    stopOrAbortRegionServer(true);
+
+    Thread t = startVerificationThread(1);
+    t.start();
+    threadDumpingJoin(t);
+  }
+
+  // FIXME, TODO
+  // public void testWithFlushBetweenTransactionWrites() {
+  // fail();
+  // }
+
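+  // Find the region server hosting the test table's region and ask its
+  // flush requester to flush that region to disk.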
+  private void flushRegionServer() {
+    List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
+        .getRegionThreads();
+
+    HRegion region = null;
+    int server = -1;
+    for (int i = 0; i < regionThreads.size() && server == -1; i++) {
+      HRegionServer s = regionThreads.get(i).getRegionServer();
+      Collection<HRegion> regions = s.getOnlineRegions();
+      for (HRegion r : regions) {
+        if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
+          server = i;
+          region = r;
+        }
+      }
+    }
+    if (server == -1) {
+      LOG.fatal("could not find region server serving table region");
+      fail();
+    }
+    ((TransactionalRegionServer) regionThreads.get(server).getRegionServer())
+        .getFlushRequester().request(region);
+  }
+
+  /**
+   * Stop the region server serving TABLE_NAME.
+   * 
+   * @param abort set to true if the region server should be aborted; if
+   * false, it is just shut down.
+   */
+  private void stopOrAbortRegionServer(final boolean abort) {
+    List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
+        .getRegionThreads();
+
+    int server = -1;
+    for (int i = 0; i < regionThreads.size(); i++) {
+      HRegionServer s = regionThreads.get(i).getRegionServer();
+      Collection<HRegion> regions = s.getOnlineRegions();
+      LOG.info("server: " + regionThreads.get(i).getName());
+      for (HRegion r : regions) {
+        LOG.info("region: " + r.getRegionInfo().getRegionNameAsString());
+        if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
+          server = i;
+        }
+      }
+    }
+    if (server == -1) {
+      LOG.fatal("could not find region server serving table region");
+      fail();
+    }
+    if (abort) {
+      this.cluster.abortRegionServer(server);
+
+    } else {
+      this.cluster.stopRegionServer(server);
+    }
+    LOG.info(this.cluster.waitOnRegionServer(server) + " has been "
+        + (abort ? "aborted" : "shut down"));
+  }
+
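+  // Each transaction (see makeTransaction) moves 2 out of ROW1 and adds 1
+  // each to ROW2 and ROW3, so after numRuns committed transactions the
+  // expected values follow directly.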
+  private void verify(final int numRuns) throws IOException {
+    // Reads
+    int row1 = Bytes.toInt(table.get(ROW1, COL_A).getValue());
+    int row2 = Bytes.toInt(table.get(ROW2, COL_A).getValue());
+    int row3 = Bytes.toInt(table.get(ROW3, COL_A).getValue());
+
+    assertEquals(TOTAL_VALUE - 2 * numRuns, row1);
+    assertEquals(numRuns, row2);
+    assertEquals(numRuns, row3);
+  }
+
+  // Move 2 out of ROW1 and 1 into ROW2 and 1 into ROW3
+  private TransactionState makeTransaction(final boolean flushMidWay)
+      throws IOException {
+    TransactionState transactionState = transactionManager.beginTransaction();
+
+    // Reads
+    int row1 = Bytes.toInt(table.get(transactionState, ROW1, COL_A).getValue());
+    int row2 = Bytes.toInt(table.get(transactionState, ROW2, COL_A).getValue());
+    int row3 = Bytes.toInt(table.get(transactionState, ROW3, COL_A).getValue());
+
+    row1 -= 2;
+    row2 += 1;
+    row3 += 1;
+
+    if (flushMidWay) {
+      flushRegionServer();
+    }
+
+    // Writes
+    BatchUpdate write = new BatchUpdate(ROW1);
+    write.put(COL_A, Bytes.toBytes(row1));
+    table.commit(transactionState, write);
+
+    write = new BatchUpdate(ROW2);
+    write.put(COL_A, Bytes.toBytes(row2));
+    table.commit(transactionState, write);
+
+    write = new BatchUpdate(ROW3);
+    write.put(COL_A, Bytes.toBytes(row3));
+    table.commit(transactionState, write);
+
+    return transactionState;
+  }
+
+  /*
+   * Run verification in a thread so I can concurrently run a thread-dumper
+   * while we're waiting (because in this test sometimes the meta scanner looks
+   * to be stuck).
+   * @param numRuns Number of transaction runs to verify against.
+   * @return Verification thread. Caller needs to call start on it.
+   */
+  private Thread startVerificationThread(final int numRuns) {
+    Runnable runnable = new Runnable() {
+      public void run() {
+        try {
+          // Now try to open a scanner on the test table. Should stall until
+          // the region comes back online.
+          HTable t = new HTable(conf, TABLE_NAME);
+          Scanner s = t.getScanner(new byte[][] { COL_A },
+              HConstants.EMPTY_START_ROW);
+          s.close();
+
+        } catch (IOException e) {
+          LOG.fatal("could not re-open meta table because", e);
+          fail();
+        }
+        try {
+          verify(numRuns);
+          LOG.info("Success!");
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail();
+        }
+      }
+    };
+    return new Thread(runnable);
+  }
+}