You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2009/05/14 23:47:59 UTC
svn commit: r774939 - in /hadoop/hbase/trunk_on_hadoop-0.18.3: ./ bin/
src/java/org/apache/hadoop/hbase/ipc/
src/test/org/apache/hadoop/hbase/regionserver/transactional/
Author: apurtell
Date: Thu May 14 21:47:58 2009
New Revision: 774939
URL: http://svn.apache.org/viewvc?rev=774939&view=rev
Log:
pull up to trunk; working infoservers and ipc
Added:
hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java
Removed:
hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTransactionalHLogManager.java
Modified:
hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt
hadoop/hbase/trunk_on_hadoop-0.18.3/bin/hbase
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt?rev=774939&r1=774938&r2=774939&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/CHANGES.txt Thu May 14 21:47:58 2009
@@ -244,6 +244,8 @@
HBASE-1413 Fall back to filesystem block size default if HLog blocksize is
not specified
HBASE-1417 Cleanup disorientating RPC message
+ HBASE-1424 have shell print regioninfo and location on first load if
+ DEBUG enabled
OPTIMIZATIONS
HBASE-1412 Change values for delete column and column family in KeyValue
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/bin/hbase
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/bin/hbase?rev=774939&r1=774938&r2=774939&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/bin/hbase (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/bin/hbase Thu May 14 21:47:58 2009
@@ -133,7 +133,7 @@
CLASSPATH=${CLASSPATH}:$f;
done
-for f in $HBASE_HOME/lib/jsp-2.1/*.jar; do
+for f in $HBASE_HOME/lib/jetty-ext/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=774939&r1=774938&r2=774939&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Thu May 14 21:47:58 2009
@@ -302,7 +302,7 @@
this.socket = socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
// connection time out is 20s
- this.socket.connect(remoteId.getAddress(), 20000);
+ HBaseRPC.connect(this.socket, remoteId.getAddress(), 20000);
this.socket.setSoTimeout(pingInterval);
break;
} catch (SocketTimeoutException toe) {
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=774939&r1=774938&r2=774939&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Thu May 14 21:47:58 2009
@@ -21,6 +21,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
@@ -28,11 +29,20 @@
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
import java.net.SocketTimeoutException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import javax.net.SocketFactory;
@@ -47,6 +57,7 @@
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
/** A simple RPC mechanism.
*
@@ -678,4 +689,275 @@
v = v.substring(0, 55)+"...";
LOG.info(v);
}
+
+ public static void connect(Socket socket,
+ SocketAddress endpoint,
+ int timeout) throws IOException {
+ if (socket == null || endpoint == null || timeout < 0) {
+ throw new IllegalArgumentException("Illegal argument for connect()");
+ }
+ SocketChannel ch = socket.getChannel();
+ if (ch == null) {
+ // let the default implementation handle it.
+ socket.connect(endpoint, timeout);
+ } else {
+ connect(ch, endpoint, timeout);
+ }
+ }
+
+ private static SelectorPool selector = new SelectorPool();
+
+ public static void connect(SocketChannel channel, SocketAddress endpoint,
+ int timeout) throws IOException {
+
+ boolean blockingOn = channel.isBlocking();
+ if (blockingOn) {
+ channel.configureBlocking(false);
+ }
+
+ try {
+ if (channel.connect(endpoint)) {
+ return;
+ }
+
+ long timeoutLeft = timeout;
+ long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
+
+ while (true) {
+ // we might have to call finishConnect() more than once
+ // for some channels (with user level protocols)
+
+ int ret = selector.select((SelectableChannel)channel,
+ SelectionKey.OP_CONNECT, timeoutLeft);
+
+ if (ret > 0 && channel.finishConnect()) {
+ return;
+ }
+
+ if (ret == 0 ||
+ (timeout > 0 &&
+ (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
+ throw new SocketTimeoutException(
+ timeoutExceptionString(channel, timeout,
+ SelectionKey.OP_CONNECT));
+ }
+ }
+ } catch (IOException e) {
+ // javadoc for SocketChannel.connect() says channel should be closed.
+ try {
+ channel.close();
+ } catch (IOException ignored) {}
+ throw e;
+ } finally {
+ if (blockingOn && channel.isOpen()) {
+ channel.configureBlocking(true);
+ }
+ }
+ }
+
+ private static String timeoutExceptionString(SelectableChannel channel,
+ long timeout, int ops) {
+
+ String waitingFor;
+ switch(ops) {
+
+ case SelectionKey.OP_READ :
+ waitingFor = "read"; break;
+
+ case SelectionKey.OP_WRITE :
+ waitingFor = "write"; break;
+
+ case SelectionKey.OP_CONNECT :
+ waitingFor = "connect"; break;
+
+ default :
+ waitingFor = "" + ops;
+ }
+
+ return timeout + " millis timeout while " +
+ "waiting for channel to be ready for " +
+ waitingFor + ". ch : " + channel;
+ }
+
+ /**
+ * This maintains a pool of selectors. These selectors are closed
+ * once they are idle (unused) for a few seconds.
+ */
+ private static class SelectorPool {
+
+ private static class SelectorInfo {
+ Selector selector;
+ long lastActivityTime;
+ LinkedList<SelectorInfo> queue;
+
+ void close() {
+ if (selector != null) {
+ try {
+ selector.close();
+ } catch (IOException e) {
+ LOG.warn("Unexpected exception while closing selector : " +
+ StringUtils.stringifyException(e));
+ }
+ }
+ }
+ }
+
+ private static class ProviderInfo {
+ SelectorProvider provider;
+ LinkedList<SelectorInfo> queue; // lifo
+ ProviderInfo next;
+ }
+
+ private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
+
+ private ProviderInfo providerList = null;
+
+ /**
+ * Waits on the channel with the given timeout using one of the
+ * cached selectors. It also removes any cached selectors that are
+ * idle for a few seconds.
+ *
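+ * @param channel channel to register with a cached selector and wait on
+ * @param ops interest set to wait for (e.g. SelectionKey.OP_CONNECT)
+ * @param timeout timeout in milliseconds passed to Selector.select(long)
+ * @return the non-zero result of Selector.select(long), or 0 once a positive timeout has elapsed
+ * @throws IOException if registration or selection fails, or the thread is interrupted while waiting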
+ */
+ int select(SelectableChannel channel, int ops, long timeout)
+ throws IOException {
+
+ SelectorInfo info = get(channel);
+
+ SelectionKey key = null;
+ int ret = 0;
+
+ try {
+ while (true) {
+ long start = (timeout == 0) ? 0 : System.currentTimeMillis();
+
+ key = channel.register(info.selector, ops);
+ ret = info.selector.select(timeout);
+
+ if (ret != 0) {
+ return ret;
+ }
+
+ /* Sometimes select() returns 0 well before the timeout for
+ * unknown reasons, so select again if required.
+ */
+ if (timeout > 0) {
+ timeout -= System.currentTimeMillis() - start;
+ if (timeout <= 0) {
+ return 0;
+ }
+ }
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedIOException("Interruped while waiting for " +
+ "IO on channel " + channel +
+ ". " + timeout +
+ " millis timeout left.");
+ }
+ }
+ } finally {
+ if (key != null) {
+ key.cancel();
+ }
+
+ //clear the canceled key.
+ try {
+ info.selector.selectNow();
+ } catch (IOException e) {
+ LOG.info("Unexpected Exception while clearing selector : " +
+ StringUtils.stringifyException(e));
+ // don't put the selector back.
+ info.close();
+ return ret;
+ }
+
+ release(info);
+ }
+ }
+
+ /**
+ * Takes one selector from end of LRU list of free selectors.
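+ * If there are no selectors available, it creates a new selector.
+ * Also invokes trimIdleSelectors().
+ *
+ * @param channel channel whose SelectorProvider determines which free list is used
+ * @return a SelectorInfo taken from the free list, or a newly opened one if the list is empty
+ * @throws IOException if a new selector cannot be opened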
+ */
+ private synchronized SelectorInfo get(SelectableChannel channel)
+ throws IOException {
+ SelectorInfo selInfo = null;
+
+ SelectorProvider provider = channel.provider();
+
+ // pick the list : rarely there is more than one provider in use.
+ ProviderInfo pList = providerList;
+ while (pList != null && pList.provider != provider) {
+ pList = pList.next;
+ }
+ if (pList == null) {
+ //LOG.info("Creating new ProviderInfo : " + provider.toString());
+ pList = new ProviderInfo();
+ pList.provider = provider;
+ pList.queue = new LinkedList<SelectorInfo>();
+ pList.next = providerList;
+ providerList = pList;
+ }
+
+ LinkedList<SelectorInfo> queue = pList.queue;
+
+ if (queue.isEmpty()) {
+ Selector selector = provider.openSelector();
+ selInfo = new SelectorInfo();
+ selInfo.selector = selector;
+ selInfo.queue = queue;
+ } else {
+ selInfo = queue.removeLast();
+ }
+
+ trimIdleSelectors(System.currentTimeMillis());
+ return selInfo;
+ }
+
+ /**
+ * Puts the selector back at the end of the LRU list of free selectors.
+ * Also invokes trimIdleSelectors().
+ *
+ * @param info selector to return to the free list it was taken from
+ */
+ private synchronized void release(SelectorInfo info) {
+ long now = System.currentTimeMillis();
+ trimIdleSelectors(now);
+ info.lastActivityTime = now;
+ info.queue.addLast(info);
+ }
+
+ /**
+ * Closes selectors that have been idle for IDLE_TIMEOUT (10 sec). It does not
+ * traverse the whole list, only the leading entries that have crossed
+ * the timeout.
+ */
+ private void trimIdleSelectors(long now) {
+ long cutoff = now - IDLE_TIMEOUT;
+
+ for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
+ if (pList.queue.isEmpty()) {
+ continue;
+ }
+ for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
+ SelectorInfo info = it.next();
+ if (info.lastActivityTime > cutoff) {
+ break;
+ }
+ it.remove();
+ info.close();
+ }
+ }
+ }
+ }
+
}
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=774939&r1=774938&r2=774939&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Thu May 14 21:47:58 2009
@@ -75,8 +75,7 @@
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
// 1 : Introduce ping and server does not throw away RPCs
- // 3 : RPC was refactored in 0.19
- public static final byte CURRENT_VERSION = 3;
+ public static final byte CURRENT_VERSION = 2;
/**
* How many calls/handler are allowed in the queue.
Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java?rev=774939&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/transactional/DisabledTestTransactionalHLogManager.java Thu May 14 21:47:58 2009
@@ -0,0 +1,308 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** JUnit test case for TransactionalHLogManager */
+public class DisabledTestTransactionalHLogManager extends HBaseTestCase implements
+ HConstants {
+ private Path dir;
+ private MiniDFSCluster cluster;
+
+ final byte[] tableName = Bytes.toBytes("tablename");
+ final HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+ final HRegionInfo regionInfo = new HRegionInfo(tableDesc,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ final byte[] row1 = Bytes.toBytes("row1");
+ final byte[] val1 = Bytes.toBytes("val1");
+ final byte[] row2 = Bytes.toBytes("row2");
+ final byte[] val2 = Bytes.toBytes("val2");
+ final byte[] row3 = Bytes.toBytes("row3");
+ final byte[] val3 = Bytes.toBytes("val3");
+ final byte[] col = Bytes.toBytes("col:A");
+
+ @Override
+ public void setUp() throws Exception {
+ cluster = new MiniDFSCluster(conf, 2, true, (String[]) null);
+ // Set the hbase.rootdir to be the home directory in mini dfs.
+ this.conf.set(HConstants.HBASE_DIR, this.cluster.getFileSystem()
+ .getHomeDirectory().toString());
+ super.setUp();
+ this.dir = new Path("/hbase", getName());
+ if (fs.exists(dir)) {
+ fs.delete(dir, true);
+ }
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ if (this.fs.exists(this.dir)) {
+ this.fs.delete(this.dir, true);
+ }
+ shutdownDfs(cluster);
+ super.tearDown();
+ }
+
+ /**
+ * @throws IOException
+ */
+ public void testSingleCommit() throws IOException {
+
+ HLog log = new HLog(fs, dir, this.conf, null);
+ TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
+ regionInfo, conf);
+
+ // Write one update per row (row1..row3), each storing the matching value
+ // (val1..val3) in column col:A, under a single transaction that is committed.
+ long transactionId = 1;
+ logMangaer.writeStartToLog(transactionId);
+
+ BatchUpdate update1 = new BatchUpdate(row1);
+ update1.put(col, val1);
+ logMangaer.writeUpdateToLog(transactionId, update1);
+
+ BatchUpdate update2 = new BatchUpdate(row2);
+ update2.put(col, val2);
+ logMangaer.writeUpdateToLog(transactionId, update2);
+
+ BatchUpdate update3 = new BatchUpdate(row3);
+ update3.put(col, val3);
+ logMangaer.writeUpdateToLog(transactionId, update3);
+
+ logMangaer.writeCommitToLog(transactionId);
+
+ // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+ log.close();
+ Path filename = log.computeFilename(log.getFilenum());
+
+ Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
+ filename, -1, null);
+
+ assertEquals(1, commits.size());
+ assertTrue(commits.containsKey(transactionId));
+ assertEquals(3, commits.get(transactionId).size());
+
+ List<BatchUpdate> updates = commits.get(transactionId);
+
+ update1 = updates.get(0);
+ assertTrue(Bytes.equals(row1, update1.getRow()));
+ assertTrue(Bytes.equals(val1, update1.iterator().next().getValue()));
+
+ update2 = updates.get(1);
+ assertTrue(Bytes.equals(row2, update2.getRow()));
+ assertTrue(Bytes.equals(val2, update2.iterator().next().getValue()));
+
+ update3 = updates.get(2);
+ assertTrue(Bytes.equals(row3, update3.getRow()));
+ assertTrue(Bytes.equals(val3, update3.iterator().next().getValue()));
+
+ }
+
+ /**
+ * @throws IOException
+ */
+ public void testSingleAbort() throws IOException {
+
+ HLog log = new HLog(fs, dir, this.conf, null);
+ TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
+ regionInfo, conf);
+
+ long transactionId = 1;
+ logMangaer.writeStartToLog(transactionId);
+
+ BatchUpdate update1 = new BatchUpdate(row1);
+ update1.put(col, val1);
+ logMangaer.writeUpdateToLog(transactionId, update1);
+
+ BatchUpdate update2 = new BatchUpdate(row2);
+ update2.put(col, val2);
+ logMangaer.writeUpdateToLog(transactionId, update2);
+
+ BatchUpdate update3 = new BatchUpdate(row3);
+ update3.put(col, val3);
+ logMangaer.writeUpdateToLog(transactionId, update3);
+
+ logMangaer.writeAbortToLog(transactionId);
+
+ // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+ log.close();
+ Path filename = log.computeFilename(log.getFilenum());
+
+ Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
+ filename, -1, null);
+
+ assertEquals(0, commits.size());
+ }
+
+ /**
+ * @throws IOException
+ */
+ public void testInterlievedCommits() throws IOException {
+
+ HLog log = new HLog(fs, dir, this.conf, null);
+ TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
+ regionInfo, conf);
+
+ long transaction1Id = 1;
+ long transaction2Id = 2;
+ logMangaer.writeStartToLog(transaction1Id);
+
+ BatchUpdate update1 = new BatchUpdate(row1);
+ update1.put(col, val1);
+ logMangaer.writeUpdateToLog(transaction1Id, update1);
+
+ logMangaer.writeStartToLog(transaction2Id);
+
+ BatchUpdate update2 = new BatchUpdate(row2);
+ update2.put(col, val2);
+ logMangaer.writeUpdateToLog(transaction2Id, update2);
+
+ BatchUpdate update3 = new BatchUpdate(row3);
+ update3.put(col, val3);
+ logMangaer.writeUpdateToLog(transaction1Id, update3);
+
+ logMangaer.writeCommitToLog(transaction2Id);
+ logMangaer.writeCommitToLog(transaction1Id);
+
+ // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+ log.close();
+ Path filename = log.computeFilename(log.getFilenum());
+
+ Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
+ filename, -1, null);
+
+ assertEquals(2, commits.size());
+ assertEquals(2, commits.get(transaction1Id).size());
+ assertEquals(1, commits.get(transaction2Id).size());
+ }
+
+ /**
+ * @throws IOException
+ */
+ public void testInterlievedAbortCommit() throws IOException {
+
+ HLog log = new HLog(fs, dir, this.conf, null);
+ TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
+ regionInfo, conf);
+
+ long transaction1Id = 1;
+ long transaction2Id = 2;
+ logMangaer.writeStartToLog(transaction1Id);
+
+ BatchUpdate update1 = new BatchUpdate(row1);
+ update1.put(col, val1);
+ logMangaer.writeUpdateToLog(transaction1Id, update1);
+
+ logMangaer.writeStartToLog(transaction2Id);
+
+ BatchUpdate update2 = new BatchUpdate(row2);
+ update2.put(col, val2);
+ logMangaer.writeUpdateToLog(transaction2Id, update2);
+
+ logMangaer.writeAbortToLog(transaction2Id);
+
+ BatchUpdate update3 = new BatchUpdate(row3);
+ update3.put(col, val3);
+ logMangaer.writeUpdateToLog(transaction1Id, update3);
+
+ logMangaer.writeCommitToLog(transaction1Id);
+
+ // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+ log.close();
+ Path filename = log.computeFilename(log.getFilenum());
+
+ Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
+ filename, -1, null);
+
+ assertEquals(1, commits.size());
+ assertEquals(2, commits.get(transaction1Id).size());
+ }
+
+ /**
+ * @throws IOException
+ */
+ public void testInterlievedCommitAbort() throws IOException {
+
+ HLog log = new HLog(fs, dir, this.conf, null);
+ TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
+ regionInfo, conf);
+
+ long transaction1Id = 1;
+ long transaction2Id = 2;
+ logMangaer.writeStartToLog(transaction1Id);
+
+ BatchUpdate update1 = new BatchUpdate(row1);
+ update1.put(col, val1);
+ logMangaer.writeUpdateToLog(transaction1Id, update1);
+
+ logMangaer.writeStartToLog(transaction2Id);
+
+ BatchUpdate update2 = new BatchUpdate(row2);
+ update2.put(col, val2);
+ logMangaer.writeUpdateToLog(transaction2Id, update2);
+
+ logMangaer.writeCommitToLog(transaction2Id);
+
+ BatchUpdate update3 = new BatchUpdate(row3);
+ update3.put(col, val3);
+ logMangaer.writeUpdateToLog(transaction1Id, update3);
+
+ logMangaer.writeAbortToLog(transaction1Id);
+
+ // log.completeCacheFlush(regionName, tableName, logSeqId);
+
+ log.close();
+ Path filename = log.computeFilename(log.getFilenum());
+
+ Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
+ filename, -1, null);
+
+ assertEquals(1, commits.size());
+ assertEquals(1, commits.get(transaction2Id).size());
+ }
+
+ // FIXME Cannot do this test without a global transaction manager
+ // public void testMissingCommit() {
+ // fail();
+ // }
+
+ // FIXME Cannot do this test without a global transaction manager
+ // public void testMissingAbort() {
+ // fail();
+ // }
+
+}