You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/08/18 00:03:43 UTC
svn commit: r686650 [2/3] - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/client/transactional/
src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hb...
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,296 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Holds the state of a transaction: its status, the rows it has read, the
+ * updates it has buffered (they are not applied until commit), and the other
+ * transactions whose writes must be checked against its reads before it may
+ * commit.
+ */
+class TransactionState {
+
+  private static final Log LOG = LogFactory.getLog(TransactionState.class);
+
+  /** Current status. */
+  public enum Status {
+    /** Initial status, still performing operations. */
+    PENDING,
+    /**
+     * Checked if we can commit, and said yes. Still need to determine the
+     * global decision.
+     */
+    COMMIT_PENDING,
+    /** Committed. */
+    COMMITED,
+    /** Aborted. */
+    ABORTED
+  }
+
+  private final long hLogStartSequenceId;
+  private final long transactionId;
+  private Status status;
+  // Rows read by this transaction. byte[] uses identity equals/hashCode, so the
+  // set is ordered by Bytes.BYTES_COMPARATOR to compare row contents instead.
+  private final SortedSet<byte[]> readSet = new TreeSet<byte[]>(
+      Bytes.BYTES_COMPARATOR);
+  // Buffered updates in the order they were added. An ArrayList (not a
+  // LinkedList) keeps the indexed reverse walk in localGet linear.
+  private final List<BatchUpdate> writeSet = new ArrayList<BatchUpdate>();
+  // Transactions whose write sets are checked against this read set.
+  private final Set<TransactionState> transactionsToCheck =
+      new HashSet<TransactionState>();
+  private int startSequenceNumber;
+  private Integer sequenceNumber;
+  boolean hasScan = false;
+
+  /**
+   * Creates the state of a new transaction, in status PENDING.
+   *
+   * @param transactionId id of the transaction
+   * @param rLogStartSequenceId HLog sequence id to report from
+   * {@link #getHLogStartSequenceId()}
+   */
+  public TransactionState(final long transactionId,
+      final long rLogStartSequenceId) {
+    this.transactionId = transactionId;
+    this.hLogStartSequenceId = rLogStartSequenceId;
+    this.status = Status.PENDING;
+  }
+
+  /**
+   * Records that this transaction read the given row.
+   *
+   * @param rowKey row that was read
+   */
+  public void addRead(final byte[] rowKey) {
+    readSet.add(rowKey);
+  }
+
+  /**
+   * @return the (live, mutable) set of rows read by this transaction
+   */
+  public Set<byte[]> getReadSet() {
+    return readSet;
+  }
+
+  /**
+   * Buffers an update; it is not applied to the region here.
+   *
+   * @param write update to buffer
+   */
+  public void addWrite(final BatchUpdate write) {
+    writeSet.add(write);
+  }
+
+  /**
+   * @return the (live, mutable) list of buffered updates, oldest first
+   */
+  public List<BatchUpdate> getWriteSet() {
+    return writeSet;
+  }
+
+  /**
+   * GetFull from the writeSet: collects the values this transaction has put
+   * into the given row. Updates are visited oldest first, so a later put to a
+   * column replaces an earlier one. Delete operations are ignored.
+   *
+   * @param row row to look up
+   * @param columns columns to return, or null for all columns
+   * @param timestamp updates whose timestamp is greater than this are skipped
+   * @return map from column to the most recently written cell, or null if no
+   * buffered put matches
+   */
+  public Map<byte[], Cell> localGetFull(final byte[] row,
+      final Set<byte[]> columns, final long timestamp) {
+    // Must use the Bytes comparator because byte[] keys have identity
+    // equality; a plain map would never find a column by content.
+    Map<byte[], Cell> results = new TreeMap<byte[], Cell>(
+        Bytes.BYTES_COMPARATOR);
+    for (BatchUpdate b : writeSet) {
+      if (!Bytes.equals(row, b.getRow())) {
+        continue;
+      }
+      if (b.getTimestamp() > timestamp) {
+        continue;
+      }
+      for (BatchOperation op : b) {
+        if (!op.isPut()
+            || (columns != null && !columns.contains(op.getColumn()))) {
+          continue;
+        }
+        results.put(op.getColumn(), new Cell(op.getValue(), b.getTimestamp()));
+      }
+    }
+    return results.size() == 0 ? null : results;
+  }
+
+  /**
+   * Get from the writeSet: collects the values this transaction has put into
+   * one cell. Delete operations are ignored.
+   *
+   * @param row row to look up
+   * @param column column to look up
+   * @param timestamp updates whose timestamp is greater than this are skipped
+   * @return the matching values, most recently added update first, or null if
+   * no buffered put matches
+   */
+  public Cell[] localGet(final byte[] row, final byte[] column,
+      final long timestamp) {
+    ArrayList<Cell> results = new ArrayList<Cell>();
+
+    // Go in reverse order to put newest updates first in list. writeSet is an
+    // ArrayList, so get(i) is constant time.
+    for (int i = writeSet.size() - 1; i >= 0; i--) {
+      BatchUpdate b = writeSet.get(i);
+
+      if (!Bytes.equals(row, b.getRow())) {
+        continue;
+      }
+      if (b.getTimestamp() > timestamp) {
+        continue;
+      }
+      for (BatchOperation op : b) {
+        if (!op.isPut() || !Bytes.equals(column, op.getColumn())) {
+          continue;
+        }
+        results.add(new Cell(op.getValue(), b.getTimestamp()));
+      }
+    }
+    return results.size() == 0 ? null : results
+        .toArray(new Cell[results.size()]);
+  }
+
+  /**
+   * Registers a transaction whose writes must not overlap our reads.
+   *
+   * @param transaction transaction to check at commit time
+   */
+  public void addTransactionToCheck(final TransactionState transaction) {
+    transactionsToCheck.add(transaction);
+  }
+
+  /**
+   * @return true if any registered transaction conflicts with this one
+   */
+  public boolean hasConflict() {
+    for (TransactionState transactionState : transactionsToCheck) {
+      if (hasConflict(transactionState)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * A non-aborted transaction conflicts with this one if it wrote anything
+   * while this transaction scanned, or if it wrote a row this transaction read.
+   */
+  private boolean hasConflict(final TransactionState checkAgainst) {
+    if (checkAgainst.getStatus().equals(TransactionState.Status.ABORTED)) {
+      return false; // Cannot conflict with aborted transactions
+    }
+
+    for (BatchUpdate otherUpdate : checkAgainst.getWriteSet()) {
+      if (this.hasScan) {
+        // The scanned row range is not tracked, so any write is a conflict.
+        LOG.info("Transaction " + this.toString()
+            + " has a scan read. Meanwhile a write occurred. "
+            + "Conservatively reporting conflict");
+        return true;
+      }
+
+      if (this.getReadSet().contains(otherUpdate.getRow())) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Transaction " + this.toString() + " conflicts with "
+              + checkAgainst.toString());
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the status.
+   *
+   * @return Return the status.
+   */
+  public Status getStatus() {
+    return status;
+  }
+
+  /**
+   * Set the status.
+   *
+   * @param status The status to set.
+   */
+  public void setStatus(final Status status) {
+    this.status = status;
+  }
+
+  /**
+   * Get the startSequenceNumber.
+   *
+   * @return Return the startSequenceNumber.
+   */
+  public int getStartSequenceNumber() {
+    return startSequenceNumber;
+  }
+
+  /**
+   * Set the startSequenceNumber.
+   *
+   * @param startSequenceNumber The startSequenceNumber to set.
+   */
+  public void setStartSequenceNumber(final int startSequenceNumber) {
+    this.startSequenceNumber = startSequenceNumber;
+  }
+
+  /**
+   * Get the sequenceNumber.
+   *
+   * @return Return the sequenceNumber, or null if none has been assigned.
+   */
+  public Integer getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  /**
+   * Set the sequenceNumber.
+   *
+   * @param sequenceNumber The sequenceNumber to set.
+   */
+  public void setSequenceNumber(final Integer sequenceNumber) {
+    this.sequenceNumber = sequenceNumber;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    result.append("[transactionId: ");
+    result.append(transactionId);
+    result.append(" status: ");
+    result.append(status.name());
+    result.append(" read Size: ");
+    result.append(readSet.size());
+    result.append(" write Size: ");
+    result.append(writeSet.size());
+    result.append(" startSQ: ");
+    result.append(startSequenceNumber);
+    if (sequenceNumber != null) {
+      result.append(" commitedSQ:");
+      result.append(sequenceNumber);
+    }
+    result.append("]");
+
+    return result.toString();
+  }
+
+  /**
+   * Get the transactionId.
+   *
+   * @return Return the transactionId.
+   */
+  public long getTransactionId() {
+    return transactionId;
+  }
+
+  /**
+   * Get the startSequenceId.
+   *
+   * @return Return the startSequenceId.
+   */
+  public long getHLogStartSequenceId() {
+    return hLogStartSequenceId;
+  }
+
+  /**
+   * Set the hasScan.
+   *
+   * @param hasScan The hasScan to set.
+   */
+  public void setHasScan(final boolean hasScan) {
+    this.hasScan = hasScan;
+  }
+
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,260 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HLogEdit;
+import org.apache.hadoop.hbase.regionserver.HLogKey;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Responsible for writing and reading (recovering) transactional information
+ * to/from the HLog.
+ *
+ *
+ */
+class TransactionalHLogManager {
+
+ private static final Log LOG = LogFactory
+ .getLog(TransactionalHLogManager.class);
+
+ private final HLog hlog;
+ private final FileSystem fileSystem;
+ private final HRegionInfo regionInfo;
+ private final HBaseConfiguration conf;
+
+ public TransactionalHLogManager(final TransactionalRegion region) {
+ this.hlog = region.getLog();
+ this.fileSystem = region.getFilesystem();
+ this.regionInfo = region.getRegionInfo();
+ this.conf = region.getConf();
+ }
+
+ // For Testing
+ TransactionalHLogManager(final HLog hlog, final FileSystem fileSystem,
+ final HRegionInfo regionInfo, final HBaseConfiguration conf) {
+ this.hlog = hlog;
+ this.fileSystem = fileSystem;
+ this.regionInfo = regionInfo;
+ this.conf = conf;
+ }
+
+ public void writeStartToLog(final long transactionId) throws IOException {
+ HLogEdit logEdit;
+ logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.START);
+
+ hlog.append(regionInfo, logEdit);
+ }
+
+ public void writeUpdateToLog(final long transactionId,
+ final BatchUpdate update) throws IOException {
+
+ long commitTime = update.getTimestamp() == HConstants.LATEST_TIMESTAMP ? System
+ .currentTimeMillis()
+ : update.getTimestamp();
+
+ for (BatchOperation op : update) {
+ HLogEdit logEdit = new HLogEdit(transactionId, op, commitTime);
+ hlog.append(regionInfo, update.getRow(), logEdit);
+ }
+ }
+
+ public void writeCommitToLog(final long transactionId) throws IOException {
+ HLogEdit logEdit;
+ logEdit = new HLogEdit(transactionId,
+ HLogEdit.TransactionalOperation.COMMIT);
+
+ hlog.append(regionInfo, logEdit);
+ }
+
+ public void writeAbortToLog(final long transactionId) throws IOException {
+ HLogEdit logEdit;
+ logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.ABORT);
+
+ hlog.append(regionInfo, logEdit);
+ }
+
+ public Map<Long, List<BatchUpdate>> getCommitsFromLog(
+ final Path reconstructionLog, final long maxSeqID,
+ final Progressable reporter) throws UnsupportedEncodingException,
+ IOException {
+ if (reconstructionLog == null || !fileSystem.exists(reconstructionLog)) {
+ // Nothing to do.
+ return null;
+ }
+ // Check its not empty.
+ FileStatus[] stats = fileSystem.listStatus(reconstructionLog);
+ if (stats == null || stats.length == 0) {
+ LOG.warn("Passed reconstruction log " + reconstructionLog
+ + " is zero-length");
+ return null;
+ }
+
+ SortedMap<Long, List<BatchUpdate>> pendingTransactionsById = new TreeMap<Long, List<BatchUpdate>>();
+ SortedMap<Long, List<BatchUpdate>> commitedTransactionsById = new TreeMap<Long, List<BatchUpdate>>();
+ Set<Long> abortedTransactions = new HashSet<Long>();
+
+ SequenceFile.Reader logReader = new SequenceFile.Reader(fileSystem,
+ reconstructionLog, conf);
+
+ try {
+ HLogKey key = new HLogKey();
+ HLogEdit val = new HLogEdit();
+ long skippedEdits = 0;
+ long totalEdits = 0;
+ long startCount = 0;
+ long writeCount = 0;
+ long abortCount = 0;
+ long commitCount = 0;
+ // How many edits to apply before we send a progress report.
+ int reportInterval = conf.getInt("hbase.hstore.report.interval.edits",
+ 2000);
+ while (logReader.next(key, val)) {
+ LOG.debug("Processing edit: key: " + key.toString() + " val: "
+ + val.toString());
+ if (key.getLogSeqNum() < maxSeqID) {
+ skippedEdits++;
+ continue;
+ }
+
+ // Check this edit is for me.
+ byte[] column = val.getColumn();
+ Long transactionId = val.getTransactionId();
+ if (!val.isTransactionEntry() || HLog.isMetaColumn(column)
+ || !Bytes.equals(key.getRegionName(), regionInfo.getRegionName())) {
+ continue;
+ }
+
+ List<BatchUpdate> updates = pendingTransactionsById.get(transactionId);
+ switch (val.getOperation()) {
+ case START:
+ if (updates != null || abortedTransactions.contains(transactionId)
+ || commitedTransactionsById.containsKey(transactionId)) {
+ LOG.error("Processing start for transaction: " + transactionId
+ + ", but have already seen start message");
+ throw new IOException("Corrupted transaction log");
+ }
+ updates = new LinkedList<BatchUpdate>();
+ pendingTransactionsById.put(transactionId, updates);
+ startCount++;
+ break;
+
+ case WRITE:
+ if (updates == null) {
+ LOG.error("Processing edit for transaction: " + transactionId
+ + ", but have not seen start message");
+ throw new IOException("Corrupted transaction log");
+ }
+
+ BatchUpdate tranUpdate = new BatchUpdate(key.getRow());
+ if (val.getVal() != null) {
+ tranUpdate.put(val.getColumn(), val.getVal());
+ } else {
+ tranUpdate.delete(val.getColumn());
+ }
+ updates.add(tranUpdate);
+ writeCount++;
+ break;
+
+ case ABORT:
+ if (updates == null) {
+ LOG.error("Processing abort for transaction: " + transactionId
+ + ", but have not seen start message");
+ throw new IOException("Corrupted transaction log");
+ }
+ abortedTransactions.add(transactionId);
+ pendingTransactionsById.remove(transactionId);
+ abortCount++;
+ break;
+
+ case COMMIT:
+ if (updates == null) {
+ LOG.error("Processing commit for transaction: " + transactionId
+ + ", but have not seen start message");
+ throw new IOException("Corrupted transaction log");
+ }
+ if (abortedTransactions.contains(transactionId)) {
+ LOG.error("Processing commit for transaction: " + transactionId
+ + ", but also have abort message");
+ throw new IOException("Corrupted transaction log");
+ }
+ if (updates.size() == 0) {
+ LOG
+ .warn("Transaciton " + transactionId
+ + " has no writes in log. ");
+ }
+ if (commitedTransactionsById.containsKey(transactionId)) {
+ LOG.error("Processing commit for transaction: " + transactionId
+ + ", but have already commited transaction with that id");
+ throw new IOException("Corrupted transaction log");
+ }
+ pendingTransactionsById.remove(transactionId);
+ commitedTransactionsById.put(transactionId, updates);
+ commitCount++;
+ }
+ totalEdits++;
+
+ if (reporter != null && (totalEdits % reportInterval) == 0) {
+ reporter.progress();
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Read " + totalEdits + " tranasctional operations (skipped "
+ + skippedEdits + " because sequence id <= " + maxSeqID + "): "
+ + startCount + " starts, " + writeCount + " writes, " + abortCount
+ + " aborts, and " + commitCount + " commits.");
+ }
+ } finally {
+ logReader.close();
+ }
+
+ if (pendingTransactionsById.size() > 0) {
+ LOG
+ .info("Region log has "
+ + pendingTransactionsById.size()
+ + " unfinished transactions. Going to the transaction log to resolve");
+ throw new RuntimeException("Transaction log not yet implemented");
+ }
+
+ return commitedTransactionsById;
+ }
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,673 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.LeaseException;
+import org.apache.hadoop.hbase.LeaseListener;
+import org.apache.hadoop.hbase.Leases;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Region which provides transactional support for atomic transactions.
+ * This is achieved with optimistic concurrency control (see
+ * http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf). We keep track of
+ * the read and write sets of each transaction, and hold off on processing the
+ * writes. To decide whether to commit a transaction, we check its read set for
+ * overlaps with the writes of all transactions that committed while it was
+ * running.
+ * <p>
+ * Because transactions can span multiple regions, all regions must agree to
+ * commit a transaction. The client side of this commit protocol is encoded in
+ * org.apache.hadoop.hbase.client.transactional.TransactionManager
+ * <p>
+ * In the event of a failure of the client mid-commit (after we voted yes), we
+ * will have to consult the transaction log to determine the final decision of
+ * the transaction. This is not yet implemented.
+ */
+class TransactionalRegion extends HRegion {
+
+  private static final String LEASE_TIME = "hbase.transaction.leaseTime";
+  private static final int DEFAULT_LEASE_TIME = 60 * 1000;
+  private static final int LEASE_CHECK_FREQUENCY = 1000;
+
+  private static final String OLD_TRANSACTION_FLUSH = "hbase.transaction.flush";
+  // Do a flush if we have this many old transactions.
+  private static final int DEFAULT_OLD_TRANSACTION_FLUSH = 100;
+
+  private static final Log LOG = LogFactory.getLog(TransactionalRegion.class);
+
+  // Collection of active transactions (PENDING) keyed by id. It is written by
+  // beginTransaction and iterated by getCompleteCacheFlushSequenceId, like the
+  // synchronized collections below. A ConcurrentHashMap lets that iteration run
+  // while other threads add or remove entries without risking
+  // ConcurrentModificationException. (It rejects null keys and values; ids are
+  // never null.)
+  private Map<String, TransactionState> transactionsById =
+      new ConcurrentHashMap<String, TransactionState>();
+
+  // Map of recent transactions that are COMMIT_PENDING or COMMITED keyed by
+  // their sequence number. Iteration must synchronize on the map itself.
+  private SortedMap<Integer, TransactionState> commitedTransactionsBySequenceNumber = Collections
+      .synchronizedSortedMap(new TreeMap<Integer, TransactionState>());
+
+  // Collection of transactions that are COMMIT_PENDING. Iteration must
+  // synchronize on the set itself.
+  private Set<TransactionState> commitPendingTransactions = Collections
+      .synchronizedSet(new HashSet<TransactionState>());
+
+  private final Leases transactionLeases;
+  // Source of commit sequence numbers handed out in commitRequest.
+  private AtomicInteger nextSequenceId = new AtomicInteger(0);
+  // Serializes conflict checking and sequence number assignment.
+  private Object commitCheckLock = new Object();
+  private TransactionalHLogManager logManager;
+  private final int oldTransactionFlushTrigger;
+
+ public TransactionalRegion(final Path basedir, final HLog log,
+ final FileSystem fs, final HBaseConfiguration conf,
+ final HRegionInfo regionInfo, final FlushRequester flushListener) {
+ super(basedir, log, fs, conf, regionInfo, flushListener);
+ transactionLeases = new Leases(conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME),
+ LEASE_CHECK_FREQUENCY);
+ logManager = new TransactionalHLogManager(this);
+ oldTransactionFlushTrigger = conf.getInt(OLD_TRANSACTION_FLUSH, DEFAULT_OLD_TRANSACTION_FLUSH);
+ }
+
+ @Override
+ protected void doReconstructionLog(final Path oldLogFile,
+ final long minSeqId, final long maxSeqId, final Progressable reporter)
+ throws UnsupportedEncodingException, IOException {
+ super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
+
+ Map<Long, List<BatchUpdate>> commitedTransactionsById = logManager
+ .getCommitsFromLog(oldLogFile, minSeqId, reporter);
+
+ if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
+ LOG.debug("found " + commitedTransactionsById.size()
+ + " COMMITED transactions");
+
+ for (Entry<Long, List<BatchUpdate>> entry : commitedTransactionsById
+ .entrySet()) {
+ LOG.debug("Writing " + entry.getValue().size()
+ + " updates for transaction " + entry.getKey());
+ for (BatchUpdate b : entry.getValue()) {
+ super.batchUpdate(b, true); // These are walled so they live forever
+ }
+ }
+
+ // LOG.debug("Flushing cache"); // We must trigger a cache flush,
+ // otherwise
+ // we will would ignore the log on subsequent failure
+ // if (!super.flushcache()) {
+ // LOG.warn("Did not flush cache");
+ // }
+ }
+ }
+
+ /**
+ * We need to make sure that we don't complete a cache flush between running
+ * transactions. If we did, then we would not find all log messages needed to
+ * restore the transaction, as some of them would be before the last
+ * "complete" flush id.
+ */
+ @Override
+ protected long getCompleteCacheFlushSequenceId(final long currentSequenceId) {
+ long minPendingStartSequenceId = currentSequenceId;
+ for (TransactionState transactionState : transactionsById.values()) {
+ minPendingStartSequenceId = Math.min(minPendingStartSequenceId,
+ transactionState.getHLogStartSequenceId());
+ }
+ return minPendingStartSequenceId;
+ }
+
+  /**
+   * Starts a transaction in this region. The transaction gets a lease and a
+   * START entry in the HLog.
+   * <p>
+   * If a transaction with the same id is already tracked, that transaction is
+   * aborted and retired, and an IOException is thrown.
+   *
+   * @param transactionId id of the transaction to start
+   * @throws IOException if the id is already in use or writing the log fails
+   */
+  public void beginTransaction(final long transactionId) throws IOException {
+    String key = String.valueOf(transactionId);
+    if (transactionsById.get(key) != null) {
+      TransactionState alias = getTransactionState(transactionId);
+      if (alias != null) {
+        alias.setStatus(Status.ABORTED);
+        retireTransaction(alias);
+      }
+      throw new IOException("Already existing transaction id: " + key);
+    }
+
+    TransactionState state = new TransactionState(transactionId, super.getLog()
+        .getSequenceNumber());
+
+    // Order is important here: the currently COMMIT_PENDING transactions are
+    // registered before the start sequence number is read.
+    // commitPendingTransactions is a Collections.synchronizedSet, whose
+    // contract requires holding its monitor while iterating.
+    synchronized (commitPendingTransactions) {
+      for (TransactionState commitPending : commitPendingTransactions) {
+        state.addTransactionToCheck(commitPending);
+      }
+    }
+    state.setStartSequenceNumber(nextSequenceId.get());
+
+    transactionsById.put(key, state);
+    try {
+      transactionLeases.createLease(key, new TransactionLeaseListener(key));
+    } catch (LeaseStillHeldException e) {
+      throw new RuntimeException(e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Beginning transaction " + key + " in region "
+          + super.getRegionInfo().getRegionNameAsString());
+    }
+    logManager.writeStartToLog(transactionId);
+
+    maybeTriggerOldTransactionFlush();
+  }
+
+  /**
+   * Fetch a single data item: the newest version visible to the transaction.
+   *
+   * @param transactionId id of a transaction active in this region
+   * @param row row to read
+   * @param column column to read
+   * @return column value, or null if there is none
+   * @throws IOException
+   */
+  public Cell get(final long transactionId, final byte[] row,
+      final byte[] column) throws IOException {
+    final Cell[] versions = get(transactionId, row, column, 1);
+    if (versions == null || versions.length == 0) {
+      return null;
+    }
+    return versions[0];
+  }
+
+  /**
+   * Fetch multiple versions of a single data item, with no upper bound on the
+   * timestamp.
+   *
+   * @param transactionId id of a transaction active in this region
+   * @param row row to read
+   * @param column column to read
+   * @param numVersions number of versions requested
+   * @return array of values one element per version
+   * @throws IOException
+   */
+  public Cell[] get(final long transactionId, final byte[] row,
+      final byte[] column, final int numVersions) throws IOException {
+    final long noTimestampBound = Long.MAX_VALUE;
+    return get(transactionId, row, column, noTimestampBound, numVersions);
+  }
+
+  /**
+   * Fetch multiple versions of a single data item, with timestamp.
+   * <p>
+   * The row is recorded in the transaction's read set. Values this
+   * transaction has written (newest first) come before values already stored
+   * in the region. Stored values are fetched only to fill up the remaining
+   * requested versions, and at most numVersions values are returned.
+   *
+   * @param transactionId id of a transaction active in this region
+   * @param row row to read
+   * @param column column to read
+   * @param timestamp only return values whose timestamp is <= this value
+   * @param numVersions maximum number of versions to return; values below 1
+   * return every locally written version, as before
+   * @return array of values one element per version that matches the timestamp
+   * @throws IOException
+   */
+  public Cell[] get(final long transactionId, final byte[] row,
+      final byte[] column, final long timestamp, final int numVersions)
+      throws IOException {
+    TransactionState state = getTransactionState(transactionId);
+
+    state.addRead(row);
+
+    Cell[] localCells = state.localGet(row, column, timestamp);
+
+    if (localCells == null || localCells.length == 0) {
+      return get(row, column, timestamp, numVersions);
+    }
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Transactional get of something we've written in the same transaction "
+          + transactionId);
+      LOG.trace("row: " + Bytes.toString(row));
+      LOG.trace("col: " + Bytes.toString(column));
+      LOG.trace("numVersions: " + numVersions);
+      for (Cell cell : localCells) {
+        LOG.trace("cell: " + Bytes.toString(cell.getValue()));
+      }
+    }
+
+    if (numVersions < 1 || localCells.length == numVersions) {
+      return localCells;
+    }
+    if (localCells.length > numVersions) {
+      // Never return more versions than were asked for; keep the newest.
+      Cell[] newest = new Cell[numVersions];
+      System.arraycopy(localCells, 0, newest, 0, numVersions);
+      return newest;
+    }
+
+    // Fill only the versions still missing from the region's stored data.
+    // The region get may return null when it finds nothing (the single-version
+    // get above also guards against null).
+    Cell[] globalCells = get(row, column, timestamp,
+        numVersions - localCells.length);
+    if (globalCells == null || globalCells.length == 0) {
+      return localCells;
+    }
+    Cell[] result = new Cell[localCells.length + globalCells.length];
+    System.arraycopy(localCells, 0, result, 0, localCells.length);
+    System.arraycopy(globalCells, 0, result, localCells.length,
+        globalCells.length);
+    return result;
+  }
+
+  /**
+   * Fetch all the columns for the indicated row at a specified timestamp.
+   * Returns a TreeMap that maps column names to values. The row is recorded
+   * in the transaction's read set. Values this transaction has written
+   * override the values stored in the region.
+   *
+   * @param transactionId id of a transaction active in this region
+   * @param row row to read
+   * @param columns Array of columns you'd like to retrieve. When null, get all.
+   * @param ts timestamp bound passed to both the local and the region lookup
+   * @return Map<columnName, Cell> values
+   * @throws IOException
+   */
+  public Map<byte[], Cell> getFull(final long transactionId, final byte[] row,
+      final Set<byte[]> columns, final long ts) throws IOException {
+    final TransactionState state = getTransactionState(transactionId);
+    state.addRead(row);
+
+    final Map<byte[], Cell> written = state.localGetFull(row, columns, ts);
+    final boolean hasLocalWrites = written != null && written.size() > 0;
+
+    if (hasLocalWrites) {
+      LOG.trace("Transactional get of something we've written in the same transaction "
+          + transactionId);
+      LOG.trace("row: " + Bytes.toString(row));
+      for (Entry<byte[], Cell> column : written.entrySet()) {
+        LOG.trace("col: " + Bytes.toString(column.getKey()));
+        LOG.trace("cell: " + Bytes.toString(column.getValue().getValue()));
+      }
+    }
+
+    final Map<byte[], Cell> stored = getFull(row, columns, ts, null);
+    if (hasLocalWrites) {
+      // Our own uncommitted writes take precedence over stored values.
+      stored.putAll(written);
+    }
+    return stored;
+  }
+
+  /**
+   * Return an iterator that scans over the HRegion, returning the indicated
+   * columns for only the rows that match the data filter. The region scanner
+   * is wrapped in a ScannerWrapper bound to the transaction. This Iterator
+   * must be closed by the caller.
+   *
+   * @param transactionId id of a transaction active in this region
+   * @param cols columns to scan. If column name is a column family, all columns
+   * of the specified column family are returned. Its also possible to pass a
+   * regex in the column qualifier. A column qualifier is judged to be a regex
+   * if it contains at least one of the following characters:
+   * <code>\+|^&*$[]]}{)(</code>.
+   * @param firstRow row which is the starting point of the scan
+   * @param timestamp only return rows whose timestamp is <= this value
+   * @param filter row filter
+   * @return InternalScanner
+   * @throws IOException
+   */
+  public InternalScanner getScanner(final long transactionId,
+      final byte[][] cols, final byte[] firstRow, final long timestamp,
+      final RowFilterInterface filter) throws IOException {
+    final InternalScanner regionScanner = super.getScanner(cols, firstRow,
+        timestamp, filter);
+    return new ScannerWrapper(transactionId, regionScanner);
+  }
+
+  /**
+   * Add a write to the transaction. Does not get applied until commit process.
+   * The update is buffered in the transaction state and then written to the
+   * HLog.
+   *
+   * @param transactionId id of a transaction active in this region
+   * @param b update to buffer
+   * @throws IOException
+   */
+  public void batchUpdate(final long transactionId, final BatchUpdate b)
+      throws IOException {
+    getTransactionState(transactionId).addWrite(b);
+    logManager.writeUpdateToLog(transactionId, b);
+  }
+
+  /**
+   * Add a delete of the row's cells to the transaction. Does not get applied
+   * until commit process. FIXME, not sure about this approach
+   * <p>
+   * Every column whose key each store returns for (row, timestamp) is deleted
+   * in a single update. If no store returns a key, nothing is added. An empty
+   * update would still count as a write to the row and cause spurious
+   * conflicts.
+   *
+   * @param transactionId id of a transaction active in this region
+   * @param row row whose cells are deleted
+   * @param timestamp timestamp used for the key lookup and for the delete update
+   * @throws IOException
+   */
+  public void deleteAll(final long transactionId, final byte[] row,
+      final long timestamp) throws IOException {
+    TransactionState state = getTransactionState(transactionId);
+    long now = System.currentTimeMillis();
+
+    BatchUpdate deleteUpdate = new BatchUpdate(row, timestamp);
+    boolean hasDeletes = false;
+    for (HStore store : super.stores.values()) {
+      List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp),
+          ALL_VERSIONS, now);
+      for (HStoreKey key : keys) {
+        deleteUpdate.delete(key.getColumn());
+        hasDeletes = true;
+      }
+    }
+
+    if (hasDeletes) {
+      state.addWrite(deleteUpdate);
+      logManager.writeUpdateToLog(transactionId, deleteUpdate);
+    }
+  }
+
+  /**
+   * First phase of two-phase commit: vote on whether the transaction can
+   * commit in this region.
+   * <p>
+   * Under commitCheckLock the transaction is checked against transactions
+   * committed while it was running. On conflict it is marked ABORTED, an abort
+   * record is written to the transaction log if it has writes, and it is
+   * retired. Otherwise it is marked COMMIT_PENDING and, if it has writes, it is
+   * registered as pending and given the next commit sequence number.
+   *
+   * @param transactionId id of the transaction
+   * @return true if this region votes to commit, false otherwise
+   * @throws IOException if the transaction is unknown or the abort record
+   *           cannot be written
+   */
+  public boolean commitRequest(final long transactionId) throws IOException {
+    synchronized (commitCheckLock) {
+      TransactionState state = getTransactionState(transactionId);
+      if (state == null) {
+        // getTransactionState throws instead of returning null; defensive only.
+        return false;
+      }
+
+      if (hasConflict(state)) {
+        state.setStatus(Status.ABORTED);
+        try {
+          // The writes were already logged by batchUpdate/deleteAll. Record
+          // the abort here: once retired, a later abort() call cannot find the
+          // transaction any more, so the log would otherwise hold updates with
+          // no outcome.
+          if (state.getWriteSet().size() > 0) {
+            logManager.writeAbortToLog(state.getTransactionId());
+          }
+        } finally {
+          retireTransaction(state);
+        }
+        return false;
+      }
+
+      // No conflicts, we can commit.
+      LOG.trace("No conflicts for transaction " + transactionId
+          + " found in region " + super.getRegionInfo().getRegionNameAsString()
+          + ". Voting for commit");
+      state.setStatus(Status.COMMIT_PENDING);
+
+      // If there are writes we must keep record of the transaction
+      if (state.getWriteSet().size() > 0) {
+        // Order is important
+        commitPendingTransactions.add(state);
+        state.setSequenceNumber(nextSequenceId.getAndIncrement());
+        commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(),
+            state);
+      }
+
+      return true;
+    }
+  }
+
+  /**
+   * Check the transaction against every transaction that received a commit
+   * sequence number while it was running.
+   *
+   * @param state the transaction being checked
+   * @return true if it conflicts with one of those transactions
+   */
+  private boolean hasConflict(final TransactionState state) {
+    int seq = state.getStartSequenceNumber();
+    // The upper bound is re-read each pass, exactly like a for-loop condition.
+    while (seq < nextSequenceId.get()) {
+      TransactionState committed = commitedTransactionsBySequenceNumber.get(seq);
+      if (committed != null) {
+        state.addTransactionToCheck(committed);
+      }
+      seq++;
+    }
+    return state.hasConflict();
+  }
+
+  /**
+   * Second phase of two-phase commit: apply a COMMIT_PENDING transaction's
+   * writes to this region.
+   *
+   * @param transactionId id of the transaction to commit
+   * @throws IOException if the transaction is unknown
+   *           (UnknownTransactionException), is not COMMIT_PENDING, or its
+   *           writes cannot be applied
+   */
+  public void commit(final long transactionId) throws IOException {
+    TransactionState state;
+    try {
+      state = getTransactionState(transactionId);
+    } catch (UnknownTransactionException e) {
+      LOG.fatal("Asked to commit unknown transaction: " + transactionId
+          + " in region " + super.getRegionInfo().getRegionNameAsString());
+      // FIXME Write to the transaction log that this transaction was corrupted
+      throw e;
+    }
+
+    if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
+      // Include id, status and region so the failure can be diagnosed.
+      LOG.fatal("Asked to commit a non pending transaction: " + transactionId
+          + " (status " + state.getStatus() + ") in region "
+          + super.getRegionInfo().getRegionNameAsString());
+      // FIXME Write to the transaction log that this transaction was corrupted
+      throw new IOException("commit failure: transaction " + transactionId
+          + " is not COMMIT_PENDING (status " + state.getStatus() + ")");
+    }
+
+    commit(state);
+  }
+
+ /**
+ * Commit the transaction.
+ *
+ * @param transactionId
+ * @return
+ * @throws IOException
+ */
+ public void abort(final long transactionId) throws IOException {
+ TransactionState state;
+ try {
+ state = getTransactionState(transactionId);
+ } catch (UnknownTransactionException e) {
+ LOG.error("Asked to abort unknown transaction: " + transactionId);
+ return;
+ }
+
+ state.setStatus(Status.ABORTED);
+
+ if (state.getWriteSet().size() > 0) {
+ logManager.writeAbortToLog(state.getTransactionId());
+ }
+
+ // Following removes needed if we have voted
+ if (state.getSequenceNumber() != null) {
+ commitedTransactionsBySequenceNumber.remove(state.getSequenceNumber());
+ }
+ commitPendingTransactions.remove(state);
+
+ retireTransaction(state);
+ }
+
+ private void commit(final TransactionState state) throws IOException {
+
+ LOG.debug("Commiting transaction: " + state.toString() + " to "
+ + super.getRegionInfo().getRegionNameAsString());
+
+ if (state.getWriteSet().size() > 0) {
+ logManager.writeCommitToLog(state.getTransactionId());
+ }
+
+ for (BatchUpdate update : state.getWriteSet()) {
+ super.batchUpdate(update, false); // Don't need to WAL these
+ // FIME, maybe should be walled so we don't need to look so far back.
+ }
+
+ state.setStatus(Status.COMMITED);
+ if (state.getWriteSet().size() > 0
+ && !commitPendingTransactions.remove(state)) {
+ LOG
+ .fatal("Commiting a non-query transaction that is not in commitPendingTransactions");
+ throw new IOException("commit failure"); // FIXME, how to handle?
+ }
+ retireTransaction(state);
+ }
+
+ // Cancel leases, and removed from lease lookup. This transaction may still
+ // live in commitedTransactionsBySequenceNumber and commitPendingTransactions
+ private void retireTransaction(final TransactionState state) {
+ String key = String.valueOf(state.getTransactionId());
+ try {
+ transactionLeases.cancelLease(key);
+ } catch (LeaseException e) {
+ // Ignore
+ }
+
+ transactionsById.remove(key);
+ }
+
+  /**
+   * Look up a live transaction by id and renew its lease.
+   *
+   * @param transactionId id of the transaction
+   * @return the transaction state; never null
+   * @throws UnknownTransactionException if no transaction with this id is
+   *           registered, or its lease could not be renewed
+   */
+  private TransactionState getTransactionState(final long transactionId)
+      throws UnknownTransactionException {
+    String key = String.valueOf(transactionId);
+    TransactionState state = transactionsById.get(key);
+
+    if (state == null) {
+      LOG.trace("Unknown transaction: " + key);
+      throw new UnknownTransactionException(key);
+    }
+
+    try {
+      transactionLeases.renewLease(key);
+    } catch (LeaseException e) {
+      // The lease can presumably vanish between the lookup above and this
+      // renewal (e.g. it expired and TransactionLeaseListener removed the
+      // transaction). Report that as an unknown transaction, which callers
+      // already handle, instead of an unchecked RuntimeException.
+      LOG.debug("Could not renew lease for transaction " + key, e);
+      throw new UnknownTransactionException(key);
+    }
+
+    return state;
+  }
+
+ private void maybeTriggerOldTransactionFlush() {
+ if (commitedTransactionsBySequenceNumber.size() > oldTransactionFlushTrigger) {
+ removeUnNeededCommitedTransactions();
+ }
+ }
+
+  /**
+   * Cleanup references to committed transactions that are no longer needed.
+   * <p>
+   * Committed transactions are only consulted by hasConflict for sequence
+   * numbers at or above a live transaction's start sequence number. Every
+   * entry below the smallest such start number can therefore be dropped; with
+   * no live transactions, everything is dropped. The early break assumes
+   * commitedTransactionsBySequenceNumber iterates in ascending key order.
+   */
+  synchronized void removeUnNeededCommitedTransactions() {
+    Integer minStartSeqNumber = getMinStartSequenceNumber();
+    if (minStartSeqNumber == null) {
+      minStartSeqNumber = Integer.MAX_VALUE; // Remove all
+    }
+
+    int numRemoved = 0;
+    // Iterate over a copy of the keys to avoid a concurrent-modification
+    // failure while removing from the map.
+    for (Integer seq : new LinkedList<Integer>(
+        commitedTransactionsBySequenceNumber.keySet())) {
+      if (seq >= minStartSeqNumber) {
+        break;
+      }
+      // Count each entry once, and only if it was actually still present.
+      if (commitedTransactionsBySequenceNumber.remove(seq) != null) {
+        numRemoved++;
+      }
+    }
+
+    if (numRemoved > 0) {
+      LOG.debug("Removed " + numRemoved
+          + " commited transactions with sequence lower than "
+          + minStartSeqNumber + ". Still have "
+          + commitedTransactionsBySequenceNumber.size() + " left");
+    } else if (commitedTransactionsBySequenceNumber.size() > 0) {
+      LOG.debug("Could not remove any transactions, and still have "
+          + commitedTransactionsBySequenceNumber.size() + " left");
+    }
+  }
+
+  /**
+   * Find the smallest start sequence number among the transactions currently
+   * registered in transactionsById.
+   *
+   * @return that minimum, or null when no transaction is registered
+   */
+  private Integer getMinStartSequenceNumber() {
+    Integer smallest = null;
+    for (TransactionState live : transactionsById.values()) {
+      if (smallest != null && live.getStartSequenceNumber() >= smallest) {
+        continue;
+      }
+      smallest = live.getStartSequenceNumber();
+    }
+    return smallest;
+  }
+
+  /**
+   * Resolve the outcome of a transaction from the global transaction log.
+   * TODO, resolve from the global transaction log. Not implemented yet, so this
+   * always throws.
+   *
+   * @param transactionId id of the transaction to resolve
+   * @throws RuntimeException always
+   */
+  private void resolveTransactionFromLog(final long transactionId) {
+    throw new RuntimeException("Global transaction log is not implemented;"
+        + " cannot resolve transaction " + transactionId);
+  }
+
+  /** Reacts to the expiry of one transaction's lease. */
+  private class TransactionLeaseListener implements LeaseListener {
+    private final String transactionName;
+
+    TransactionLeaseListener(final String n) {
+      this.transactionName = n;
+    }
+
+    /**
+     * Remove the expired transaction from the id lookup. A PENDING transaction
+     * is marked ABORTED (other transactions may still hold a reference to it).
+     * A COMMIT_PENDING one is passed to resolveTransactionFromLog. Any other
+     * status only logs a warning.
+     */
+    public void leaseExpired() {
+      LOG.info("Transaction " + this.transactionName + " lease expired");
+      final TransactionState expired;
+      synchronized (transactionsById) {
+        expired = transactionsById.remove(transactionName);
+      }
+      if (expired == null) {
+        LOG.warn("Unknown transaction expired " + this.transactionName);
+        return;
+      }
+
+      switch (expired.getStatus()) {
+        case PENDING:
+          expired.setStatus(Status.ABORTED); // Other transactions may have a ref
+          break;
+        case COMMIT_PENDING:
+          LOG.info("Transaction " + expired.getTransactionId()
+              + " expired in COMMIT_PENDING state");
+          LOG.info("Checking transaction status in transaction log");
+          resolveTransactionFromLog(expired.getTransactionId());
+          break;
+        default:
+          LOG.warn("Unexpected status on expired lease");
+      }
+    }
+  }
+
+  /**
+   * Wrapper which keeps track of rows returned by scanner, and overlays the
+   * transaction's own uncommitted writes on each returned row.
+   */
+  private class ScannerWrapper implements InternalScanner {
+    private final long transactionId;
+    private final InternalScanner scanner;
+
+    public ScannerWrapper(final long transactionId,
+        final InternalScanner scanner) {
+      this.transactionId = transactionId;
+      this.scanner = scanner;
+    }
+
+    public void close() throws IOException {
+      scanner.close();
+    }
+
+    public boolean isMultipleMatchScanner() {
+      return scanner.isMultipleMatchScanner();
+    }
+
+    public boolean isWildcardScanner() {
+      return scanner.isWildcardScanner();
+    }
+
+    /**
+     * Advance the underlying scanner and record the scan in the transaction.
+     * If a row was returned, merge the transaction's local writes for that
+     * row into results.
+     */
+    public boolean next(final HStoreKey key,
+        final SortedMap<byte[], Cell> results) throws IOException {
+      boolean result = scanner.next(key, results);
+      TransactionState state = getTransactionState(transactionId);
+      state.setHasScan(true);
+      // FIXME, not using row, just claiming read over the whole region. We are
+      // being very conservative on scans to avoid phantom reads.
+      state.addRead(key.getRow());
+
+      if (result) {
+        // Use Long.MAX_VALUE as the timestamp bound so no local write is
+        // excluded: HBase timestamps are longs (milliseconds, or
+        // LATEST_TIMESTAMP), far above Integer.MAX_VALUE.
+        // NOTE(review): assumes localGetFull skips writes newer than the bound.
+        Map<byte[], Cell> localWrites = state.localGetFull(key.getRow(), null,
+            Long.MAX_VALUE);
+        if (localWrites != null) {
+          LOG.info("Scanning over row that has been writen to "
+              + transactionId);
+          for (Entry<byte[], Cell> entry : localWrites.entrySet()) {
+            results.put(entry.getKey(), entry.getValue());
+          }
+        }
+      }
+
+      return result;
+    }
+  }
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,296 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * RegionServer with support for transactions. Transactional logic is at the
+ * region level, so we mostly just delegate to the appropriate
+ * TransactionalRegion.
+ */
+public class TransactionalRegionServer extends HRegionServer implements
+ TransactionalRegionInterface {
+ static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class);
+
+ private final CleanOldTransactionsChore cleanOldTransactionsThread;
+
+ public TransactionalRegionServer(final HBaseConfiguration conf)
+ throws IOException {
+ this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+ DEFAULT_REGIONSERVER_ADDRESS)), conf);
+ }
+
+ public TransactionalRegionServer(final HServerAddress address,
+ final HBaseConfiguration conf) throws IOException {
+ super(address, conf);
+ cleanOldTransactionsThread = new CleanOldTransactionsChore(this,
+ super.stopRequested);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getProtocolVersion(final String protocol, final long clientVersion)
+ throws IOException {
+ if (protocol.equals(TransactionalRegionInterface.class.getName())) {
+ return TransactionalRegionInterface.versionID;
+ }
+ return super.getProtocolVersion(protocol, clientVersion);
+ }
+
+ @Override
+ protected void init(final MapWritable c) throws IOException {
+ super.init(c);
+ String n = Thread.currentThread().getName();
+ UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
+ public void uncaughtException(final Thread t, final Throwable e) {
+ abort();
+ LOG.fatal("Set stop flag in " + t.getName(), e);
+ }
+ };
+ Threads.setDaemonThreadRunning(this.cleanOldTransactionsThread, n
+ + ".oldTransactionCleaner", handler);
+
+ }
+
+ @Override
+ protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+ throws IOException {
+ HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super
+ .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
+ .getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
+ r.initialize(null, new Progressable() {
+ public void progress() {
+ addProcessingMessage(regionInfo);
+ }
+ });
+ return r;
+ }
+
+ protected TransactionalRegion getTransactionalRegion(final byte[] regionName)
+ throws NotServingRegionException {
+ return (TransactionalRegion) super.getRegion(regionName);
+ }
+
+ public void abort(final byte[] regionName, final long transactionId)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ getTransactionalRegion(regionName).abort(transactionId);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public void batchUpdate(final long transactionId, final byte[] regionName,
+ final BatchUpdate b) throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ getTransactionalRegion(regionName).batchUpdate(transactionId, b);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public void commit(final byte[] regionName, final long transactionId)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ getTransactionalRegion(regionName).commit(transactionId);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public boolean commitRequest(final byte[] regionName, final long transactionId)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).commitRequest(transactionId);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public Cell get(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column) throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).get(transactionId, row, column);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public Cell[] get(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column, final int numVersions)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).get(transactionId, row, column,
+ numVersions);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public Cell[] get(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column, final long timestamp,
+ final int numVersions) throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ return getTransactionalRegion(regionName).get(transactionId, row, column,
+ timestamp, numVersions);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public RowResult getRow(final long transactionId, final byte[] regionName,
+ final byte[] row, final long ts) throws IOException {
+ return getRow(transactionId, regionName, row, null, ts);
+ }
+
+ public RowResult getRow(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[][] columns) throws IOException {
+ return getRow(transactionId, regionName, row, columns,
+ HConstants.LATEST_TIMESTAMP);
+ }
+
+ public RowResult getRow(final long transactionId, final byte[] regionName,
+ final byte[] row, final byte[][] columns, final long ts)
+ throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ // convert the columns array into a set so it's easy to check later.
+ Set<byte[]> columnSet = null;
+ if (columns != null) {
+ columnSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ columnSet.addAll(Arrays.asList(columns));
+ }
+
+ TransactionalRegion region = getTransactionalRegion(regionName);
+ Map<byte[], Cell> map = region.getFull(transactionId, row, columnSet, ts);
+ HbaseMapWritable<byte[], Cell> result = new HbaseMapWritable<byte[], Cell>();
+ result.putAll(map);
+ return new RowResult(row, result);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+
+ }
+
+ /** {@inheritDoc} */
+ public void deleteAll(final long transactionId, final byte[] regionName,
+ final byte[] row, final long timestamp) throws IOException {
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+ TransactionalRegion region = getTransactionalRegion(regionName);
+ region.deleteAll(transactionId, row, timestamp);
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public long openScanner(final long transactionId, final byte[] regionName,
+ final byte[][] cols, final byte[] firstRow, final long timestamp,
+ final RowFilterInterface filter) throws IOException {
+ checkOpen();
+ NullPointerException npe = null;
+ if (regionName == null) {
+ npe = new NullPointerException("regionName is null");
+ } else if (cols == null) {
+ npe = new NullPointerException("columns to scan is null");
+ } else if (firstRow == null) {
+ npe = new NullPointerException("firstRow for scanner is null");
+ }
+ if (npe != null) {
+ IOException io = new IOException("Invalid arguments to openScanner");
+ io.initCause(npe);
+ throw io;
+ }
+ super.getRequestCount().incrementAndGet();
+ try {
+ TransactionalRegion r = getTransactionalRegion(regionName);
+ long scannerId = -1L;
+ InternalScanner s = r.getScanner(transactionId, cols, firstRow,
+ timestamp, filter);
+ scannerId = super.addScanner(s);
+ return scannerId;
+ } catch (IOException e) {
+ LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
+ RemoteExceptionHandler.checkIOException(e));
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ public void beginTransaction(final long transactionId, final byte[] regionName)
+ throws IOException {
+ getTransactionalRegion(regionName).beginTransaction(transactionId);
+ }
+
+}
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Sun Aug 17 15:03:42 2008
@@ -167,9 +167,11 @@
protected HRegion openClosedRegion(final HRegion closedRegion)
throws IOException {
- return new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(),
- closedRegion.getFilesystem(), closedRegion.getConf(),
- closedRegion.getRegionInfo(), null, null);
+ HRegion r = new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(),
+ closedRegion.getFilesystem(), closedRegion.getConf(),
+ closedRegion.getRegionInfo(), null);
+ r.initialize(null, null);
+ return r;
}
/**
Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,421 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Stress Test the transaction functionality. This requires to run an
+ * {@link TransactionalRegionServer}. We run many threads doing reads/writes
+ * which may conflict with each other. We have two types of transactions, those
+ * which operate on rows of a single table, and those which operate on rows
+ * across multiple tables. Each transaction type has a modification operation
+ * which changes two values while maintaining the sum. Also each transaction
+ * type has a consistency-check operation which sums all rows and verifies that
+ * the sum is as expected. The test fails if any worker thread observes a wrong
+ * sum, or if the final sums are wrong.
+ */
+public class StressTestTransactions extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory
+      .getLog(StressTestTransactions.class);
+
+  private static final int NUM_TABLES = 3;
+  private static final int NUM_ST_ROWS = 3;
+  private static final int NUM_MT_ROWS = 3;
+  private static final int NUM_TRANSACTIONS_PER_THREAD = 100;
+  private static final int NUM_SINGLE_TABLE_THREADS = 6;
+  private static final int NUM_MULTI_TABLE_THREADS = 6;
+  private static final int PRE_COMMIT_SLEEP = 10;
+  private static final Random RAND = new Random();
+
+  private static final byte[] FAMILY = Bytes.toBytes("family:");
+  private static final byte[] COL = Bytes.toBytes("family:a");
+
+  private HBaseAdmin admin;
+  private TransactionalTable[] tables;
+  private TransactionManager transactionManager;
+
+  /** constructor */
+  public StressTestTransactions() {
+    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+        .getName());
+    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+        .getName());
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    tables = new TransactionalTable[NUM_TABLES];
+    // A single admin connection is enough to create all tables.
+    admin = new HBaseAdmin(conf);
+
+    for (int i = 0; i < tables.length; i++) {
+      HTableDescriptor desc = new HTableDescriptor(makeTableName(i));
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      admin.createTable(desc);
+      tables[i] = new TransactionalTable(conf, desc.getName());
+    }
+
+    transactionManager = new TransactionManager(conf);
+  }
+
+  private String makeTableName(final int i) {
+    return "table" + i;
+  }
+
+  /** Seed every single-table and multi-table row of every table. */
+  private void writeInitalValues() throws IOException {
+    for (TransactionalTable table : tables) {
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        byte[] row = makeSTRow(i);
+        BatchUpdate b = new BatchUpdate(row);
+        b.put(COL, Bytes.toBytes(SingleTableTransactionThread.INITIAL_VALUE));
+        table.commit(b);
+      }
+      for (int i = 0; i < NUM_MT_ROWS; i++) {
+        byte[] row = makeMTRow(i);
+        BatchUpdate b = new BatchUpdate(row);
+        b.put(COL, Bytes.toBytes(MultiTableTransactionThread.INITIAL_VALUE));
+        table.commit(b);
+      }
+    }
+  }
+
+  private byte[] makeSTRow(final int i) {
+    return Bytes.toBytes("st" + i);
+  }
+
+  private byte[] makeMTRow(final int i) {
+    return Bytes.toBytes("mt" + i);
+  }
+
+  private static int nextThreadNum = 1;
+  private static final AtomicBoolean stopRequest = new AtomicBoolean(false);
+  private static final AtomicBoolean consistencyFailure = new AtomicBoolean(
+      false);
+
+  // Thread which runs transactions
+  abstract class TransactionThread extends Thread {
+    private int numRuns = 0;
+    private int numAborts = 0;
+    private int numUnknowns = 0;
+
+    public TransactionThread(final String namePrefix) {
+      super.setName(namePrefix + "transaction " + nextThreadNum++);
+    }
+
+    @Override
+    public void run() {
+      for (int i = 0; i < NUM_TRANSACTIONS_PER_THREAD; i++) {
+        if (stopRequest.get()) {
+          return;
+        }
+        try {
+          numRuns++;
+          transaction();
+        } catch (UnknownTransactionException e) {
+          numUnknowns++;
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        } catch (CommitUnsuccessfulException e) {
+          numAborts++;
+        }
+      }
+    }
+
+    protected abstract void transaction() throws IOException,
+        CommitUnsuccessfulException;
+
+    public int getNumAborts() {
+      return numAborts;
+    }
+
+    public int getNumUnknowns() {
+      return numUnknowns;
+    }
+
+    protected void preCommitSleep() {
+      try {
+        Thread.sleep(PRE_COMMIT_SLEEP);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /** Record a consistency failure and ask all threads to stop. */
+    protected void consistencyFailure() {
+      LOG.fatal("Consistency failure");
+      stopRequest.set(true);
+      consistencyFailure.set(true);
+    }
+
+    /**
+     * Get the numRuns.
+     *
+     * @return Return the numRuns.
+     */
+    public int getNumRuns() {
+      return numRuns;
+    }
+
+  }
+
+  // Atomically change the value of two rows while maintaining the sum.
+  // This should preserve the global sum of the rows, which is also checked
+  // with a transaction.
+  private class SingleTableTransactionThread extends TransactionThread {
+    private static final int INITIAL_VALUE = 10;
+    public static final int TOTAL_SUM = INITIAL_VALUE * NUM_ST_ROWS;
+    private static final int MAX_TRANSFER_AMT = 100;
+
+    private TransactionalTable table;
+    boolean doCheck = false;
+
+    public SingleTableTransactionThread() {
+      super("single table ");
+    }
+
+    @Override
+    protected void transaction() throws IOException,
+        CommitUnsuccessfulException {
+      if (doCheck) {
+        checkTotalSum();
+      } else {
+        doSingleRowChange();
+      }
+      doCheck = !doCheck;
+    }
+
+    private void doSingleRowChange() throws IOException,
+        CommitUnsuccessfulException {
+      table = tables[RAND.nextInt(NUM_TABLES)];
+      int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+          - MAX_TRANSFER_AMT;
+      int row1Index = RAND.nextInt(NUM_ST_ROWS);
+      int row2Index;
+      do {
+        row2Index = RAND.nextInt(NUM_ST_ROWS);
+      } while (row2Index == row1Index);
+      byte[] row1 = makeSTRow(row1Index);
+      byte[] row2 = makeSTRow(row2Index);
+
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int row1Amount = Bytes.toInt(table.get(transactionState, row1, COL)
+          .getValue());
+      int row2Amount = Bytes.toInt(table.get(transactionState, row2, COL)
+          .getValue());
+
+      row1Amount -= transferAmount;
+      row2Amount += transferAmount;
+
+      BatchUpdate update = new BatchUpdate(row1);
+      update.put(COL, Bytes.toBytes(row1Amount));
+      table.commit(transactionState, update);
+      update = new BatchUpdate(row2);
+      update.put(COL, Bytes.toBytes(row2Amount));
+      table.commit(transactionState, update);
+
+      super.preCommitSleep();
+
+      transactionManager.tryCommit(transactionState);
+      LOG.debug("Commited");
+    }
+
+    // Check the table we last mutated
+    private void checkTotalSum() throws IOException,
+        CommitUnsuccessfulException {
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int totalSum = 0;
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        totalSum += Bytes.toInt(table.get(transactionState, makeSTRow(i), COL)
+            .getValue());
+      }
+
+      transactionManager.tryCommit(transactionState);
+      if (TOTAL_SUM != totalSum) {
+        super.consistencyFailure();
+      }
+    }
+
+  }
+
+  // Similar to SingleTable, but this time we maintain consistency across tables
+  // rather than rows
+  private class MultiTableTransactionThread extends TransactionThread {
+    private static final int INITIAL_VALUE = 1000;
+    public static final int TOTAL_SUM = INITIAL_VALUE * NUM_TABLES;
+    private static final int MAX_TRANSFER_AMT = 100;
+
+    private byte[] row;
+    boolean doCheck = false;
+
+    public MultiTableTransactionThread() {
+      // Trailing space separates the prefix from "transaction N".
+      super("multi table ");
+    }
+
+    @Override
+    protected void transaction() throws IOException,
+        CommitUnsuccessfulException {
+      if (doCheck) {
+        checkTotalSum();
+      } else {
+        doSingleRowChange();
+      }
+      doCheck = !doCheck;
+    }
+
+    private void doSingleRowChange() throws IOException,
+        CommitUnsuccessfulException {
+      row = makeMTRow(RAND.nextInt(NUM_MT_ROWS));
+      int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+          - MAX_TRANSFER_AMT;
+      int table1Index = RAND.nextInt(tables.length);
+      int table2Index;
+      do {
+        table2Index = RAND.nextInt(tables.length);
+      } while (table2Index == table1Index);
+
+      TransactionalTable table1 = tables[table1Index];
+      TransactionalTable table2 = tables[table2Index];
+
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int table1Amount = Bytes.toInt(table1.get(transactionState, row, COL)
+          .getValue());
+      int table2Amount = Bytes.toInt(table2.get(transactionState, row, COL)
+          .getValue());
+
+      table1Amount -= transferAmount;
+      table2Amount += transferAmount;
+
+      BatchUpdate update = new BatchUpdate(row);
+      update.put(COL, Bytes.toBytes(table1Amount));
+      table1.commit(transactionState, update);
+
+      update = new BatchUpdate(row);
+      update.put(COL, Bytes.toBytes(table2Amount));
+      table2.commit(transactionState, update);
+
+      super.preCommitSleep();
+
+      transactionManager.tryCommit(transactionState);
+
+      LOG.trace(Bytes.toString(table1.getTableName()) + ": " + table1Amount);
+      LOG.trace(Bytes.toString(table2.getTableName()) + ": " + table2Amount);
+
+    }
+
+    // Check the row we last mutated across all tables
+    private void checkTotalSum() throws IOException,
+        CommitUnsuccessfulException {
+      TransactionState transactionState = transactionManager.beginTransaction();
+      int totalSum = 0;
+      int[] amounts = new int[tables.length];
+      for (int i = 0; i < tables.length; i++) {
+        int amount = Bytes.toInt(tables[i].get(transactionState, row, COL)
+            .getValue());
+        amounts[i] = amount;
+        totalSum += amount;
+      }
+
+      transactionManager.tryCommit(transactionState);
+
+      for (int i = 0; i < tables.length; i++) {
+        LOG.trace(Bytes.toString(tables[i].getTableName()) + ": " + amounts[i]);
+      }
+
+      if (TOTAL_SUM != totalSum) {
+        super.consistencyFailure();
+      }
+    }
+
+  }
+
+  public void testStressTransactions() throws IOException, InterruptedException {
+    writeInitalValues();
+
+    List<TransactionThread> transactionThreads = new LinkedList<TransactionThread>();
+
+    for (int i = 0; i < NUM_SINGLE_TABLE_THREADS; i++) {
+      TransactionThread transactionThread = new SingleTableTransactionThread();
+      transactionThread.start();
+      transactionThreads.add(transactionThread);
+    }
+
+    for (int i = 0; i < NUM_MULTI_TABLE_THREADS; i++) {
+      TransactionThread transactionThread = new MultiTableTransactionThread();
+      transactionThread.start();
+      transactionThreads.add(transactionThread);
+    }
+
+    for (TransactionThread transactionThread : transactionThreads) {
+      transactionThread.join();
+    }
+
+    for (TransactionThread transactionThread : transactionThreads) {
+      LOG.info(transactionThread.getName() + " done with "
+          + transactionThread.getNumAborts() + " aborts, and "
+          + transactionThread.getNumUnknowns() + " unknown transactions of "
+          + transactionThread.getNumRuns());
+    }
+
+    // The worker threads only set a flag on an inconsistent read; surface it.
+    Assert.assertFalse("A transaction thread observed an inconsistent sum",
+        consistencyFailure.get());
+
+    doFinalConsistencyChecks();
+  }
+
+  /** Verify per-table single-table sums and per-row cross-table sums. */
+  private void doFinalConsistencyChecks() throws IOException {
+
+    // Java zero-initializes int arrays.
+    int[] mtSums = new int[NUM_MT_ROWS];
+
+    for (TransactionalTable table : tables) {
+      int thisTableSum = 0;
+      for (int i = 0; i < NUM_ST_ROWS; i++) {
+        byte[] row = makeSTRow(i);
+        thisTableSum += Bytes.toInt(table.get(row, COL).getValue());
+      }
+      Assert.assertEquals(SingleTableTransactionThread.TOTAL_SUM, thisTableSum);
+
+      for (int i = 0; i < NUM_MT_ROWS; i++) {
+        byte[] row = makeMTRow(i);
+        mtSums[i] += Bytes.toInt(table.get(row, COL).getValue());
+      }
+    }
+
+    for (int mtSum : mtSums) {
+      Assert.assertEquals(MultiTableTransactionThread.TOTAL_SUM, mtSum);
+    }
+  }
+}
Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,143 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test the transaction functionality. This requires to run an
+ * {@link TransactionalRegionServer}.
+ */
+public class TestTransactions extends HBaseClusterTestCase {
+
+ private static final String TABLE_NAME = "table1";
+
+ private static final byte[] FAMILY = Bytes.toBytes("family:");
+ private static final byte[] COL_A = Bytes.toBytes("family:a");
+
+ private static final byte[] ROW1 = Bytes.toBytes("row1");
+ private static final byte[] ROW2 = Bytes.toBytes("row2");
+ private static final byte[] ROW3 = Bytes.toBytes("row3");
+
+ private HBaseAdmin admin;
+ private TransactionalTable table;
+ private TransactionManager transactionManager;
+
+ /** constructor */
+ public TestTransactions() {
+ conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+ .getName());
+ conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+ .getName());
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(FAMILY));
+ admin = new HBaseAdmin(conf);
+ admin.createTable(desc);
+ table = new TransactionalTable(conf, desc.getName());
+
+ transactionManager = new TransactionManager(conf);
+ writeInitalRow();
+ }
+
+ private void writeInitalRow() throws IOException {
+ BatchUpdate update = new BatchUpdate(ROW1);
+ update.put(COL_A, Bytes.toBytes(1));
+ table.commit(update);
+ }
+
+ public void testSimpleTransaction() throws IOException,
+ CommitUnsuccessfulException {
+ TransactionState transactionState = makeTransaction1();
+ transactionManager.tryCommit(transactionState);
+ }
+
+ public void testTwoTransactionsWithoutConflict() throws IOException,
+ CommitUnsuccessfulException {
+ TransactionState transactionState1 = makeTransaction1();
+ TransactionState transactionState2 = makeTransaction2();
+
+ transactionManager.tryCommit(transactionState1);
+ transactionManager.tryCommit(transactionState2);
+ }
+
+ public void testTwoTransactionsWithConflict() throws IOException,
+ CommitUnsuccessfulException {
+ TransactionState transactionState1 = makeTransaction1();
+ TransactionState transactionState2 = makeTransaction2();
+
+ transactionManager.tryCommit(transactionState2);
+
+ try {
+ transactionManager.tryCommit(transactionState1);
+ fail();
+ } catch (CommitUnsuccessfulException e) {
+ // Good
+ }
+ }
+
+ // Read from ROW1,COL_A and put it in ROW2_COLA and ROW3_COLA
+ private TransactionState makeTransaction1() throws IOException {
+ TransactionState transactionState = transactionManager.beginTransaction();
+
+ Cell row1_A = table.get(transactionState, ROW1, COL_A);
+
+ BatchUpdate write1 = new BatchUpdate(ROW2);
+ write1.put(COL_A, row1_A.getValue());
+ table.commit(transactionState, write1);
+
+ BatchUpdate write2 = new BatchUpdate(ROW3);
+ write2.put(COL_A, row1_A.getValue());
+ table.commit(transactionState, write2);
+
+ return transactionState;
+ }
+
+ // Read ROW1,COL_A, increment its (integer) value, write back
+ private TransactionState makeTransaction2() throws IOException {
+ TransactionState transactionState = transactionManager.beginTransaction();
+
+ Cell row1_A = table.get(transactionState, ROW1, COL_A);
+
+ int value = Bytes.toInt(row1_A.getValue());
+
+ BatchUpdate write = new BatchUpdate(ROW1);
+ write.put(COL_A, Bytes.toBytes(value + 1));
+ table.commit(transactionState, write);
+
+ return transactionState;
+ }
+}
Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,285 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TestHLogRecovery extends HBaseClusterTestCase {
+ private static final Log LOG = LogFactory.getLog(TestHLogRecovery.class);
+
+ private static final String TABLE_NAME = "table1";
+
+ private static final byte[] FAMILY = Bytes.toBytes("family:");
+ private static final byte[] COL_A = Bytes.toBytes("family:a");
+
+ private static final byte[] ROW1 = Bytes.toBytes("row1");
+ private static final byte[] ROW2 = Bytes.toBytes("row2");
+ private static final byte[] ROW3 = Bytes.toBytes("row3");
+ private static final int TOTAL_VALUE = 10;
+
+ private HBaseAdmin admin;
+ private TransactionManager transactionManager;
+ private TransactionalTable table;
+
+ /** constructor */
+ public TestHLogRecovery() {
+ super(2, false);
+
+ conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
+ .getName());
+ conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
+ .getName());
+
+ // Set flush params so we don't get any
+ // FIXME (defaults are probably fine)
+
+ // Copied from TestRegionServerExit
+ conf.setInt("ipc.client.connect.max.retries", 5); // reduce ipc retries
+ conf.setInt("ipc.client.timeout", 10000); // and ipc timeout
+ conf.setInt("hbase.client.pause", 10000); // increase client timeout
+ conf.setInt("hbase.client.retries.number", 10); // increase HBase retries
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ FileSystem.getLocal(conf).delete(new Path(conf.get(HConstants.HBASE_DIR)), true);
+ super.setUp();
+
+ HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(FAMILY));
+ admin = new HBaseAdmin(conf);
+ admin.createTable(desc);
+ table = new TransactionalTable(conf, desc.getName());
+
+ transactionManager = new TransactionManager(conf);
+ writeInitalRows();
+ }
+
+ private void writeInitalRows() throws IOException {
+ BatchUpdate update = new BatchUpdate(ROW1);
+ update.put(COL_A, Bytes.toBytes(TOTAL_VALUE));
+ table.commit(update);
+ update = new BatchUpdate(ROW2);
+ update.put(COL_A, Bytes.toBytes(0));
+ table.commit(update);
+ update = new BatchUpdate(ROW3);
+ update.put(COL_A, Bytes.toBytes(0));
+ table.commit(update);
+ }
+
+ public void testWithoutFlush() throws IOException,
+ CommitUnsuccessfulException {
+ writeInitalRows();
+ TransactionState state1 = makeTransaction(false);
+ transactionManager.tryCommit(state1);
+ stopOrAbortRegionServer(true);
+
+ Thread t = startVerificationThread(1);
+ t.start();
+ threadDumpingJoin(t);
+ }
+
+ public void testWithFlushBeforeCommit() throws IOException,
+ CommitUnsuccessfulException {
+ writeInitalRows();
+ TransactionState state1 = makeTransaction(false);
+ flushRegionServer();
+ transactionManager.tryCommit(state1);
+ stopOrAbortRegionServer(true);
+
+ Thread t = startVerificationThread(1);
+ t.start();
+ threadDumpingJoin(t);
+ }
+
+ // FIXME, TODO
+ // public void testWithFlushBetweenTransactionWrites() {
+ // fail();
+ // }
+
+ private void flushRegionServer() {
+ List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
+ .getRegionThreads();
+
+ HRegion region = null;
+ int server = -1;
+ for (int i = 0; i < regionThreads.size() && server == -1; i++) {
+ HRegionServer s = regionThreads.get(i).getRegionServer();
+ Collection<HRegion> regions = s.getOnlineRegions();
+ for (HRegion r : regions) {
+ if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
+ server = i;
+ region = r;
+ }
+ }
+ }
+ if (server == -1) {
+ LOG.fatal("could not find region server serving table region");
+ fail();
+ }
+ ((TransactionalRegionServer) regionThreads.get(server).getRegionServer())
+ .getFlushRequester().request(region);
+ }
+
+ /**
+ * Stop the region server serving TABLE_NAME.
+ *
+ * @param abort set to true if region server should be aborted, if false it is
+ * just shut down.
+ */
+ private void stopOrAbortRegionServer(final boolean abort) {
+ List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
+ .getRegionThreads();
+
+ int server = -1;
+ for (int i = 0; i < regionThreads.size(); i++) {
+ HRegionServer s = regionThreads.get(i).getRegionServer();
+ Collection<HRegion> regions = s.getOnlineRegions();
+ LOG.info("server: " + regionThreads.get(i).getName());
+ for (HRegion r : regions) {
+ LOG.info("region: " + r.getRegionInfo().getRegionNameAsString());
+ if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
+ server = i;
+ }
+ }
+ }
+ if (server == -1) {
+ LOG.fatal("could not find region server serving table region");
+ fail();
+ }
+ if (abort) {
+ this.cluster.abortRegionServer(server);
+
+ } else {
+ this.cluster.stopRegionServer(server);
+ }
+ LOG.info(this.cluster.waitOnRegionServer(server) + " has been "
+ + (abort ? "aborted" : "shut down"));
+ }
+
+ private void verify(final int numRuns) throws IOException {
+ // Reads
+ int row1 = Bytes.toInt(table.get(ROW1, COL_A).getValue());
+ int row2 = Bytes.toInt(table.get(ROW2, COL_A).getValue());
+ int row3 = Bytes.toInt(table.get(ROW3, COL_A).getValue());
+
+ assertEquals(TOTAL_VALUE - 2 * numRuns, row1);
+ assertEquals(numRuns, row2);
+ assertEquals(numRuns, row3);
+ }
+
+ // Move 2 out of ROW1 and 1 into ROW2 and 1 into ROW3
+ private TransactionState makeTransaction(final boolean flushMidWay)
+ throws IOException {
+ TransactionState transactionState = transactionManager.beginTransaction();
+
+ // Reads
+ int row1 = Bytes.toInt(table.get(transactionState, ROW1, COL_A).getValue());
+ int row2 = Bytes.toInt(table.get(transactionState, ROW2, COL_A).getValue());
+ int row3 = Bytes.toInt(table.get(transactionState, ROW3, COL_A).getValue());
+
+ row1 -= 2;
+ row2 += 1;
+ row3 += 1;
+
+ if (flushMidWay) {
+ flushRegionServer();
+ }
+
+ // Writes
+ BatchUpdate write = new BatchUpdate(ROW1);
+ write.put(COL_A, Bytes.toBytes(row1));
+ table.commit(transactionState, write);
+
+ write = new BatchUpdate(ROW2);
+ write.put(COL_A, Bytes.toBytes(row2));
+ table.commit(transactionState, write);
+
+ write = new BatchUpdate(ROW3);
+ write.put(COL_A, Bytes.toBytes(row3));
+ table.commit(transactionState, write);
+
+ return transactionState;
+ }
+
+ /*
+ * Run verification in a thread so I can concurrently run a thread-dumper
+ * while we're waiting (because in this test sometimes the meta scanner looks
+ * to be be stuck). @param tableName Name of table to find. @param row Row we
+ * expect to find. @return Verification thread. Caller needs to calls start on
+ * it.
+ */
+ private Thread startVerificationThread(final int numRuns) {
+ Runnable runnable = new Runnable() {
+ public void run() {
+ try {
+ // Now try to open a scanner on the meta table. Should stall until
+ // meta server comes back up.
+ HTable t = new HTable(conf, TABLE_NAME);
+ Scanner s = t.getScanner(new byte[][] { COL_A },
+ HConstants.EMPTY_START_ROW);
+ s.close();
+
+ } catch (IOException e) {
+ LOG.fatal("could not re-open meta table because", e);
+ fail();
+ }
+ Scanner scanner = null;
+ try {
+ verify(numRuns);
+ LOG.info("Success!");
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ if (scanner != null) {
+ LOG.info("Closing scanner " + scanner);
+ scanner.close();
+ }
+ }
+ }
+ };
+ return new Thread(runnable);
+ }
+}