Posted to commits@hbase.apache.org by ap...@apache.org on 2009/05/14 23:47:59 UTC

svn commit: r774939 - in /hadoop/hbase/trunk_on_hadoop-0.18.3: ./ bin/ src/java/org/apache/hadoop/hbase/ipc/ src/test/org/apache/hadoop/hbase/regionserver/transactional/

Author: apurtell
Date: Thu May 14 21:47:58 2009
New Revision: 774939

URL: http://svn.apache.org/viewvc?rev=774939&view=rev
Log:
pull up to trunk; working infoservers and ipc

Added:
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java
Removed:
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTransactionalHLogManager.java
Modified:
    hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt
    hadoop/hbase/trunk_on_hadoop-0.18.3/bin/hbase
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt?rev=774939&r1=774938&r2=774939&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt Thu May 14 21:47:58 2009
@@ -244,6 +244,8 @@
    HBASE-1413  Fall back to filesystem block size default if HLog blocksize is
                not specified
    HBASE-1417  Cleanup disorientating RPC message
+   HBASE-1424  have shell print regioninfo and location on first load if
+               DEBUG enabled
 
   OPTIMIZATIONS
    HBASE-1412  Change values for delete column and column family in KeyValue

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/bin/hbase
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/bin/hbase?rev=774939&r1=774938&r2=774939&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/bin/hbase (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/bin/hbase Thu May 14 21:47:58 2009
@@ -133,7 +133,7 @@
   CLASSPATH=${CLASSPATH}:$f;
 done
 
-for f in $HBASE_HOME/lib/jsp-2.1/*.jar; do
+for f in $HBASE_HOME/lib/jetty-ext/*.jar; do
   CLASSPATH=${CLASSPATH}:$f;
 done
 

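The bin/hbase change above pulls the servlet container jars from lib/jetty-ext
instead of lib/jsp-2.1, which is presumably what is behind "working
infoservers" in the log message. A quick, purely illustrative way to confirm
the jars actually landed on the classpath at runtime:

    import java.io.File;

    public class ClasspathCheck {
      public static void main(String[] args) {
        // Print every classpath entry that looks like a Jetty jar; if the
        // bin/hbase loop above worked, the lib/jetty-ext jars show up here.
        String cp = System.getProperty("java.class.path");
        for (String entry : cp.split(File.pathSeparator)) {
          if (entry.contains("jetty")) {
            System.out.println(entry);
          }
        }
      }
    }
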
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=774939&r1=774938&r2=774939&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Thu May 14 21:47:58 2009
@@ -302,7 +302,7 @@
             this.socket = socketFactory.createSocket();
             this.socket.setTcpNoDelay(tcpNoDelay);
             // connection time out is 20s
-            this.socket.connect(remoteId.getAddress(), 20000);
+            HBaseRPC.connect(this.socket, remoteId.getAddress(), 20000);
             this.socket.setSoTimeout(pingInterval);
             break;
           } catch (SocketTimeoutException toe) {
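
The client-side change above replaces the direct Socket.connect(addr, timeout)
call with the HBaseRPC.connect() helper added in this commit: plain sockets
still take the default JDK path, while sockets backed by a SocketChannel get a
selector-based timed connect instead. A minimal calling sketch, assuming only
the connect(Socket, SocketAddress, int) signature from this patch (host and
port are placeholders):

    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.nio.channels.SocketChannel;

    import org.apache.hadoop.hbase.ipc.HBaseRPC;

    public class ConnectSketch {
      public static void main(String[] args) throws Exception {
        // A channel-backed socket: this is the case the new helper exists for.
        Socket socket = SocketChannel.open().socket();
        socket.setTcpNoDelay(true);
        // 20000 ms matches the hard connect timeout used by HBaseClient above.
        HBaseRPC.connect(socket,
            new InetSocketAddress("regionserver.example.org", 60020), 20000);
        socket.close();
      }
    }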

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=774939&r1=774938&r2=774939&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Thu May 14 21:47:58 2009
@@ -21,6 +21,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.lang.reflect.Array;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
@@ -28,11 +29,20 @@
 import java.lang.reflect.Proxy;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 
 import javax.net.SocketFactory;
@@ -47,6 +57,7 @@
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 
 /** A simple RPC mechanism.
  *
@@ -678,4 +689,275 @@
       v = v.substring(0, 55)+"...";
     LOG.info(v);
   }
+
+  public static void connect(Socket socket, 
+      SocketAddress endpoint, 
+      int timeout) throws IOException {
+    if (socket == null || endpoint == null || timeout < 0) {
+      throw new IllegalArgumentException("Illegal argument for connect()");
+    }
+    SocketChannel ch = socket.getChannel();
+    if (ch == null) {
+      // let the default implementation handle it.
+      socket.connect(endpoint, timeout);
+    } else {
+      connect(ch, endpoint, timeout);
+    }
+  }
+
+  private static SelectorPool selector = new SelectorPool();
+
+  public static void connect(SocketChannel channel, SocketAddress endpoint,
+      int timeout) throws IOException {
+    
+    boolean blockingOn = channel.isBlocking();
+    if (blockingOn) {
+      channel.configureBlocking(false);
+    }
+    
+    try { 
+      if (channel.connect(endpoint)) {
+        return;
+      }
+
+      long timeoutLeft = timeout;
+      long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
+      
+      while (true) {
+        // we might have to call finishConnect() more than once
+        // for some channels (with user level protocols)
+        
+        int ret = selector.select((SelectableChannel)channel, 
+                                  SelectionKey.OP_CONNECT, timeoutLeft);
+        
+        if (ret > 0 && channel.finishConnect()) {
+          return;
+        }
+        
+        if (ret == 0 ||
+            (timeout > 0 &&  
+              (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
+          throw new SocketTimeoutException(
+                    timeoutExceptionString(channel, timeout, 
+                                           SelectionKey.OP_CONNECT));
+        }
+      }
+    } catch (IOException e) {
+      // javadoc for SocketChannel.connect() says channel should be closed.
+      try {
+        channel.close();
+      } catch (IOException ignored) {}
+      throw e;
+    } finally {
+      if (blockingOn && channel.isOpen()) {
+        channel.configureBlocking(true);
+      }
+    }
+  }
+
+  private static String timeoutExceptionString(SelectableChannel channel,
+      long timeout, int ops) {
+
+    String waitingFor;
+    switch(ops) {
+
+    case SelectionKey.OP_READ :
+      waitingFor = "read"; break;
+
+    case SelectionKey.OP_WRITE :
+      waitingFor = "write"; break;      
+
+    case SelectionKey.OP_CONNECT :
+      waitingFor = "connect"; break;
+
+    default :
+      waitingFor = "" + ops;  
+    }
+
+    return timeout + " millis timeout while " +
+      "waiting for channel to be ready for " + 
+      waitingFor + ". ch : " + channel;    
+  }
+
+  /**
+   * This maintains a pool of selectors. These selectors are closed
+   * once they are idle (unused) for a few seconds.
+   */
+  private static class SelectorPool {
+    
+    private static class SelectorInfo {
+      Selector              selector;
+      long                  lastActivityTime;
+      LinkedList<SelectorInfo> queue; 
+      
+      void close() {
+        if (selector != null) {
+          try {
+            selector.close();
+          } catch (IOException e) {
+            LOG.warn("Unexpected exception while closing selector : " +
+                     StringUtils.stringifyException(e));
+          }
+        }
+      }    
+    }
+    
+    private static class ProviderInfo {
+      SelectorProvider provider;
+      LinkedList<SelectorInfo> queue; // lifo
+      ProviderInfo next;
+    }
+    
+    private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
+    
+    private ProviderInfo providerList = null;
+    
+    /**
+     * Waits on the channel with the given timeout using one of the 
+     * cached selectors. It also removes any cached selectors that are
+     * idle for a few seconds.
+     * 
+     * @param channel
+     * @param ops
+     * @param timeout
+     * @return
+     * @throws IOException
+     */
+    int select(SelectableChannel channel, int ops, long timeout) 
+                                                   throws IOException {
+     
+      SelectorInfo info = get(channel);
+      
+      SelectionKey key = null;
+      int ret = 0;
+      
+      try {
+        while (true) {
+          long start = (timeout == 0) ? 0 : System.currentTimeMillis();
+
+          key = channel.register(info.selector, ops);
+          ret = info.selector.select(timeout);
+          
+          if (ret != 0) {
+            return ret;
+          }
+          
+          /* Sometimes select() returns 0 well before the timeout, for
+           * unknown reasons, so select again if required.
+           */
+          if (timeout > 0) {
+            timeout -= System.currentTimeMillis() - start;
+            if (timeout <= 0) {
+              return 0;
+            }
+          }
+          
+          if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedIOException("Interrupted while waiting for " +
+                                             "IO on channel " + channel +
+                                             ". " + timeout + 
+                                             " millis timeout left.");
+          }
+        }
+      } finally {
+        if (key != null) {
+          key.cancel();
+        }
+        
+        //clear the canceled key.
+        try {
+          info.selector.selectNow();
+        } catch (IOException e) {
+          LOG.info("Unexpected Exception while clearing selector : " +
+                   StringUtils.stringifyException(e));
+          // don't put the selector back.
+          info.close();
+          return ret; 
+        }
+        
+        release(info);
+      }
+    }
+    
+    /**
+     * Takes one selector from the end of the LRU list of free selectors.
+     * If there are no selectors available, it creates a new selector.
+     * Also invokes trimIdleSelectors(). 
+     * 
+     * @param channel
+     * @return 
+     * @throws IOException
+     */
+    private synchronized SelectorInfo get(SelectableChannel channel) 
+                                                         throws IOException {
+      SelectorInfo selInfo = null;
+      
+      SelectorProvider provider = channel.provider();
+      
+      // pick the list: rarely is there more than one provider in use.
+      ProviderInfo pList = providerList;
+      while (pList != null && pList.provider != provider) {
+        pList = pList.next;
+      }      
+      if (pList == null) {
+        //LOG.info("Creating new ProviderInfo : " + provider.toString());
+        pList = new ProviderInfo();
+        pList.provider = provider;
+        pList.queue = new LinkedList<SelectorInfo>();
+        pList.next = providerList;
+        providerList = pList;
+      }
+      
+      LinkedList<SelectorInfo> queue = pList.queue;
+      
+      if (queue.isEmpty()) {
+        Selector selector = provider.openSelector();
+        selInfo = new SelectorInfo();
+        selInfo.selector = selector;
+        selInfo.queue = queue;
+      } else {
+        selInfo = queue.removeLast();
+      }
+      
+      trimIdleSelectors(System.currentTimeMillis());
+      return selInfo;
+    }
+    
+    /**
+     * Puts the selector back at the end of the LRU list of free selectors.
+     * Also invokes trimIdleSelectors().
+     * 
+     * @param info
+     */
+    private synchronized void release(SelectorInfo info) {
+      long now = System.currentTimeMillis();
+      trimIdleSelectors(now);
+      info.lastActivityTime = now;
+      info.queue.addLast(info);
+    }
+    
+    /**
+     * Closes selectors that have been idle for IDLE_TIMEOUT (10 sec). It does
+     * not traverse the whole list, only the entries that have crossed
+     * the timeout.
+     */
+    private void trimIdleSelectors(long now) {
+      long cutoff = now - IDLE_TIMEOUT;
+      
+      for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
+        if (pList.queue.isEmpty()) {
+          continue;
+        }
+        for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
+          SelectorInfo info = it.next();
+          if (info.lastActivityTime > cutoff) {
+            break;
+          }
+          it.remove();
+          info.close();
+        }
+      }
+    }
+  }
+
 }
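
The connect() and SelectorPool code above appears to be adapted from the
NetUtils/SocketIOWithTimeout classes in Hadoop 0.19, backported here because
Hadoop 0.18.3 does not ship them. Stripped of the pooling and idle-trimming,
the core pattern is a non-blocking connect followed by a timed select() on
OP_CONNECT; the pool exists only because opening and closing a Selector on
every call is comparatively expensive. A standalone sketch of that core
pattern (host, port, and class name are illustrative):

    import java.net.InetSocketAddress;
    import java.net.SocketTimeoutException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;

    public class TimedConnectSketch {
      public static void main(String[] args) throws Exception {
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);
        if (!ch.connect(new InetSocketAddress("example.org", 80))) {
          // Not connected yet: wait up to 20s for OP_CONNECT readiness.
          Selector sel = Selector.open();
          try {
            ch.register(sel, SelectionKey.OP_CONNECT);
            if (sel.select(20000) == 0 || !ch.finishConnect()) {
              throw new SocketTimeoutException("20000 millis timeout on connect");
            }
          } finally {
            sel.close(); // the pool in the patch caches this instead of closing it
          }
        }
        ch.close();
      }
    }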

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=774939&r1=774938&r2=774939&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Thu May 14 21:47:58 2009
@@ -75,8 +75,7 @@
   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
   
   // 1 : Introduce ping and server does not throw away RPCs
-  // 3 : RPC was refactored in 0.19 
-  public static final byte CURRENT_VERSION = 3;
+  public static final byte CURRENT_VERSION = 2;
   
   /**
    * How many calls/handler are allowed in the queue.
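
Dropping CURRENT_VERSION back from 3 to 2 keeps this 0.18.3-based tree wire
compatible: version 3 marked the 0.19 RPC refactor, which is not present here,
and client and server must present the same byte for the handshake to succeed.
The preamble it participates in is tiny; a sketch of what the client writes
first (buffer handling illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.util.Arrays;

    public class RpcPreambleSketch {
      static final byte CURRENT_VERSION = 2; // value restored by this commit

      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.write("hrpc".getBytes());   // matches HBaseServer.HEADER
        out.writeByte(CURRENT_VERSION); // must equal the server's version byte
        out.flush();
        System.out.println(Arrays.toString(buf.toByteArray())); // 5 bytes total
      }
    }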

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java?rev=774939&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java Thu May 14 21:47:58 2009
@@ -0,0 +1,308 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** JUnit test case for the transactional HLog manager */
+public class DisabledTestTransactionalHLogManager extends HBaseTestCase implements
+    HConstants {
+  private Path dir;
+  private MiniDFSCluster cluster;
+
+  final byte[] tableName = Bytes.toBytes("tablename");
+  final HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+  final HRegionInfo regionInfo = new HRegionInfo(tableDesc,
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+  final byte[] row1 = Bytes.toBytes("row1");
+  final byte[] val1 = Bytes.toBytes("val1");
+  final byte[] row2 = Bytes.toBytes("row2");
+  final byte[] val2 = Bytes.toBytes("val2");
+  final byte[] row3 = Bytes.toBytes("row3");
+  final byte[] val3 = Bytes.toBytes("val3");
+  final byte[] col = Bytes.toBytes("col:A");
+
+  @Override
+  public void setUp() throws Exception {
+    cluster = new MiniDFSCluster(conf, 2, true, (String[]) null);
+    // Set the hbase.rootdir to be the home directory in mini dfs.
+    this.conf.set(HConstants.HBASE_DIR, this.cluster.getFileSystem()
+        .getHomeDirectory().toString());
+    super.setUp();
+    this.dir = new Path("/hbase", getName());
+    if (fs.exists(dir)) {
+      fs.delete(dir, true);
+    }
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    if (this.fs.exists(this.dir)) {
+      this.fs.delete(this.dir, true);
+    }
+    shutdownDfs(cluster);
+    super.tearDown();
+  }
+
+  /**
+   * @throws IOException
+   */
+  public void testSingleCommit() throws IOException {
+
+    HLog log = new HLog(fs, dir, this.conf, null);
+    TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
+        regionInfo, conf);
+
+    // Write columns named 1, 2, 3, etc. and then values of single byte
+    // 1, 2, 3...
+    long transactionId = 1;
+    logMangaer.writeStartToLog(transactionId);
+
+    BatchUpdate update1 = new BatchUpdate(row1);
+    update1.put(col, val1);
+    logMangaer.writeUpdateToLog(transactionId, update1);
+
+    BatchUpdate update2 = new BatchUpdate(row2);
+    update2.put(col, val2);
+    logMangaer.writeUpdateToLog(transactionId, update2);
+
+    BatchUpdate update3 = new BatchUpdate(row3);
+    update3.put(col, val3);
+    logMangaer.writeUpdateToLog(transactionId, update3);
+
+    logMangaer.writeCommitToLog(transactionId);
+
+    // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+    log.close();
+    Path filename = log.computeFilename(log.getFilenum());
+
+    Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
+        filename, -1, null);
+
+    assertEquals(1, commits.size());
+    assertTrue(commits.containsKey(transactionId));
+    assertEquals(3, commits.get(transactionId).size());
+
+    List<BatchUpdate> updates = commits.get(transactionId);
+
+    update1 = updates.get(0);
+    assertTrue(Bytes.equals(row1, update1.getRow()));
+    assertTrue(Bytes.equals(val1, update1.iterator().next().getValue()));
+
+    update2 = updates.get(1);
+    assertTrue(Bytes.equals(row2, update2.getRow()));
+    assertTrue(Bytes.equals(val2, update2.iterator().next().getValue()));
+
+    update3 = updates.get(2);
+    assertTrue(Bytes.equals(row3, update3.getRow()));
+    assertTrue(Bytes.equals(val3, update3.iterator().next().getValue()));
+
+  }
+  
+  /**
+   * @throws IOException
+   */
+  public void testSingleAbort() throws IOException {
+
+    HLog log = new HLog(fs, dir, this.conf, null);
+    TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
+        regionInfo, conf);
+
+    long transactionId = 1;
+    logMangaer.writeStartToLog(transactionId);
+
+    BatchUpdate update1 = new BatchUpdate(row1);
+    update1.put(col, val1);
+    logMangaer.writeUpdateToLog(transactionId, update1);
+
+    BatchUpdate update2 = new BatchUpdate(row2);
+    update2.put(col, val2);
+    logMangaer.writeUpdateToLog(transactionId, update2);
+
+    BatchUpdate update3 = new BatchUpdate(row3);
+    update3.put(col, val3);
+    logMangaer.writeUpdateToLog(transactionId, update3);
+
+    logMangaer.writeAbortToLog(transactionId);
+
+    // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+    log.close();
+    Path filename = log.computeFilename(log.getFilenum());
+
+    Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
+        filename, -1, null);
+
+    assertEquals(0, commits.size());
+  }
+  
+  /**
+   * @throws IOException
+   */
+  public void testInterlievedCommits() throws IOException {
+
+    HLog log = new HLog(fs, dir, this.conf, null);
+    TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
+        regionInfo, conf);
+
+    long transaction1Id = 1;
+    long transaction2Id = 2;
+    logMangaer.writeStartToLog(transaction1Id);
+
+    BatchUpdate update1 = new BatchUpdate(row1);
+    update1.put(col, val1);
+    logMangaer.writeUpdateToLog(transaction1Id, update1);
+
+    logMangaer.writeStartToLog(transaction2Id);
+    
+    BatchUpdate update2 = new BatchUpdate(row2);
+    update2.put(col, val2);
+    logMangaer.writeUpdateToLog(transaction2Id, update2);
+
+    BatchUpdate update3 = new BatchUpdate(row3);
+    update3.put(col, val3);
+    logMangaer.writeUpdateToLog(transaction1Id, update3);
+
+    logMangaer.writeCommitToLog(transaction2Id);
+    logMangaer.writeCommitToLog(transaction1Id);
+
+    // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+    log.close();
+    Path filename = log.computeFilename(log.getFilenum());
+
+    Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
+        filename, -1, null);
+
+    assertEquals(2, commits.size());
+    assertEquals(2, commits.get(transaction1Id).size());
+    assertEquals(1, commits.get(transaction2Id).size());
+  }
+  
+  /**
+   * @throws IOException
+   */
+  public void testInterlievedAbortCommit() throws IOException {
+
+    HLog log = new HLog(fs, dir, this.conf, null);
+    TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
+        regionInfo, conf);
+
+    long transaction1Id = 1;
+    long transaction2Id = 2;
+    logMangaer.writeStartToLog(transaction1Id);
+
+    BatchUpdate update1 = new BatchUpdate(row1);
+    update1.put(col, val1);
+    logMangaer.writeUpdateToLog(transaction1Id, update1);
+
+    logMangaer.writeStartToLog(transaction2Id);
+    
+    BatchUpdate update2 = new BatchUpdate(row2);
+    update2.put(col, val2);
+    logMangaer.writeUpdateToLog(transaction2Id, update2);
+
+    logMangaer.writeAbortToLog(transaction2Id);
+    
+    BatchUpdate update3 = new BatchUpdate(row3);
+    update3.put(col, val3);
+    logMangaer.writeUpdateToLog(transaction1Id, update3);
+
+    logMangaer.writeCommitToLog(transaction1Id);
+
+    // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+    log.close();
+    Path filename = log.computeFilename(log.getFilenum());
+
+    Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
+        filename, -1, null);
+
+    assertEquals(1, commits.size());
+    assertEquals(2, commits.get(transaction1Id).size());
+  }
+  
+  /**
+   * @throws IOException
+   */
+  public void testInterlievedCommitAbort() throws IOException {
+
+    HLog log = new HLog(fs, dir, this.conf, null);
+    TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
+        regionInfo, conf);
+
+    long transaction1Id = 1;
+    long transaction2Id = 2;
+    logMangaer.writeStartToLog(transaction1Id);
+
+    BatchUpdate update1 = new BatchUpdate(row1);
+    update1.put(col, val1);
+    logMangaer.writeUpdateToLog(transaction1Id, update1);
+
+    logMangaer.writeStartToLog(transaction2Id);
+    
+    BatchUpdate update2 = new BatchUpdate(row2);
+    update2.put(col, val2);
+    logMangaer.writeUpdateToLog(transaction2Id, update2);
+
+    logMangaer.writeCommitToLog(transaction2Id);
+    
+    BatchUpdate update3 = new BatchUpdate(row3);
+    update3.put(col, val3);
+    logMangaer.writeUpdateToLog(transaction1Id, update3);
+
+    logMangaer.writeAbortToLog(transaction1Id);
+
+    // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+    log.close();
+    Path filename = log.computeFilename(log.getFilenum());
+
+    Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
+        filename, -1, null);
+
+    assertEquals(1, commits.size());
+    assertEquals(1, commits.get(transaction2Id).size());
+  }
+  
+  // FIXME Cannot do this test without a global transaction manager
+  // public void testMissingCommit() {
+  // fail();
+  // }
+
+  // FIXME Cannot do this test without a global transaction manager
+  // public void testMissingAbort() {
+  // fail();
+  // }
+
+}
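
The rename from TestTransactionalHLogManager to
DisabledTestTransactionalHLogManager takes the class out of the Test* pattern
the build uses to pick up JUnit tests, so it no longer runs. For reference,
the protocol it exercises brackets each transaction in the HLog with
start/commit (or start/abort) records, with BatchUpdates written in between,
and recovery replays only transactions that reached commit. A condensed sketch
of one lifecycle, assuming the setup fields (fs, dir, conf, regionInfo, row1,
col, val1) from the test above:

    // Condensed from the test methods above.
    void transactionLifecycle() throws IOException {
      HLog log = new HLog(fs, dir, conf, null);
      TransactionalHLogManager manager =
          new TransactionalHLogManager(log, fs, regionInfo, conf);

      long txId = 1;
      manager.writeStartToLog(txId);          // bracket open

      BatchUpdate update = new BatchUpdate(row1);
      update.put(col, val1);
      manager.writeUpdateToLog(txId, update); // any number of these

      manager.writeCommitToLog(txId);         // or writeAbortToLog(txId)
      log.close();

      // Recovery: only committed transactions come back.
      Map<Long, List<BatchUpdate>> commits = manager.getCommitsFromLog(
          log.computeFilename(log.getFilenum()), -1, null);
      assert commits.containsKey(txId);
    }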