Posted to commits@hbase.apache.org by st...@apache.org on 2008/08/18 00:03:43 UTC

svn commit: r686650 [1/3] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/client/transactional/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hb...

Author: stack
Date: Sun Aug 17 15:03:42 2008
New Revision: 686650

URL: http://svn.apache.org/viewvc?rev=686650&view=rev
Log:
HBASE-669 MultiRegion transactions with Optimistic Concurrency Control

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTransactionalHLogManager.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ServerCallable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sun Aug 17 15:03:42 2008
@@ -49,6 +49,8 @@
               were present in previous versions of the patches for this issue,
               but not in the version that was committed. Also fix a number of
               compilation problems that were introduced by patch.
+   HBASE-669  MultiRegion transactions with Optimistic Concurrency Control
+              (Clint Morgan via Stack)
 
   OPTIMIZATIONS
 

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java Sun Aug 17 15:03:42 2008
@@ -150,12 +150,13 @@
       for (int i = 0; i < info.length - 1; i++) {
         if (currentRegion == null) {
           currentRegion =
-            new HRegion(tabledir, hlog, fs, conf, info[i], null, null);
+            new HRegion(tabledir, hlog, fs, conf, info[i], null);
+          currentRegion.initialize(null, null);
           currentSize = currentRegion.getLargestHStoreSize();
         }
         nextRegion =
-          new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null);
-
+          new HRegion(tabledir, hlog, fs, conf, info[i + 1], null);
+        nextRegion.initialize(null, null);
         nextSize = nextRegion.getLargestHStoreSize();
 
         if ((currentSize + nextSize) <= (maxFilesize / 2)) {
@@ -322,7 +323,8 @@
       // Scan root region to find all the meta regions
       
       root = new HRegion(rootTableDir, hlog, fs, conf,
-          HRegionInfo.ROOT_REGIONINFO, null, null);
+          HRegionInfo.ROOT_REGIONINFO, null);
+      root.initialize(null, null);
 
       InternalScanner rootScanner = 
         root.getScanner(COL_REGIONINFO_ARRAY, HConstants.EMPTY_START_ROW, 

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Sun Aug 17 15:03:42 2008
@@ -982,7 +982,9 @@
   public Scanner getScanner(final byte [][] columns,
     final byte [] startRow, long timestamp, RowFilterInterface filter)
   throws IOException {
-    return new ClientScanner(columns, startRow, timestamp, filter);
+    ClientScanner s = new ClientScanner(columns, startRow, timestamp, filter);
+    s.initialize();
+    return s;
   }
   
   /**
@@ -1335,15 +1337,13 @@
     protected RowFilterInterface filter;
     
     protected ClientScanner(final Text [] columns, final Text startRow,
-        long timestamp, RowFilterInterface filter)
-    throws IOException {
+        long timestamp, RowFilterInterface filter) {
       this(Bytes.toByteArrays(columns), startRow.getBytes(), timestamp,
         filter);
     }
 
     protected ClientScanner(final byte[][] columns, final byte [] startRow,
-        final long timestamp, final RowFilterInterface filter) 
-    throws IOException {
+        final long timestamp, final RowFilterInterface filter) {
       if (CLIENT_LOG.isDebugEnabled()) {
         CLIENT_LOG.debug("Creating scanner over " + Bytes.toString(getTableName()) +
           " starting at key '" + Bytes.toString(startRow) + "'");
@@ -1359,6 +1359,9 @@
       if (filter != null) {
         filter.validate(columns);
       }
+    }
+   
+    public void initialize() throws IOException {
       nextScanner();
     }
     

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ServerCallable.java Sun Aug 17 15:03:42 2008
@@ -60,11 +60,17 @@
 
   /** @return the server name */
   public String getServerName() {
+    if (location == null) {
+      return null;
+    }
     return location.getServerAddress().toString();
   }
   
   /** @return the region name */
-  public byte [] getRegionName() {
+  public byte[] getRegionName() {
+    if (location == null) {
+      return null;
+    }
     return location.getRegionInfo().getRegionName();
   }
   

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+/** Thrown when a transaction cannot be committed. 
+ * 
+ */
+public class CommitUnsuccessfulException extends Exception {
+
+  public CommitUnsuccessfulException() {
+    super();
+  }
+
+  public CommitUnsuccessfulException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public CommitUnsuccessfulException(String message) {
+    super(message);
+  }
+
+  public CommitUnsuccessfulException(Throwable cause) {
+    super(cause);
+  }
+
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * A local, in-memory implementation of the transaction logger. Does not
+ * provide a global view, so it cannot be relied on by region servers
+ * recovering from a failure.
+ */
+public class LocalTransactionLogger implements TransactionLogger {
+
+  private static LocalTransactionLogger instance;
+  
+  public synchronized static LocalTransactionLogger getInstance() {
+    if (instance == null) {
+      instance = new LocalTransactionLogger();
+    }
+    return instance;
+  }
+  
+  private Random random = new Random();
+  private Map<Long, TransactionStatus> transactionIdToStatusMap = Collections
+      .synchronizedMap(new HashMap<Long, TransactionStatus>());
+
+  private LocalTransactionLogger() {
+    // Enforce singleton
+  }
+  
+  // Gives back random longs to minimize possibility of collision
+  public long createNewTransactionLog() {
+    long id = random.nextLong();
+    transactionIdToStatusMap.put(id, TransactionStatus.PENDING);
+    return id;
+  }
+
+  public TransactionStatus getStatusForTransaction(final long transactionId) {
+    return transactionIdToStatusMap.get(transactionId);
+  }
+
+  public void setStatusForTransaction(final long transactionId,
+      final TransactionStatus status) {
+    transactionIdToStatusMap.put(transactionId, status);
+  }
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+/**
+ * Simple interface used to provide a log about transaction status. Written to
+ * by the client, and read by regionservers in case of failure. 
+ * 
+ */
+public interface TransactionLogger {
+
+  enum TransactionStatus {
+    PENDING, COMMITTED, ABORTED
+  }
+
+  /**
+   * Create a new transaction log. Return the transaction's globally unique id.
+   * The log's initial status should be PENDING.
+   * 
+   * @return transaction id
+   */
+  long createNewTransactionLog();
+
+  TransactionStatus getStatusForTransaction(long transactionId);
+
+  void setStatusForTransaction(long transactionId, TransactionStatus status);
+
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,145 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * Transaction Manager. Responsible for committing transactions.
+ * 
+ */
+public class TransactionManager {
+  static final Log LOG = LogFactory.getLog(TransactionManager.class);
+
+  private final HConnection connection;
+  private final TransactionLogger transactionLogger;
+
+  public TransactionManager(final HBaseConfiguration conf) {
+    this(LocalTransactionLogger.getInstance(), conf);
+  }
+
+  public TransactionManager(final TransactionLogger transactionLogger,
+      final HBaseConfiguration conf) {
+    this.transactionLogger = transactionLogger;
+    connection = HConnectionManager.getConnection(conf);
+  }
+
+  /**
+   * Called to start a transaction.
+   * 
+   * @return new transaction state
+   */
+  public TransactionState beginTransaction() {
+    long transactionId = transactionLogger.createNewTransactionLog();
+    LOG.debug("Begining transaction " + transactionId);
+    return new TransactionState(transactionId);
+  }
+
+  /**
+   * Try to commit a transaction.
+   * 
+   * @param transactionState
+   * @throws CommitUnsuccessfulException
+   * @throws IOException
+   */
+  public void tryCommit(final TransactionState transactionState)
+      throws CommitUnsuccessfulException, IOException {
+    LOG.debug("atempting to commit trasaction: " + transactionState.toString());
+
+    try {
+      for (HRegionLocation location : transactionState
+          .getParticipatingRegions()) {
+        TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+            .getHRegionConnection(location.getServerAddress());
+        boolean canCommit = transactionalRegionServer.commitRequest(location
+            .getRegionInfo().getRegionName(), transactionState
+            .getTransactionId());
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Region ["
+              + location.getRegionInfo().getRegionNameAsString() + "] votes "
+              + (canCommit ? "to commit" : "to abort") + " transaction "
+              + transactionState.getTransactionId());
+        }
+
+        if (!canCommit) {
+          LOG.debug("Aborting [" + transactionState.getTransactionId() + "]");
+          abort(transactionState, location);
+          throw new CommitUnsuccessfulException();
+        }
+      }
+
+      LOG.debug("Commiting [" + transactionState.getTransactionId() + "]");
+
+      transactionLogger.setStatusForTransaction(transactionState
+          .getTransactionId(), TransactionLogger.TransactionStatus.COMMITTED);
+
+      for (HRegionLocation location : transactionState
+          .getParticipatingRegions()) {
+        TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+            .getHRegionConnection(location.getServerAddress());
+        transactionalRegionServer.commit(location.getRegionInfo()
+            .getRegionName(), transactionState.getTransactionId());
+      }
+    } catch (RemoteException e) {
+      LOG.debug("Commit of transaction [" + transactionState.getTransactionId()
+          + "] was unsucsessful", e);
+      // FIXME, think about the what ifs
+      throw new CommitUnsuccessfulException(e);
+    }
+    // Tran log can be deleted now ...
+  }
+
+  /**
+   * Abort a transaction.
+   * 
+   * @param transactionState
+   * @throws IOException
+   */
+  public void abort(final TransactionState transactionState) throws IOException {
+    abort(transactionState, null);
+  }
+
+  private void abort(final TransactionState transactionState,
+      final HRegionLocation locationToIgnore) throws IOException {
+    transactionLogger.setStatusForTransaction(transactionState
+        .getTransactionId(), TransactionLogger.TransactionStatus.ABORTED);
+
+    for (HRegionLocation location : transactionState.getParticipatingRegions()) {
+      if (locationToIgnore != null && location.equals(locationToIgnore)) {
+        continue;
+      }
+
+      TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+          .getHRegionConnection(location.getServerAddress());
+
+      transactionalRegionServer.abort(location.getRegionInfo().getRegionName(),
+          transactionState.getTransactionId());
+    }
+  }
+}

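Editorial note: the commit protocol in TransactionManager.tryCommit() is two-phase: every participating region first votes via commitRequest(), then either all are told to commit() or, on the first "no" vote, the remaining participants are aborted. A minimal client-side sketch of the lifecycle follows; the table name, row, and column are hypothetical, and the writes go through the TransactionalTable added further below:

    HBaseConfiguration conf = new HBaseConfiguration();
    TransactionManager manager = new TransactionManager(conf);
    TransactionalTable table = new TransactionalTable(conf, "accounts");

    TransactionState tx = manager.beginTransaction();
    try {
      BatchUpdate update = new BatchUpdate(Bytes.toBytes("row1"));
      update.put(Bytes.toBytes("info:balance"), Bytes.toBytes("100"));
      table.commit(tx, update);  // buffered against the region, applied at commit
      manager.tryCommit(tx);     // vote phase, then commit phase
    } catch (CommitUnsuccessfulException e) {
      // a "no" vote: tryCommit has already aborted the other participants
    }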
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+
+class TransactionScannerCallable extends ScannerCallable {
+
+  private TransactionState transactionState;
+
+  TransactionScannerCallable(final TransactionState transactionState,
+      final HConnection connection, final byte[] tableName,
+      final byte[][] columns, final byte[] startRow, final long timestamp,
+      final RowFilterInterface filter) {
+    super(connection, tableName, columns, startRow, timestamp, filter);
+    this.transactionState = transactionState;
+  }
+
+  @Override
+  protected long openScanner() throws IOException {
+    if (transactionState.addRegion(location)) {
+      ((TransactionalRegionInterface) server).beginTransaction(transactionState
+          .getTransactionId(), location.getRegionInfo().getRegionName());
+    }
+    return ((TransactionalRegionInterface) server).openScanner(transactionState
+        .getTransactionId(), this.location.getRegionInfo().getRegionName(),
+        getColumns(), row, getTimestamp(), getFilter());
+  }
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+
+/**
+ * Holds client-side transaction information. Clients use it as an opaque
+ * object passed to transactional operations.
+ * 
+ */
+public class TransactionState {
+  static final Log LOG = LogFactory.getLog(TransactionState.class);
+
+  private final long transactionId;
+
+  private Set<HRegionLocation> participatingRegions = new HashSet<HRegionLocation>();
+
+  TransactionState(final long transactionId) {
+    this.transactionId = transactionId;
+  }
+
+  boolean addRegion(final HRegionLocation hregion) {
+    boolean added = participatingRegions.add(hregion);
+
+    if (added) {
+      LOG.debug("Adding new hregion ["
+          + hregion.getRegionInfo().getRegionNameAsString()
+          + "] to transaction [" + transactionId + "]");
+    }
+
+    return added;
+  }
+
+  Set<HRegionLocation> getParticipatingRegions() {
+    return participatingRegions;
+  }
+
+  /**
+   * Get the transactionId.
+   * 
+   * @return the transactionId
+   */
+  public long getTransactionId() {
+    return transactionId;
+  }
+
+  @Override
+  public String toString() {
+    return "id: " + transactionId + ", particpants: "
+        + participatingRegions.size();
+  }
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,401 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Table with transactional support.
+ * 
+ */
+public class TransactionalTable extends HTable {
+
+  public TransactionalTable(final HBaseConfiguration conf,
+      final String tableName) throws IOException {
+    super(conf, tableName);
+  }
+
+  public TransactionalTable(final HBaseConfiguration conf, final Text tableName)
+      throws IOException {
+    super(conf, tableName);
+  }
+
+  public TransactionalTable(final HBaseConfiguration conf,
+      final byte[] tableName) throws IOException {
+    super(conf, tableName);
+  }
+
+  private static abstract class TransactionalServerCallable<T> extends
+      ServerCallable<T> {
+    protected TransactionState transactionState;
+
+    protected TransactionalRegionInterface getTransactionServer() {
+      return (TransactionalRegionInterface) server;
+    }
+
+    protected void recordServer() throws IOException {
+      if (transactionState.addRegion(location)) {
+        getTransactionServer().beginTransaction(
+            transactionState.getTransactionId(),
+            location.getRegionInfo().getRegionName());
+      }
+    }
+
+    public TransactionalServerCallable(final HConnection connection,
+        final byte[] tableName, final byte[] row,
+        final TransactionState transactionState) {
+      super(connection, tableName, row);
+      this.transactionState = transactionState;
+    }
+
+  }
+
+  /**
+   * Get a single value for the specified row and column
+   * 
+   * @param row row key
+   * @param column column name
+   * @return value for specified row/column
+   * @throws IOException
+   */
+  public Cell get(final TransactionState transactionState, final byte[] row,
+      final byte[] column) throws IOException {
+    return super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<Cell>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public Cell call() throws IOException {
+            recordServer();
+            return getTransactionServer().get(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, column);
+          }
+        });
+  }
+
+  /**
+   * Get the specified number of versions of the specified row and column
+   * 
+   * @param row - row key
+   * @param column - column name
+   * @param numVersions - number of versions to retrieve
+   * @return - array of values
+   * @throws IOException
+   */
+  public Cell[] get(final TransactionState transactionState, final byte[] row,
+      final byte[] column, final int numVersions) throws IOException {
+    Cell[] values = null;
+    values = super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<Cell[]>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public Cell[] call() throws IOException {
+            recordServer();
+            return getTransactionServer().get(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, column,
+                numVersions);
+          }
+        });
+
+    return values;
+  }
+
+  /**
+   * Get the specified number of versions of the specified row and column with
+   * the specified timestamp.
+   * 
+   * @param row - row key
+   * @param column - column name
+   * @param timestamp - timestamp
+   * @param numVersions - number of versions to retrieve
+   * @return - array of values that match the above criteria
+   * @throws IOException
+   */
+  public Cell[] get(final TransactionState transactionState, final byte[] row,
+      final byte[] column, final long timestamp, final int numVersions)
+      throws IOException {
+    Cell[] values = null;
+    values = super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<Cell[]>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public Cell[] call() throws IOException {
+            recordServer();
+            return getTransactionServer().get(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, column,
+                timestamp, numVersions);
+          }
+        });
+
+    return values;
+  }
+
+  /**
+   * Get all the data for the specified row at the latest timestamp
+   * 
+   * @param row row key
+   * @return RowResult; empty if the row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final TransactionState transactionState,
+      final byte[] row) throws IOException {
+    return getRow(transactionState, row, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * Get all the data for the specified row at a specified timestamp
+   * 
+   * @param row row key
+   * @param ts timestamp
+   * @return RowResult; empty if the row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final TransactionState transactionState,
+      final byte[] row, final long ts) throws IOException {
+    return super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<RowResult>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public RowResult call() throws IOException {
+            recordServer();
+            return getTransactionServer().getRow(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, ts);
+          }
+        });
+  }
+
+  /**
+   * Get selected columns for the specified row at the latest timestamp
+   * 
+   * @param row row key
+   * @param columns Array of column names you want to retrieve.
+   * @return RowResult; empty if the row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final TransactionState transactionState,
+      final byte[] row, final byte[][] columns) throws IOException {
+    return getRow(transactionState, row, columns, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * Get selected columns for the specified row at a specified timestamp
+   * 
+   * @param row row key
+   * @param columns Array of column names you want to retrieve.
+   * @param ts timestamp
+   * @return RowResult; empty if the row does not exist.
+   * @throws IOException
+   */
+  public RowResult getRow(final TransactionState transactionState,
+      final byte[] row, final byte[][] columns, final long ts)
+      throws IOException {
+    return super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<RowResult>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public RowResult call() throws IOException {
+            recordServer();
+            return getTransactionServer().getRow(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, columns, ts);
+          }
+        });
+  }
+
+  /**
+   * Delete all cells that match the passed row and whose timestamp is equal-to
+   * or older than the passed timestamp.
+   * 
+   * @param row Row to update
+   * @param ts Delete all cells with this timestamp or older.
+   * @throws IOException
+   */
+  public void deleteAll(final TransactionState transactionState,
+      final byte[] row, final long ts) throws IOException {
+    super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<Boolean>(super.getConnection(), super
+            .getTableName(), row, transactionState) {
+          public Boolean call() throws IOException {
+            recordServer();
+            getTransactionServer().deleteAll(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), row, ts);
+            return null;
+          }
+        });
+  }
+
+  /**
+   * Get a scanner on the current table starting at first row. Return the
+   * specified columns.
+   * 
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * <code>\+|^&*$[]]}{)(</code>.
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns) throws IOException {
+    return getScanner(transactionState, columns, HConstants.EMPTY_START_ROW,
+        HConstants.LATEST_TIMESTAMP, null);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   * 
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * <code>\+|^&*$[]]}{)(</code>.
+   * @param startRow starting row in table to scan
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow) throws IOException {
+    return getScanner(transactionState, columns, startRow,
+        HConstants.LATEST_TIMESTAMP, null);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   * 
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * <code>\+|^&*$[]]}{)(</code>.
+   * @param startRow starting row in table to scan
+   * @param timestamp only return results whose timestamp <= this value
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow, final long timestamp)
+      throws IOException {
+    return getScanner(transactionState, columns, startRow, timestamp, null);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   * 
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * <code>\+|^&*$[]]}{)(</code>.
+   * @param startRow starting row in table to scan
+   * @param filter a row filter using row-key regexp and/or column data filter.
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow,
+      final RowFilterInterface filter) throws IOException {
+    return getScanner(transactionState, columns, startRow,
+        HConstants.LATEST_TIMESTAMP, filter);
+  }
+
+  /**
+   * Get a scanner on the current table starting at the specified row. Return
+   * the specified columns.
+   * 
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex in the column qualifier. A column qualifier is judged to be a
+   * regex if it contains at least one of the following characters:
+   * <code>\+|^&*$[]]}{)(</code>.
+   * @param startRow starting row in table to scan
+   * @param timestamp only return results whose timestamp <= this value
+   * @param filter a row filter using row-key regexp and/or column data filter.
+   * @return scanner
+   * @throws IOException
+   */
+  public Scanner getScanner(final TransactionState transactionState,
+      final byte[][] columns, final byte[] startRow, final long timestamp,
+      final RowFilterInterface filter) throws IOException {
+    ClientScanner scanner = new TransactionalClientScanner(transactionState, columns, startRow,
+        timestamp, filter);
+    scanner.initialize();
+    return scanner;
+  }
+
+  /**
+   * Commit a BatchUpdate to the table.
+   * 
+   * @param batchUpdate
+   * @throws IOException
+   */
+  public synchronized void commit(final TransactionState transactionState,
+      final BatchUpdate batchUpdate) throws IOException {
+    super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<Boolean>(super.getConnection(), super
+            .getTableName(), batchUpdate.getRow(), transactionState) {
+          public Boolean call() throws IOException {
+            recordServer();
+            getTransactionServer().batchUpdate(
+                transactionState.getTransactionId(),
+                location.getRegionInfo().getRegionName(), batchUpdate);
+            return null;
+          }
+        });
+  }
+
+  protected class TransactionalClientScanner extends HTable.ClientScanner {
+
+    private TransactionState transactionState;
+
+    protected TransactionalClientScanner(
+        final TransactionState transactionState, final byte[][] columns,
+        final byte[] startRow, final long timestamp,
+        final RowFilterInterface filter) {
+      super(columns, startRow, timestamp, filter);
+      this.transactionState = transactionState;
+    }
+
+    @Override
+    protected ScannerCallable getScannerCallable(final byte[] localStartKey) {
+      return new TransactionScannerCallable(transactionState, getConnection(),
+          getTableName(), getColumns(), localStartKey, getTimestamp(),
+          getFilter());
+    }
+  }
+
+}

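Editorial note: a hedged illustration of the read-side API above; the table, the transaction state tx, and the column names are assumptions carried over from the sketch after TransactionManager.java:

    Cell cell = table.get(tx, Bytes.toBytes("row1"), Bytes.toBytes("info:balance"));
    Scanner scanner = table.getScanner(tx, new byte[][] { Bytes.toBytes("info:") });
    try {
      RowResult row;
      while ((row = scanner.next()) != null) {
        // every region the scanner touches is added to the transaction's
        // participant set (and, per the package docs, to its write set)
      }
    } finally {
      scanner.close();
    }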
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * Thrown if a region server is passed an unknown transaction id
+ */
+public class UnknownTransactionException extends DoNotRetryIOException {
+
+  /** constructor */
+  public UnknownTransactionException() {
+    super();
+  }
+
+  /**
+   * Constructor
+   * @param s message
+   */
+  public UnknownTransactionException(String s) {
+    super(s);
+  }
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html Sun Aug 17 15:03:42 2008
@@ -0,0 +1,59 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<head />
+<body bgcolor="white">
+
+This package provides support for atomic transactions. Transactions can
+span multiple regions. Transaction writes are applied when committing a
+transaction. At commit time, the transaction is examined to see if it
+can be applied while still maintaining atomicity. This is done by
+looking for conflicts with the transactions that committed while the
+current transaction was running. This technique is known as optimistic
+concurrency control (OCC) because it relies on the assumption that
+transactions will mostly not have conflicts with each other.
+
+<p>
+For more details on OCC, see the paper <i> On Optimistic Methods for Concurrency Control </i> 
+by Kung and Robinson available 
+<a href="http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf">here</a>.
+
+<p> To enable transactions, modify hbase-site.xml to turn on the
+TransactionalRegionServer.  This is done by setting
+<i>hbase.regionserver.class</i> to
+<i>org.apache.hadoop.hbase.ipc.TransactionalRegionInterface</i> and
+<i>hbase.regionserver.impl</i> to
+<i>org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer</i>.
+
+<h3> Known Issues </h3>
+
+Recovery in the face of region server failure
+is not fully implemented. Thus, you cannot rely on the transactional
+properties in the face of node failure.
+
+<p> In order to avoid phantom reads on scanners, scanners currently
+claim a <i>write set</i> for all rows in every region they scan
+through. This means that if transaction A writes to a region that
+transaction B is scanning, then there is a conflict (only one
+transaction can be committed). This will occur even if the scanner
+never went over the row that was written.
+ 
+</body>
+</html>

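Editorial note: the two properties named in the package documentation can also be set programmatically, which is how a test or embedded client might be wired up. A sketch, assuming only the keys shown above:

    HBaseConfiguration conf = new HBaseConfiguration();
    conf.set("hbase.regionserver.class",
        TransactionalRegionInterface.class.getName());
    conf.set("hbase.regionserver.impl",
        TransactionalRegionServer.class.getName());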
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,178 @@
+/*
+ * $Id$
+ * Created on Jun 4, 2008
+ * 
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+
+/**
+ * Interface for transactional region servers.
+ * 
+ */
+public interface TransactionalRegionInterface extends HRegionInterface {
+  /** Interface version number */
+  public static final long versionID = 1L;
+
+  /**
+   * Sent to initiate a transaction.
+   * 
+   * @param transactionId
+   * @param regionName name of region
+   */
+  public void beginTransaction(long transactionId, final byte[] regionName)
+      throws IOException;
+
+  /**
+   * Retrieve a single value from the specified region for the specified row and
+   * column keys
+   * 
+   * @param regionName name of region
+   * @param row row key
+   * @param column column key
+   * @return value for that region/row/column
+   * @throws IOException
+   */
+  public Cell get(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column) throws IOException;
+
+  /**
+   * Get the specified number of versions of the specified row and column
+   * 
+   * @param regionName region name
+   * @param row row key
+   * @param column column key
+   * @param numVersions number of versions to return
+   * @return array of values
+   * @throws IOException
+   */
+  public Cell[] get(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column, final int numVersions)
+      throws IOException;
+
+  /**
+   * Get the specified number of versions of the specified row and column with
+   * the specified timestamp.
+   * 
+   * @param regionName region name
+   * @param row row key
+   * @param column column key
+   * @param timestamp timestamp
+   * @param numVersions number of versions to return
+   * @return array of values
+   * @throws IOException
+   */
+  public Cell[] get(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[] column, final long timestamp,
+      final int numVersions) throws IOException;
+
+  /**
+   * Get all the data for the specified row at a given timestamp
+   * 
+   * @param regionName region name
+   * @param row row key
+   * @param ts timestamp
+   * @return map of values
+   * @throws IOException
+   */
+  public RowResult getRow(long transactionId, final byte[] regionName,
+      final byte[] row, final long ts) throws IOException;
+
+  /**
+   * Get selected columns for the specified row at a given timestamp.
+   * 
+   * @param regionName region name
+   * @param row row key
+   * @param columns columns to return
+   * @param ts timestamp
+   * @return map of values
+   * @throws IOException
+   */
+  public RowResult getRow(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[][] columns, final long ts)
+      throws IOException;
+
+  /**
+   * Get selected columns for the specified row at the latest timestamp.
+   * 
+   * @param regionName region name
+   * @param row row key
+   * @param columns columns to return
+   * @return map of values
+   * @throws IOException
+   */
+  public RowResult getRow(long transactionId, final byte[] regionName,
+      final byte[] row, final byte[][] columns) throws IOException;
+
+  /**
+   * Delete all cells that match the passed row and whose timestamp is equal-to
+   * or older than the passed timestamp.
+   * 
+   * @param regionName region name
+   * @param row row key
+   * @param timestamp Delete all entries that have this timestamp or older
+   * @throws IOException
+   */
+  public void deleteAll(long transactionId, byte[] regionName, byte[] row,
+      long timestamp) throws IOException;
+
+  /**
+   * Opens a remote scanner with a RowFilter.
+   * 
+   * @param transactionId
+   * @param regionName name of region to scan
+   * @param columns columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned. It's also possible to
+   * pass a regex for the column family name. A column name is judged to be a regex if
+   * it contains at least one of the following characters:
+   * <code>\+|^&*$[]]}{)(</code>.
+   * @param startRow starting row to scan
+   * @param timestamp only return values whose timestamp is <= this value
+   * @param filter RowFilter for filtering results at the row-level.
+   * 
+   * @return scannerId scanner identifier used in other calls
+   * @throws IOException
+   */
+  public long openScanner(final long transactionId, final byte[] regionName,
+      final byte[][] columns, final byte[] startRow, long timestamp,
+      RowFilterInterface filter) throws IOException;
+
+  /**
+   * Applies a batch of updates via one RPC
+   * 
+   * @param regionName name of the region to update
+   * @param b BatchUpdate
+   * @throws IOException
+   */
+  public void batchUpdate(long transactionId, final byte[] regionName,
+      final BatchUpdate b) throws IOException;
+
+  /**
+   * Ask if we can commit the given transaction.
+   * 
+   * @param transactionId
+   * @return true if we can commit
+   */
+  public boolean commitRequest(final byte[] regionName, long transactionId)
+      throws IOException;
+
+  /**
+   * Commit the transaction.
+   * 
+   * @param transactionId
+   * @throws IOException
+   */
+  public void commit(final byte[] regionName, long transactionId)
+      throws IOException;
+
+  /**
+   * Abort the transaction.
+   * 
+   * @param transactionId
+   * @throws IOException
+   */
+  public void abort(final byte[] regionName, long transactionId)
+      throws IOException;
+}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sun Aug 17 15:03:42 2008
@@ -171,7 +171,7 @@
    * Accessor for tests.
    * @return Current state of the monotonically increasing file id.
    */
-  long getFilenum() {
+  public long getFilenum() {
     return this.filenum;
   }
 
@@ -204,6 +204,10 @@
       }
     }
   }
+  
+  public long getSequenceNumber() {
+    return logSeqNum;
+  }
 
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
@@ -311,7 +315,7 @@
    * This is a convenience method that computes a new filename with a given
    * file-number.
    */
-  Path computeFilename(final long fn) {
+  public Path computeFilename(final long fn) {
     return new Path(dir, HLOG_DATFILE + fn);
   }
 
@@ -330,7 +334,7 @@
    *
    * @throws IOException
    */
-  void close() throws IOException {
+  public void close() throws IOException {
     cacheFlushLock.lock();
     try {
       synchronized (updateLock) {
@@ -391,13 +395,8 @@
           new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
         HLogEdit logEdit =
           new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
-        try {
-      	  this.writer.append(logKey, logEdit);
-      	} catch (IOException e) {
-          LOG.fatal("Could not append. Requesting close of log", e);
-          requestLogRoll();
-          throw e;
-      	}
+        doWrite(logKey, logEdit);
+
         this.numEntries++;
       }
     }
@@ -411,6 +410,63 @@
       this.listener.logRollRequested();
     }
   }
+  
+  private void doWrite(HLogKey logKey, HLogEdit logEdit) throws IOException {
+    try {
+      this.writer.append(logKey, logEdit);
+    } catch (IOException e) {
+      LOG.fatal("Could not append. Requesting close of log", e);
+      requestLogRoll();
+      throw e;
+    }
+  }
+  
+  /** Append an entry without a row to the log.
+   * 
+   * @param regionInfo
+   * @param logEdit
+   * @throws IOException
+   */
+  public void append(HRegionInfo regionInfo, HLogEdit logEdit) throws IOException {
+    this.append(regionInfo, new byte[0], logEdit);
+  }
+  
+  /** Append an entry to the log.
+   * 
+   * @param regionInfo
+   * @param row
+   * @param logEdit
+   * @throws IOException
+   */
+  public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit) throws IOException {
+    if (closed) {
+      throw new IOException("Cannot append; log is closed");
+    }
+    byte [] regionName = regionInfo.getRegionName();
+    byte [] tableName = regionInfo.getTableDesc().getName();
+    
+    synchronized (updateLock) {
+      long seqNum = obtainSeqNum();
+      // The 'lastSeqWritten' map holds the sequence number of the oldest
+      // write for each region. When the cache is flushed, the entry for the
+      // region being flushed is removed if the sequence number of the flush
+      // is greater than or equal to the value in lastSeqWritten.
+      if (!this.lastSeqWritten.containsKey(regionName)) {
+        this.lastSeqWritten.put(regionName, Long.valueOf(seqNum));
+      }
+
+      HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum);
+      doWrite(logKey, logEdit);
+      this.numEntries++;
+    }
+
+    if (this.numEntries > this.maxlogentries) {
+      if (listener != null) {
+        listener.logRollRequested();
+      }
+    }
+  }
 
   /** @return How many items have been added to the log */
   int getNumEntries() {
@@ -508,6 +564,10 @@
     this.cacheFlushLock.unlock();
   }
 
+  public static boolean isMetaColumn(byte [] column) {
+    return Bytes.equals(METACOLUMN, column);
+  }
+  
   /**
    * Split up a bunch of log files, that are no longer being written to, into
    * new files, one per region. Delete the old log files when finished.

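The two new append() overloads above exist so that transaction START,
COMMIT, and ABORT markers, which carry no row, can share the region
server's log. A hedged sketch of their intended use -- log, regionInfo,
and the transaction id are assumed in scope, and the HLogEdit
transactional constructors appear in the HLogEdit.java hunk below:

  // Sketch only: bracketing a transaction's edits with log markers.
  long txId = 7L;  // hypothetical transaction id
  log.append(regionInfo,
      new HLogEdit(txId, HLogEdit.TransactionalOperation.START));
  // ... WRITE edits built from the transaction's BatchUpdates go here ...
  log.append(regionInfo,
      new HLogEdit(txId, HLogEdit.TransactionalOperation.COMMIT));
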
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java Sun Aug 17 15:03:42 2008
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.*;
@@ -62,11 +63,20 @@
   public static boolean isDeleted(final byte [] value) {
     return (value == null)? false: deleteBytes.compareTo(value) == 0;
   }
+  
+  public enum TransactionalOperation {
+    START, WRITE, COMMIT, ABORT
+  }
 
   private byte [] column;
   private byte [] val;
   private long timestamp;
   private static final int MAX_VALUE_LEN = 128;
+  
+  private boolean isTransactionEntry;
+  private Long transactionId = null;
+  private TransactionalOperation operation;
+
 
   /**
    * Default constructor used by Writable
@@ -85,6 +95,34 @@
     this.column = c;
     this.val = bval;
     this.timestamp = timestamp;
+    this.isTransactionEntry = false;
+  }
+  
+  /** Construct a transactional WRITE edit.
+   * 
+   * @param transactionId
+   * @param op
+   * @param timestamp
+   */
+  public HLogEdit(long transactionId, BatchOperation op, long timestamp) {
+    this(op.getColumn(), op.getValue(), timestamp);
+    // This covers delete ops too...
+    this.transactionId = transactionId;
+    this.operation = TransactionalOperation.WRITE;
+    this.isTransactionEntry = true;
+  }
+
+  /** Construct a transactional control operation (START, COMMIT, or ABORT).
+   * 
+   * @param transactionId
+   * @param op
+   */
+  public HLogEdit(long transactionId, TransactionalOperation op) {
+    this.column = new byte[0];
+    this.val = new byte[0];
+    this.transactionId = transactionId;
+    this.operation = op;
+    this.isTransactionEntry = true;
   }
 
   /** @return the column */
@@ -101,6 +139,28 @@
   public long getTimestamp() {
     return this.timestamp;
   }
+  
+  /** @return true if this edit is part of a transaction */
+  public boolean isTransactionEntry() {
+    return isTransactionEntry;
+  }
+  
+  /**
+   * Get the transactionId.
+   * 
+   * @return the transactionId, or null if this is not a transactional edit
+   */
+  public Long getTransactionId() {
+    return transactionId;
+  }
+
+  /**
+   * Get the transactional operation.
+   * 
+   * @return the operation, or null if this is not a transactional edit
+   */
+  public TransactionalOperation getOperation() {
+    return operation;
+  }
 
   /**
    * @return First column name, timestamp, and first 128 bytes of the value
@@ -117,8 +177,13 @@
     } catch (UnsupportedEncodingException e) {
       throw new RuntimeException("UTF8 encoding not present?", e);
     }
-    return "(" + Bytes.toString(getColumn()) + "/" + getTimestamp() + "/" +
-      value + ")";
+    return "("
+        + Bytes.toString(getColumn())
+        + "/"
+        + getTimestamp()
+        + "/"
+        + (isTransactionEntry ? "tran: " + transactionId + " op "
+            + operation.toString() + "/" : "") + value + ")";
   }
   
   // Writable
@@ -126,9 +191,18 @@
   /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.column);
-    out.writeInt(this.val.length);
-    out.write(this.val);
+    if (this.val == null) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(this.val.length);
+      out.write(this.val);
+    }
     out.writeLong(timestamp);
+    out.writeBoolean(isTransactionEntry);
+    if (isTransactionEntry) {
+      out.writeLong(transactionId);
+      out.writeUTF(operation.name());
+    }
   }
   
   /** {@inheritDoc} */
@@ -137,5 +211,10 @@
     this.val = new byte[in.readInt()];
     in.readFully(this.val);
     this.timestamp = in.readLong();
+    isTransactionEntry = in.readBoolean();
+    if (isTransactionEntry) {
+      transactionId = in.readLong();
+      operation = TransactionalOperation.valueOf(in.readUTF());
+    }
   }
 }

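The transactional fields ride on the end of the existing HLogEdit Writable
format: a boolean flag, then the transaction id and the operation name. A
round-trip sketch (not from the patch; the java.io stream classes are
assumed imported) shows what readFields() recovers:

  // Sketch only: Writable round-trip over the new transactional fields.
  ByteArrayOutputStream buf = new ByteArrayOutputStream();
  HLogEdit written = new HLogEdit(7L, HLogEdit.TransactionalOperation.COMMIT);
  written.write(new DataOutputStream(buf));

  HLogEdit read = new HLogEdit();  // default constructor used by Writable
  read.readFields(new DataInputStream(
      new ByteArrayInputStream(buf.toByteArray())));
  // read.isTransactionEntry() is now true, read.getTransactionId() is 7L,
  // and read.getOperation() is COMMIT; a non-transactional edit writes the
  // flag as false and skips the two extra fields.
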
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java Sun Aug 17 15:03:42 2008
@@ -30,6 +30,8 @@
  * The log intermingles edits to many tables and rows, so each log entry 
  * identifies the appropriate table and row.  Within a table and row, they're 
  * also sorted.
+ * 
+ * Some transactional edits (START, COMMIT, ABORT) have no associated row.
  */
 public class HLogKey implements WritableComparable {
   private byte [] regionName;
@@ -64,19 +66,19 @@
   // A bunch of accessors
   //////////////////////////////////////////////////////////////////////////////
 
-  byte [] getRegionName() {
+  public byte [] getRegionName() {
     return regionName;
   }
   
-  byte [] getTablename() {
+  public byte [] getTablename() {
     return tablename;
   }
   
-  byte [] getRow() {
+  public byte [] getRow() {
     return row;
   }
   
-  long getLogSeqNum() {
+  public long getLogSeqNum() {
     return logSeqNum;
   }
   

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Aug 17 15:03:42 2008
@@ -243,8 +243,8 @@
       LOG.debug("Files for new region");
       listPaths(fs, newRegionDir);
     }
-    HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo,
-        null, null);
+    HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null);
+    dstRegion.initialize(null, null);
     dstRegion.compactStores();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Files for new region");
@@ -318,7 +318,7 @@
   private final Map<Integer, TreeMap<HStoreKey, byte []>> targetColumns =
       new ConcurrentHashMap<Integer, TreeMap<HStoreKey, byte []>>();
   // Default access because read by tests.
-  final Map<Integer, HStore> stores = new ConcurrentHashMap<Integer, HStore>();
+  protected final Map<Integer, HStore> stores = new ConcurrentHashMap<Integer, HStore>();
   final AtomicLong memcacheSize = new AtomicLong(0);
 
   final Path basedir;
@@ -376,7 +376,7 @@
   private final ReentrantReadWriteLock updatesLock =
     new ReentrantReadWriteLock();
   private final Integer splitLock = new Integer(0);
-  private final long minSequenceId;
+  private long minSequenceId;
   final AtomicInteger activeScannerCount = new AtomicInteger(0);
 
   //////////////////////////////////////////////////////////////////////////////
@@ -395,31 +395,6 @@
    * appropriate log info for this HRegion. If there is a previous log file
    * (implying that the HRegion has been written-to before), then read it from
    * the supplied path.
-   * @param fs is the filesystem.  
-   * @param conf is global configuration settings.
-   * @param regionInfo - HRegionInfo that describes the region
-   * @param initialFiles If there are initial files (implying that the HRegion
-   * is new), then read them from the supplied path.
-   * @param flushListener an object that implements CacheFlushListener or null
-   * or null
-   * @throws IOException
-   */
-  public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, 
-      HRegionInfo regionInfo, Path initialFiles,
-      FlushRequester flushListener) throws IOException {
-    this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null);
-  }
-  
-  /**
-   * HRegion constructor.
-   *
-   * @param log The HLog is the outbound log for any updates to the HRegion
-   * (There's a single HLog for all the HRegions on a single HRegionServer.)
-   * The log file is a logfile from the previous execution that's
-   * custom-computed for this HRegion. The HRegionServer computes and sorts the
-   * appropriate log info for this HRegion. If there is a previous log file
-   * (implying that the HRegion has been written-to before), then read it from
-   * the supplied path.
    * @param basedir qualified path of directory where region should be located,
    * usually the table directory.
    * @param fs is the filesystem.  
@@ -434,10 +409,8 @@
    * @throws IOException
    */
   public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, 
-      HRegionInfo regionInfo, Path initialFiles,
-      FlushRequester flushListener, final Progressable reporter)
-    throws IOException {
-    
+      HRegionInfo regionInfo,
+      FlushRequester flushListener) {
     this.basedir = basedir;
     this.log = log;
     this.fs = fs;
@@ -447,7 +420,6 @@
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
     String encodedNameStr = Integer.toString(this.regionInfo.getEncodedName());
     this.regiondir = new Path(basedir, encodedNameStr);
-    Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
     this.historian = RegionHistorian.getInstance();
     
     if (LOG.isDebugEnabled()) {
@@ -457,6 +429,27 @@
     
     this.regionCompactionDir =
       new Path(getCompactionDir(basedir), encodedNameStr);
+   
+    int flushSize = regionInfo.getTableDesc().getMemcacheFlushSize();
+    if (flushSize == HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE) {
+      flushSize = conf.getInt("hbase.hregion.memcache.flush.size",
+                      HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE);
+    }
+    this.memcacheFlushSize = flushSize;
+
+    this.blockingMemcacheSize = this.memcacheFlushSize *
+      conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
+  }
+  
+  /** Initialize this region and get it ready to roll.
+   * 
+   * @param initialFiles
+   * @param reporter
+   * @throws IOException
+   */
+  public void initialize(Path initialFiles,
+      final Progressable reporter) throws IOException {
+    Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
 
     // Move prefab HStore files into place (if any).  This picks up split files
     // and any merges from splits and merges dirs.
@@ -466,16 +459,20 @@
 
     // Load in all the HStores.
     long maxSeqId = -1;
+    long minSeqId = Integer.MAX_VALUE;
     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
       HStore store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
       stores.put(Bytes.mapKey(c.getName()), store);
       long storeSeqId = store.getMaxSequenceId();
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
+      } 
+      if (storeSeqId < minSeqId) {
+        minSeqId = storeSeqId;
       }
     }
     
-    doReconstructionLog(oldLogFile, maxSeqId, reporter);
+    doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
     
     if (fs.exists(oldLogFile)) {
       if (LOG.isDebugEnabled()) {
@@ -501,17 +498,6 @@
     if (fs.exists(merges)) {
       fs.delete(merges, true);
     }
-
-    int flushSize = regionInfo.getTableDesc().getMemcacheFlushSize();
-    if (flushSize == HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE) {
-      flushSize = conf.getInt("hbase.hregion.memcache.flush.size",
-                      HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE);
-    }
-    this.memcacheFlushSize = flushSize;
-
-    this.blockingMemcacheSize = this.memcacheFlushSize *
-      conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
-
     // See if region is meant to run read-only.
     if (this.regionInfo.getTableDesc().isReadOnly()) {
       this.writestate.setReadOnly(true);
@@ -797,10 +783,12 @@
       // Opening the region copies the splits files from the splits directory
       // under each region.
       HRegion regionA =
-        new HRegion(basedir, log, fs, conf, regionAInfo, dirA, null);
+        new HRegion(basedir, log, fs, conf, regionAInfo, null);
+      regionA.initialize(dirA, null);
       regionA.close();
       HRegion regionB =
-        new HRegion(basedir, log, fs, conf, regionBInfo, dirB, null);
+        new HRegion(basedir, log, fs, conf, regionBInfo, null);
+      regionB.initialize(dirB, null);
       regionB.close();
 
       // Cleanup
@@ -1029,12 +1017,14 @@
     // again so its value will represent the size of the updates received
     // during the flush
     long sequenceId = -1L;
+    long completeSequenceId = -1L;
     this.updatesLock.writeLock().lock();
     try {
       for (HStore s: stores.values()) {
         s.snapshot();
       }
       sequenceId = log.startCacheFlush();
+      completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
       this.memcacheSize.set(0);
     } finally {
       this.updatesLock.writeLock().unlock();
@@ -1050,7 +1040,7 @@
       // Keep running vector of all store files that includes both old and the
       // just-made new flush store file.
       for (HStore hstore: stores.values()) {
-        boolean needsCompaction = hstore.flushCache(sequenceId);
+        boolean needsCompaction = hstore.flushCache(completeSequenceId);
         if (needsCompaction) {
           compactionRequested = true;
         }
@@ -1077,7 +1067,7 @@
     //     and that all updates to the log for this regionName that have lower 
     //     log-sequence-ids can be safely ignored.
     this.log.completeCacheFlush(getRegionName(),
-        regionInfo.getTableDesc().getName(), sequenceId);
+        regionInfo.getTableDesc().getName(), completeSequenceId);
 
     // C. Finally notify anyone waiting on memcache to clear:
     // e.g. checkResources().
@@ -1099,6 +1089,18 @@
     return compactionRequested;
   }
   
+  /**
+   * Get the sequence number to be associated with this cache flush. Used by
+   * TransactionalRegion so that flushing the cache does not prematurely mark
+   * edits from still-pending transactions as complete.
+   * 
+   * @param currentSequenceId
+   * @return sequence id to complete the cache flush with
+   */
+  protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
+    return currentSequenceId;
+  }
+  
   //////////////////////////////////////////////////////////////////////////////
   // get() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
@@ -1346,7 +1348,33 @@
    * @param b
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b, Integer lockid)
+  public void batchUpdate(BatchUpdate b) throws IOException {
+    this.batchUpdate(b, null, true);
+  }
+  
+  /**
+   * @param b
+   * @param writeToWAL if true, write this update to the write-ahead log
+   * @throws IOException
+   */
+  public void batchUpdate(BatchUpdate b, boolean writeToWAL) throws IOException {
+    this.batchUpdate(b, null, writeToWAL);
+  }
+
+  
+  /**
+   * @param b
+   * @param lockid row lock id, or null if the row is not already locked
+   * @throws IOException
+   */
+  public void batchUpdate(BatchUpdate b, Integer lockid) throws IOException {
+    this.batchUpdate(b, lockid, true);
+  }
+  
+  /**
+   * @param b
+   * @param lockid row lock id, or null if the row is not already locked
+   * @param writeToWAL if true, then we write this update to the log
+   * @throws IOException
+   */
+  public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL)
   throws IOException {
     checkReadOnly();
 
@@ -1395,7 +1423,7 @@
         this.targetColumns.remove(lid);
 
       if (edits != null && edits.size() > 0) {
-        update(edits);
+        update(edits, writeToWAL);
       }
       
       if (deletes != null && deletes.size() > 0) {
@@ -1597,16 +1625,25 @@
     }
     targets.put(key, val);
   }
-
-  /* 
+  
+  /** 
    * Add updates first to the hlog and then add values to memcache.
    * Warning: Assumption is caller has lock on passed in row.
-   * @param row Row to update.
-   * @param timestamp Timestamp to record the updates against
    * @param updatesByColumn Cell updates by column
    * @throws IOException
    */
-  private void update(final TreeMap<HStoreKey, byte []> updatesByColumn)
+  private void update(final TreeMap<HStoreKey, byte []> updatesByColumn) throws IOException {
+    this.update(updatesByColumn, true);
+  }
+
+  /** 
+   * Add updates first to the hlog (if writeToWAL is true) and then add
+   * values to memcache.
+   * Warning: Assumption is caller has lock on passed in row.
+   * @param updatesByColumn Cell updates by column
+   * @param writeToWAL if true, then we should write to the log
+   * @throws IOException
+   */
+  private void update(final TreeMap<HStoreKey, byte []> updatesByColumn, boolean writeToWAL)
   throws IOException {
     if (updatesByColumn == null || updatesByColumn.size() <= 0) {
       return;
@@ -1615,8 +1652,10 @@
     boolean flush = false;
     this.updatesLock.readLock().lock();
     try {
-      this.log.append(regionInfo.getRegionName(),
-        regionInfo.getTableDesc().getName(), updatesByColumn);
+      if (writeToWAL) {
+        this.log.append(regionInfo.getRegionName(), regionInfo.getTableDesc()
+            .getName(), updatesByColumn);
+      }
       long size = 0;
       for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
         HStoreKey key = e.getKey();
@@ -1660,7 +1699,7 @@
   
   // Do any reconstruction needed from the log
   @SuppressWarnings("unused")
-  protected void doReconstructionLog(Path oldLogFile, long maxSeqId,
+  protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId,
     Progressable reporter)
   throws UnsupportedEncodingException, IOException {
     // Nothing to do (Replaying is done in HStores)
@@ -2105,9 +2144,11 @@
     if (!info.isMetaRegion()) {
       RegionHistorian.getInstance().addRegionCreation(info);
     }
-    return new HRegion(tableDir,
-      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
-      fs, conf, info, null, null);
+    HRegion region = new HRegion(tableDir,
+        new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
+        fs, conf, info, null);
+    region.initialize(null, null);
+    return region;
   }
   
   /**
@@ -2134,7 +2175,8 @@
     }
     HRegion r = new HRegion(
         HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()),
-        log, FileSystem.get(conf), conf, info, null, null);
+        log, FileSystem.get(conf), conf, info, null);
+    r.initialize(null, null);
     if (log != null) {
       log.setSequenceNumber(r.getMinSequenceId());
     }

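The pattern repeated at every call site in this hunk: HRegion construction
no longer performs any I/O, and callers must follow the constructor with
initialize(), which moves prefab store files into place, loads the HStores,
and replays the reconstruction log. The split lets the new
TransactionalRegion subclass hook in before replay happens. The two-step
call, with the usual constructor arguments assumed in scope:

  HRegion region = new HRegion(basedir, log, fs, conf, regionInfo,
      flushListener);
  region.initialize(null, null);  // no initial files, no progress reporter
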
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Aug 17 15:03:42 2008
@@ -900,13 +900,15 @@
   
   protected HRegion instantiateRegion(final HRegionInfo regionInfo)
       throws IOException {
-    return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
-        .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null,
-        this.cacheFlusher, new Progressable() {
-          public void progress() {
-            addProcessingMessage(regionInfo);
-          }
-        });
+    HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
+        .getTableDesc().getName()), this.log, this.fs, conf, regionInfo,
+        this.cacheFlusher);
+    r.initialize(null, new Progressable() {
+      public void progress() {
+        addProcessingMessage(regionInfo);
+      }
+    });
+    return r; 
   }
   
   /*

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Sun Aug 17 15:03:42 2008
@@ -311,7 +311,7 @@
         // Check this edit is for me. Also, guard against writing
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
         byte [] column = val.getColumn();
-        if (Bytes.equals(column, HLog.METACOLUMN)
+        if (val.isTransactionEntry() || Bytes.equals(column, HLog.METACOLUMN)
             || !Bytes.equals(key.getRegionName(), info.getRegionName())
             || !HStoreKey.matchingFamily(family.getName(), column)) {
           continue;
@@ -1316,8 +1316,7 @@
    * @return Matching keys.
    * @throws IOException
    */
-  List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
-    final long now)
+  public List<HStoreKey> getKeys(final HStoreKey origin, final int versions, final long now)
   throws IOException {
     // This code below is very close to the body of the get method.  Any 
     // changes in the flow below should also probably be done in get.  TODO:

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,53 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * Cleans up committed transactions when they are no longer needed to verify
+ * pending transactions.
+ */
+class CleanOldTransactionsChore extends Chore {
+
+  private static final String SLEEP_CONF = "hbase.transaction.clean.sleep";
+  private static final int DEFAULT_SLEEP = 60 * 1000;
+
+  private final TransactionalRegionServer regionServer;
+
+  public CleanOldTransactionsChore(
+      final TransactionalRegionServer regionServer,
+      final AtomicBoolean stopRequest) {
+    super(regionServer.getConfiguration().getInt(SLEEP_CONF, DEFAULT_SLEEP),
+        stopRequest);
+    this.regionServer = regionServer;
+  }
+
+  @Override
+  protected void chore() {
+    for (HRegion region : regionServer.getOnlineRegions()) {
+      ((TransactionalRegion) region).removeUnNeededCommitedTransactions();
+    }
+  }
+
+}
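
The chore wakes every hbase.transaction.clean.sleep milliseconds (60
seconds by default) and asks each online transactional region to drop
committed transactions that are no longer needed to verify pending ones.
A hypothetical override of the period, using standard Configuration
plumbing:

  // Sketch only: shorten the cleanup period to 30 seconds.
  HBaseConfiguration conf = new HBaseConfiguration();
  conf.setInt("hbase.transaction.clean.sleep", 30 * 1000);  // default 60s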