You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/08/18 00:03:43 UTC
svn commit: r686650 [1/3] - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/client/transactional/
src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hb...
Author: stack
Date: Sun Aug 17 15:03:42 2008
New Revision: 686650
URL: http://svn.apache.org/viewvc?rev=686650&view=rev
Log:
HBASE-669 MultiRegion transactions with Optimistic Concurrency Control
Added:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTransactionalHLogManager.java
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ServerCallable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sun Aug 17 15:03:42 2008
@@ -49,6 +49,8 @@
were present in previous versions of the patches for this issue,
but not in the version that was committed. Also fix a number of
compilation problems that were introduced by patch.
+ HBASE-669 MultiRegion transactions with Optimistic Concurrency Control
+ (Clint Morgan via Stack)
OPTIMIZATIONS
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java Sun Aug 17 15:03:42 2008
@@ -150,12 +150,13 @@
for (int i = 0; i < info.length - 1; i++) {
if (currentRegion == null) {
currentRegion =
- new HRegion(tabledir, hlog, fs, conf, info[i], null, null);
+ new HRegion(tabledir, hlog, fs, conf, info[i], null);
+ currentRegion.initialize(null, null);
currentSize = currentRegion.getLargestHStoreSize();
}
nextRegion =
- new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null);
-
+ new HRegion(tabledir, hlog, fs, conf, info[i + 1], null);
+ nextRegion.initialize(null, null);
nextSize = nextRegion.getLargestHStoreSize();
if ((currentSize + nextSize) <= (maxFilesize / 2)) {
@@ -322,7 +323,8 @@
// Scan root region to find all the meta regions
root = new HRegion(rootTableDir, hlog, fs, conf,
- HRegionInfo.ROOT_REGIONINFO, null, null);
+ HRegionInfo.ROOT_REGIONINFO, null);
+ root.initialize(null, null);
InternalScanner rootScanner =
root.getScanner(COL_REGIONINFO_ARRAY, HConstants.EMPTY_START_ROW,
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Sun Aug 17 15:03:42 2008
@@ -982,7 +982,9 @@
public Scanner getScanner(final byte [][] columns,
final byte [] startRow, long timestamp, RowFilterInterface filter)
throws IOException {
- return new ClientScanner(columns, startRow, timestamp, filter);
+ ClientScanner s = new ClientScanner(columns, startRow, timestamp, filter);
+ s.initialize();
+ return s;
}
/**
@@ -1335,15 +1337,13 @@
protected RowFilterInterface filter;
protected ClientScanner(final Text [] columns, final Text startRow,
- long timestamp, RowFilterInterface filter)
- throws IOException {
+ long timestamp, RowFilterInterface filter) {
this(Bytes.toByteArrays(columns), startRow.getBytes(), timestamp,
filter);
}
protected ClientScanner(final byte[][] columns, final byte [] startRow,
- final long timestamp, final RowFilterInterface filter)
- throws IOException {
+ final long timestamp, final RowFilterInterface filter) {
if (CLIENT_LOG.isDebugEnabled()) {
CLIENT_LOG.debug("Creating scanner over " + Bytes.toString(getTableName()) +
" starting at key '" + Bytes.toString(startRow) + "'");
@@ -1359,6 +1359,9 @@
if (filter != null) {
filter.validate(columns);
}
+ }
+
+ public void initialize() throws IOException {
nextScanner();
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ServerCallable.java Sun Aug 17 15:03:42 2008
@@ -60,11 +60,17 @@
/** @return the server name */
public String getServerName() {
+ if (location == null) {
+ return null;
+ }
return location.getServerAddress().toString();
}
/** @return the region name */
- public byte [] getRegionName() {
+ public byte[] getRegionName() {
+ if (location == null) {
+ return null;
+ }
return location.getRegionInfo().getRegionName();
}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,43 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+/**
+ * Thrown when a transaction cannot be committed.
+ */
+public class CommitUnsuccessfulException extends Exception {
+
+  /**
+   * Explicit serial version. Exceptions are serializable, and without this
+   * constant the id is derived from compiler-dependent class details.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /** Creates an exception with no detail message and no cause. */
+  public CommitUnsuccessfulException() {
+    super();
+  }
+
+  /**
+   * Creates an exception with a detail message and a cause.
+   *
+   * @param arg0 the detail message
+   * @param arg1 the underlying cause
+   */
+  public CommitUnsuccessfulException(String arg0, Throwable arg1) {
+    super(arg0, arg1);
+  }
+
+  /**
+   * Creates an exception with a detail message.
+   *
+   * @param arg0 the detail message
+   */
+  public CommitUnsuccessfulException(String arg0) {
+    super(arg0);
+  }
+
+  /**
+   * Creates an exception that wraps an underlying cause.
+   *
+   * @param arg0 the underlying cause
+   */
+  public CommitUnsuccessfulException(Throwable arg0) {
+    super(arg0);
+  }
+
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * A local, in-memory implementation of the transaction logger. Statuses are
+ * kept in a map held by a per-JVM singleton, so it does not provide a global
+ * view and cannot be relied on by anything outside this JVM.
+ * NOTE(review): the original sentence was left unfinished ("relied on by ...");
+ * confirm the intended wording.
+ */
+public class LocalTransactionLogger implements TransactionLogger {
+
+  private static LocalTransactionLogger instance;
+
+  /**
+   * Returns the shared logger, creating it on first use.
+   *
+   * @return the singleton instance
+   */
+  public synchronized static LocalTransactionLogger getInstance() {
+    if (instance == null) {
+      instance = new LocalTransactionLogger();
+    }
+    return instance;
+  }
+
+  private final Random random = new Random();
+  private final Map<Long, TransactionStatus> transactionIdToStatusMap = Collections
+      .synchronizedMap(new HashMap<Long, TransactionStatus>());
+
+  private LocalTransactionLogger() {
+    // Enforce singleton
+  }
+
+  /**
+   * Registers a new transaction with status PENDING and returns its id.
+   * Ids are random longs; a candidate that is already in use is discarded so
+   * that an existing transaction's status is never overwritten.
+   *
+   * @return id of the newly registered transaction
+   */
+  public long createNewTransactionLog() {
+    // The check-then-put must be atomic; a synchronized map uses itself as
+    // its lock, so holding it here excludes concurrent put/get calls.
+    synchronized (transactionIdToStatusMap) {
+      long id;
+      do {
+        id = random.nextLong();
+      } while (transactionIdToStatusMap.containsKey(id));
+      transactionIdToStatusMap.put(id, TransactionStatus.PENDING);
+      return id;
+    }
+  }
+
+  /**
+   * Looks up the recorded status of a transaction.
+   *
+   * @param transactionId id to look up
+   * @return the recorded status, or null when the id is not in the map
+   */
+  public TransactionStatus getStatusForTransaction(final long transactionId) {
+    return transactionIdToStatusMap.get(transactionId);
+  }
+
+  /**
+   * Records (or replaces) the status of a transaction.
+   *
+   * @param transactionId id of the transaction
+   * @param status status to store
+   */
+  public void setStatusForTransaction(final long transactionId,
+      final TransactionStatus status) {
+    transactionIdToStatusMap.put(transactionId, status);
+  }
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+/**
+ * Minimal contract for a log of transaction outcomes. The client writes to
+ * it, and regionservers read it in case of failure.
+ */
+public interface TransactionLogger {
+
+  /** States a transaction can be recorded in. */
+  enum TransactionStatus {
+    PENDING,
+    COMMITTED,
+    ABORTED
+  }
+
+  /**
+   * Opens a log entry for a new transaction. The entry's initial value
+   * should be PENDING.
+   *
+   * @return the transaction's globally unique id
+   */
+  long createNewTransactionLog();
+
+  /**
+   * Reads the status recorded for a transaction.
+   * NOTE(review): the result for an unknown id is not specified here.
+   *
+   * @param transactionId id of the transaction
+   * @return the recorded status
+   */
+  TransactionStatus getStatusForTransaction(long transactionId);
+
+  /**
+   * Records the status of a transaction.
+   *
+   * @param transactionId id of the transaction
+   * @param status status to record
+   */
+  void setStatusForTransaction(long transactionId, TransactionStatus status);
+
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,145 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * Transaction Manager. Starts transactions and drives their commit or abort
+ * across every region that took part.
+ */
+public class TransactionManager {
+  static final Log LOG = LogFactory.getLog(TransactionManager.class);
+
+  private final HConnection connection;
+  private final TransactionLogger transactionLogger;
+
+  /**
+   * Creates a manager that records outcomes in the shared
+   * {@link LocalTransactionLogger}.
+   *
+   * @param conf configuration used to obtain the connection
+   */
+  public TransactionManager(final HBaseConfiguration conf) {
+    this(LocalTransactionLogger.getInstance(), conf);
+  }
+
+  /**
+   * Creates a manager that records outcomes in the given logger.
+   *
+   * @param transactionLogger log that receives transaction statuses
+   * @param conf configuration used to obtain the connection
+   */
+  public TransactionManager(final TransactionLogger transactionLogger,
+      final HBaseConfiguration conf) {
+    this.transactionLogger = transactionLogger;
+    connection = HConnectionManager.getConnection(conf);
+  }
+
+  /**
+   * Called to start a transaction. A new entry is created in the
+   * transaction log and its id is wrapped in a fresh state object.
+   *
+   * @return new transaction state
+   */
+  public TransactionState beginTransaction() {
+    final long txId = transactionLogger.createNewTransactionLog();
+    LOG.debug("Begining transaction " + txId);
+    return new TransactionState(txId);
+  }
+
+  /**
+   * Try and commit a transaction. Each participating region is first asked
+   * to vote. If one votes no, the transaction is aborted and the commit
+   * fails. Once all vote yes, COMMITTED is logged and every region is told
+   * to commit.
+   *
+   * @param transactionState transaction to commit
+   * @throws CommitUnsuccessfulException if a region votes to abort, or a
+   *           RemoteException is raised while voting or committing
+   * @throws IOException on other I/O failures
+   */
+  public void tryCommit(final TransactionState transactionState)
+      throws CommitUnsuccessfulException, IOException {
+    LOG.debug("atempting to commit trasaction: " + transactionState.toString());
+    final long txId = transactionState.getTransactionId();
+
+    try {
+      // Phase one: collect a vote from every participating region.
+      for (HRegionLocation voter : transactionState.getParticipatingRegions()) {
+        final boolean canCommit = regionServerFor(voter).commitRequest(
+            voter.getRegionInfo().getRegionName(), txId);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Region ["
+              + voter.getRegionInfo().getRegionNameAsString() + "] votes "
+              + (canCommit ? "to commit" : "to abort") + " transaction "
+              + txId);
+        }
+        if (canCommit) {
+          continue;
+        }
+
+        // A single "no" vote ends the attempt; the region that voted no is
+        // left out of the abort round.
+        LOG.debug("Aborting [" + txId + "]");
+        abort(transactionState, voter);
+        throw new CommitUnsuccessfulException();
+      }
+
+      LOG.debug("Commiting [" + txId + "]");
+
+      transactionLogger.setStatusForTransaction(txId,
+          TransactionLogger.TransactionStatus.COMMITTED);
+
+      // Phase two: tell every participant to commit.
+      for (HRegionLocation participant : transactionState
+          .getParticipatingRegions()) {
+        regionServerFor(participant).commit(
+            participant.getRegionInfo().getRegionName(), txId);
+      }
+    } catch (RemoteException e) {
+      LOG.debug("Commit of transaction [" + txId
+          + "] was unsucsessful", e);
+      // FIXME, think about the what ifs
+      throw new CommitUnsuccessfulException(e);
+    }
+    // Tran log can be deleted now ...
+  }
+
+  /**
+   * Abort a transaction on all of its participating regions.
+   *
+   * @param transactionState transaction to abort
+   * @throws IOException if contacting a region server fails
+   */
+  public void abort(final TransactionState transactionState) throws IOException {
+    abort(transactionState, null);
+  }
+
+  /**
+   * Logs the transaction as ABORTED, then sends abort to every participating
+   * region except {@code locationToIgnore} (which may be null to skip none).
+   */
+  private void abort(final TransactionState transactionState,
+      final HRegionLocation locationToIgnore) throws IOException {
+    final long txId = transactionState.getTransactionId();
+    transactionLogger.setStatusForTransaction(txId,
+        TransactionLogger.TransactionStatus.ABORTED);
+
+    for (HRegionLocation participant : transactionState.getParticipatingRegions()) {
+      if (locationToIgnore == null || !participant.equals(locationToIgnore)) {
+        regionServerFor(participant).abort(
+            participant.getRegionInfo().getRegionName(), txId);
+      }
+    }
+  }
+
+  /** Looks up the transactional region server hosting the given location. */
+  private TransactionalRegionInterface regionServerFor(
+      final HRegionLocation location) throws IOException {
+    return (TransactionalRegionInterface) connection
+        .getHRegionConnection(location.getServerAddress());
+  }
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+
+/**
+ * Scanner callable that opens its scanner inside a transaction, enlisting
+ * the target region in that transaction the first time it is seen.
+ */
+class TransactionScannerCallable extends ScannerCallable {
+
+  private TransactionState transactionState;
+
+  TransactionScannerCallable(final TransactionState transactionState,
+      final HConnection connection, final byte[] tableName,
+      final byte[][] columns, final byte[] startRow, final long timestamp,
+      final RowFilterInterface filter) {
+    super(connection, tableName, columns, startRow, timestamp, filter);
+    this.transactionState = transactionState;
+  }
+
+  /**
+   * Opens a scanner on the current region under the transaction's id. When
+   * the region was not yet part of the transaction, the transaction is begun
+   * on it first.
+   *
+   * @return the scanner id returned by the region server
+   * @throws IOException if a remote call fails
+   */
+  @Override
+  protected long openScanner() throws IOException {
+    // Enlist before touching the server so a newly seen region is recorded
+    // even if the remote call below fails.
+    final boolean newlyEnlisted = transactionState.addRegion(location);
+    final TransactionalRegionInterface regionServer =
+        (TransactionalRegionInterface) server;
+
+    if (newlyEnlisted) {
+      regionServer.beginTransaction(transactionState.getTransactionId(),
+          location.getRegionInfo().getRegionName());
+    }
+
+    return regionServer.openScanner(transactionState.getTransactionId(),
+        this.location.getRegionInfo().getRegionName(), getColumns(), row,
+        getTimestamp(), getFilter());
+  }
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+
+/**
+ * Client-side record of one transaction: its id plus the region locations
+ * that have taken part so far. Clients use instances as opaque objects
+ * passed around to transaction operations.
+ */
+public class TransactionState {
+  static final Log LOG = LogFactory.getLog(TransactionState.class);
+
+  private final long transactionId;
+
+  private Set<HRegionLocation> participatingRegions = new HashSet<HRegionLocation>();
+
+  TransactionState(final long transactionId) {
+    this.transactionId = transactionId;
+  }
+
+  /**
+   * Records a region as a participant of this transaction.
+   *
+   * @param hregion location to add
+   * @return true if the location was not already a participant
+   */
+  boolean addRegion(final HRegionLocation hregion) {
+    if (!participatingRegions.add(hregion)) {
+      return false;
+    }
+
+    LOG.debug("Adding new hregion ["
+        + hregion.getRegionInfo().getRegionNameAsString()
+        + "] to transaction [" + transactionId + "]");
+    return true;
+  }
+
+  /**
+   * @return the live set of participating region locations (not a copy)
+   */
+  Set<HRegionLocation> getParticipatingRegions() {
+    return participatingRegions;
+  }
+
+  /**
+   * Get the transactionId.
+   *
+   * @return Return the transactionId.
+   */
+  public long getTransactionId() {
+    return transactionId;
+  }
+
+  /** @return the id and the number of participating regions, for logging */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("id: ");
+    text.append(transactionId);
+    text.append(", particpants: ");
+    text.append(participatingRegions.size());
+    return text.toString();
+  }
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,401 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Table with transactional support.
+ *
+ */
+public class TransactionalTable extends HTable {
+
+ public TransactionalTable(final HBaseConfiguration conf,
+ final String tableName) throws IOException {
+ super(conf, tableName);
+ }
+
+ public TransactionalTable(final HBaseConfiguration conf, final Text tableName)
+ throws IOException {
+ super(conf, tableName);
+ }
+
+ public TransactionalTable(final HBaseConfiguration conf,
+ final byte[] tableName) throws IOException {
+ super(conf, tableName);
+ }
+
+ private static abstract class TransactionalServerCallable<T> extends
+ ServerCallable<T> {
+ protected TransactionState transactionState;
+
+ protected TransactionalRegionInterface getTransactionServer() {
+ return (TransactionalRegionInterface) server;
+ }
+
+ protected void recordServer() throws IOException {
+ if (transactionState.addRegion(location)) {
+ getTransactionServer().beginTransaction(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName());
+ }
+ }
+
+ public TransactionalServerCallable(final HConnection connection,
+ final byte[] tableName, final byte[] row,
+ final TransactionState transactionState) {
+ super(connection, tableName, row);
+ this.transactionState = transactionState;
+ }
+
+ }
+
+ /**
+ * Get a single value for the specified row and column
+ *
+ * @param row row key
+ * @param column column name
+ * @return value for specified row/column
+ * @throws IOException
+ */
+ public Cell get(final TransactionState transactionState, final byte[] row,
+ final byte[] column) throws IOException {
+ return super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<Cell>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public Cell call() throws IOException {
+ recordServer();
+ return getTransactionServer().get(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, column);
+ }
+ });
+ }
+
+ /**
+ * Get the specified number of versions of the specified row and column
+ *
+ * @param row - row key
+ * @param column - column name
+ * @param numVersions - number of versions to retrieve
+ * @return - array byte values
+ * @throws IOException
+ */
+ public Cell[] get(final TransactionState transactionState, final byte[] row,
+ final byte[] column, final int numVersions) throws IOException {
+ Cell[] values = null;
+ values = super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<Cell[]>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public Cell[] call() throws IOException {
+ recordServer();
+ return getTransactionServer().get(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, column,
+ numVersions);
+ }
+ });
+
+ return values;
+ }
+
+ /**
+ * Get the specified number of versions of the specified row and column with
+ * the specified timestamp.
+ *
+ * @param row - row key
+ * @param column - column name
+ * @param timestamp - timestamp
+ * @param numVersions - number of versions to retrieve
+ * @return - array of values that match the above criteria
+ * @throws IOException
+ */
+ public Cell[] get(final TransactionState transactionState, final byte[] row,
+ final byte[] column, final long timestamp, final int numVersions)
+ throws IOException {
+ Cell[] values = null;
+ values = super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<Cell[]>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public Cell[] call() throws IOException {
+ recordServer();
+ return getTransactionServer().get(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, column,
+ timestamp, numVersions);
+ }
+ });
+
+ return values;
+ }
+
+ /**
+ * Get all the data for the specified row at the latest timestamp
+ *
+ * @param row row key
+ * @return RowResult is empty if row does not exist.
+ * @throws IOException
+ */
+ public RowResult getRow(final TransactionState transactionState,
+ final byte[] row) throws IOException {
+ return getRow(transactionState, row, HConstants.LATEST_TIMESTAMP);
+ }
+
+ /**
+ * Get all the data for the specified row at a specified timestamp
+ *
+ * @param row row key
+ * @param ts timestamp
+ * @return RowResult is empty if row does not exist.
+ * @throws IOException
+ */
+ public RowResult getRow(final TransactionState transactionState,
+ final byte[] row, final long ts) throws IOException {
+ return super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<RowResult>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public RowResult call() throws IOException {
+ recordServer();
+ return getTransactionServer().getRow(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, ts);
+ }
+ });
+ }
+
+ /**
+ * Get selected columns for the specified row at the latest timestamp
+ *
+ * @param row row key
+ * @param columns Array of column names you want to retrieve.
+ * @return RowResult is empty if row does not exist.
+ * @throws IOException
+ */
+ public RowResult getRow(final TransactionState transactionState,
+ final byte[] row, final byte[][] columns) throws IOException {
+ return getRow(transactionState, row, columns, HConstants.LATEST_TIMESTAMP);
+ }
+
+ /**
+ * Get selected columns for the specified row at a specified timestamp
+ *
+ * @param row row key
+ * @param columns Array of column names you want to retrieve.
+ * @param ts timestamp
+ * @return RowResult is empty if row does not exist.
+ * @throws IOException
+ */
+ public RowResult getRow(final TransactionState transactionState,
+ final byte[] row, final byte[][] columns, final long ts)
+ throws IOException {
+ return super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<RowResult>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public RowResult call() throws IOException {
+ recordServer();
+ return getTransactionServer().getRow(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, columns, ts);
+ }
+ });
+ }
+
+ /**
+ * Delete all cells that match the passed row and whose timestamp is equal-to
+ * or older than the passed timestamp.
+ *
+ * @param transactionState transaction this delete is performed under
+ * @param row Row to update
+ * @param ts Delete all cells of the same timestamp or older.
+ * @throws IOException
+ */
+ public void deleteAll(final TransactionState transactionState,
+ final byte[] row, final long ts) throws IOException {
+ super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<Boolean>(super.getConnection(), super
+ .getTableName(), row, transactionState) {
+ public Boolean call() throws IOException {
+ recordServer();
+ getTransactionServer().deleteAll(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), row, ts);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Get a scanner on the current table starting at first row. Return the
+ * specified columns.
+ *
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex in the column qualifier. A column qualifier is judged to be a
+ * regex if it contains at least one of the following characters:
+ * <code>\+|^&*$[]]}{)(</code>.
+ * @return scanner
+ * @throws IOException
+ */
+ public Scanner getScanner(final TransactionState transactionState,
+ final byte[][] columns) throws IOException {
+ return getScanner(transactionState, columns, HConstants.EMPTY_START_ROW,
+ HConstants.LATEST_TIMESTAMP, null);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row. Return
+ * the specified columns.
+ *
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex in the column qualifier. A column qualifier is judged to be a
+ * regex if it contains at least one of the following characters:
+ * <code>\+|^&*$[]]}{)(</code>.
+ * @param startRow starting row in table to scan
+ * @return scanner
+ * @throws IOException
+ */
+ public Scanner getScanner(final TransactionState transactionState,
+ final byte[][] columns, final byte[] startRow) throws IOException {
+ return getScanner(transactionState, columns, startRow,
+ HConstants.LATEST_TIMESTAMP, null);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row. Return
+ * the specified columns.
+ *
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex in the column qualifier. A column qualifier is judged to be a
+ * regex if it contains at least one of the following characters:
+ * <code>\+|^&*$[]]}{)(</code>.
+ * @param startRow starting row in table to scan
+ * @param timestamp only return results whose timestamp <= this value
+ * @return scanner
+ * @throws IOException
+ */
+ public Scanner getScanner(final TransactionState transactionState,
+ final byte[][] columns, final byte[] startRow, final long timestamp)
+ throws IOException {
+ return getScanner(transactionState, columns, startRow, timestamp, null);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row. Return
+ * the specified columns.
+ *
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex in the column qualifier. A column qualifier is judged to be a
+ * regex if it contains at least one of the following characters:
+ * <code>\+|^&*$[]]}{)(</code>.
+ * @param startRow starting row in table to scan
+ * @param filter a row filter using row-key regexp and/or column data filter.
+ * @return scanner
+ * @throws IOException
+ */
+ public Scanner getScanner(final TransactionState transactionState,
+ final byte[][] columns, final byte[] startRow,
+ final RowFilterInterface filter) throws IOException {
+ return getScanner(transactionState, columns, startRow,
+ HConstants.LATEST_TIMESTAMP, filter);
+ }
+
+ /**
+ * Get a scanner on the current table starting at the specified row. Return
+ * the specified columns.
+ *
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex in the column qualifier. A column qualifier is judged to be a
+ * regex if it contains at least one of the following characters:
+ * <code>\+|^&*$[]]}{)(</code>.
+ * @param startRow starting row in table to scan
+ * @param timestamp only return results whose timestamp <= this value
+ * @param filter a row filter using row-key regexp and/or column data filter.
+ * @return scanner
+ * @throws IOException
+ */
+ public Scanner getScanner(final TransactionState transactionState,
+ final byte[][] columns, final byte[] startRow, final long timestamp,
+ final RowFilterInterface filter) throws IOException {
+ ClientScanner scanner = new TransactionalClientScanner(transactionState, columns, startRow,
+ timestamp, filter);
+ scanner.initialize();
+ return scanner;
+ }
+
+ /**
+ * Commit a BatchUpdate to the table.
+ *
+ * @param batchUpdate
+ * @throws IOException
+ */
+ public synchronized void commit(final TransactionState transactionState,
+ final BatchUpdate batchUpdate) throws IOException {
+ super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable<Boolean>(super.getConnection(), super
+ .getTableName(), batchUpdate.getRow(), transactionState) {
+ public Boolean call() throws IOException {
+ recordServer();
+ getTransactionServer().batchUpdate(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), batchUpdate);
+ return null;
+ }
+ });
+ }
+
+ protected class TransactionalClientScanner extends HTable.ClientScanner {
+
+ private TransactionState transactionState;
+
+ protected TransactionalClientScanner(
+ final TransactionState transactionState, final byte[][] columns,
+ final byte[] startRow, final long timestamp,
+ final RowFilterInterface filter) {
+ super(columns, startRow, timestamp, filter);
+ this.transactionState = transactionState;
+ }
+
+ @Override
+ protected ScannerCallable getScannerCallable(final byte[] localStartKey) {
+ return new TransactionScannerCallable(transactionState, getConnection(),
+ getTableName(), getColumns(), localStartKey, getTimestamp(),
+ getFilter());
+ }
+ }
+
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * Thrown if a region server is passed an unknown transaction id
+ */
+ public class UnknownTransactionException extends DoNotRetryIOException {
+
+ /** constructor */
+ public UnknownTransactionException() {
+ super();
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ */
+ public UnknownTransactionException(String s) {
+ super(s);
+ }
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/transactional/package.html Sun Aug 17 15:03:42 2008
@@ -0,0 +1,59 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<head />
+<body bgcolor="white">
+
+This package provides support for atomic transactions. Transactions can
+span multiple regions. Transaction writes are applied when committing a
+transaction. At commit time, the transaction is examined to see if it
+can be applied while still maintaining atomicity. This is done by
+looking for conflicts with the transactions that committed while the
+current transaction was running. This technique is known as optimistic
+concurrency control (OCC) because it relies on the assumption that
+transactions will mostly not have conflicts with each other.
+
+<p>
+For more details on OCC, see the paper <i> On Optimistic Methods for Concurrency Control </i>
+by Kung and Robinson available
+<a href="http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf"> here </a>.
+
+<p> To enable transactions, modify hbase-site.xml to turn on the
+TransactionalRegionServer. This is done by setting
+<i>hbase.regionserver.class</i> to
+<i>org.apache.hadoop.hbase.ipc.TransactionalRegionInterface</i> and
+<i>hbase.regionserver.impl </i> to
+<i>org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer</i>
+
+<h3> Known Issues </h3>
+
+Recovery in the face of hregion server failure
+is not fully implemented. Thus, you cannot rely on the transactional
+properties in the face of node failure.
+
+<p> In order to avoid phantom reads on scanners, scanners currently
+claim a <i>write set</i> for all rows in every region which they scan
+through. This means that if transaction A writes to a region that
+transaction B is scanning, then there is a conflict (only one
+transaction can be committed). This will occur even if the scanner
+never went over the row that was written.
+
+</body>
+</html>
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,178 @@
+/*
+ * $Id$
+ * Created on Jun 4, 2008
+ *
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+
+/**
+ * Interface for transactional region servers.
+ *
+ */
+public interface TransactionalRegionInterface extends HRegionInterface {
+ /** Interface version number */
+ public static final long versionID = 1L;
+
+ /**
+ * Sent to initiate a transaction.
+ *
+ * @param transactionId
+ * @param regionName name of region
+ */
+ public void beginTransaction(long transactionId, final byte[] regionName)
+ throws IOException;
+
+ /**
+ * Retrieve a single value from the specified region for the specified row and
+ * column keys
+ *
+ * @param regionName name of region
+ * @param row row key
+ * @param column column key
+ * @return value for that region/row/column
+ * @throws IOException
+ */
+ public Cell get(long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column) throws IOException;
+
+ /**
+ * Get the specified number of versions of the specified row and column
+ *
+ * @param regionName region name
+ * @param row row key
+ * @param column column key
+ * @param numVersions number of versions to return
+ * @return array of values
+ * @throws IOException
+ */
+ public Cell[] get(long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column, final int numVersions)
+ throws IOException;
+
+ /**
+ * Get the specified number of versions of the specified row and column with
+ * the specified timestamp.
+ *
+ * @param regionName region name
+ * @param row row key
+ * @param column column key
+ * @param timestamp timestamp
+ * @param numVersions number of versions to return
+ * @return array of values
+ * @throws IOException
+ */
+ public Cell[] get(long transactionId, final byte[] regionName,
+ final byte[] row, final byte[] column, final long timestamp,
+ final int numVersions) throws IOException;
+
+ /**
+ * Get all the data for the specified row at a given timestamp
+ *
+ * @param regionName region name
+ * @param row row key
+ * @return map of values
+ * @throws IOException
+ */
+ public RowResult getRow(long transactionId, final byte[] regionName,
+ final byte[] row, final long ts) throws IOException;
+
+ /**
+ * Get selected columns for the specified row at a given timestamp.
+ *
+ * @param regionName region name
+ * @param row row key
+ * @return map of values
+ * @throws IOException
+ */
+ public RowResult getRow(long transactionId, final byte[] regionName,
+ final byte[] row, final byte[][] columns, final long ts)
+ throws IOException;
+
+ /**
+ * Get selected columns for the specified row at the latest timestamp.
+ *
+ * @param regionName region name
+ * @param row row key
+ * @return map of values
+ * @throws IOException
+ */
+ public RowResult getRow(long transactionId, final byte[] regionName,
+ final byte[] row, final byte[][] columns) throws IOException;
+
+ /**
+ * Delete all cells that match the passed row and whose timestamp is equal-to
+ * or older than the passed timestamp.
+ *
+ * @param regionName region name
+ * @param row row key
+ * @param timestamp Delete all entries that have this timestamp or older
+ * @throws IOException
+ */
+ public void deleteAll(long transactionId, byte[] regionName, byte[] row,
+ long timestamp) throws IOException;
+
+ /**
+ * Opens a remote scanner with a RowFilter.
+ *
+ * @param transactionId
+ * @param regionName name of region to scan
+ * @param columns columns to scan. If column name is a column family, all
+ * columns of the specified column family are returned. It's also possible to
+ * pass a regex for column family name. A column name is judged to be regex if
+ * it contains at least one of the following characters:
+ * <code>\+|^&*$[]]}{)(</code>.
+ * @param startRow starting row to scan
+ * @param timestamp only return values whose timestamp is <= this value
+ * @param filter RowFilter for filtering results at the row-level.
+ *
+ * @return scannerId scanner identifier used in other calls
+ * @throws IOException
+ */
+ public long openScanner(final long transactionId, final byte[] regionName,
+ final byte[][] columns, final byte[] startRow, long timestamp,
+ RowFilterInterface filter) throws IOException;
+
+ /**
+ * Applies a batch of updates via one RPC
+ *
+ * @param regionName name of the region to update
+ * @param b BatchUpdate
+ * @throws IOException
+ */
+ public void batchUpdate(long transactionId, final byte[] regionName,
+ final BatchUpdate b) throws IOException;
+
+ /**
+ * Ask if we can commit the given transaction.
+ *
+ * @param transactionId
+ * @return true if we can commit
+ */
+ public boolean commitRequest(final byte[] regionName, long transactionId)
+ throws IOException;
+
+ /**
+ * Commit the transaction.
+ *
+ * @param transactionId
+ * @throws IOException
+ */
+ public void commit(final byte[] regionName, long transactionId)
+ throws IOException;
+
+ /**
+ * Abort the transaction.
+ *
+ * @param transactionId
+ * @throws IOException
+ */
+ public void abort(final byte[] regionName, long transactionId)
+ throws IOException;
+}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sun Aug 17 15:03:42 2008
@@ -171,7 +171,7 @@
* Accessor for tests.
* @return Current state of the monotonically increasing file id.
*/
- long getFilenum() {
+ public long getFilenum() {
return this.filenum;
}
@@ -204,6 +204,10 @@
}
}
}
+
+ public long getSequenceNumber() {
+ return logSeqNum;
+ }
/**
* Roll the log writer. That is, start writing log messages to a new file.
@@ -311,7 +315,7 @@
* This is a convenience method that computes a new filename with a given
* file-number.
*/
- Path computeFilename(final long fn) {
+ public Path computeFilename(final long fn) {
return new Path(dir, HLOG_DATFILE + fn);
}
@@ -330,7 +334,7 @@
*
* @throws IOException
*/
- void close() throws IOException {
+ public void close() throws IOException {
cacheFlushLock.lock();
try {
synchronized (updateLock) {
@@ -391,13 +395,8 @@
new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
HLogEdit logEdit =
new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
- try {
- this.writer.append(logKey, logEdit);
- } catch (IOException e) {
- LOG.fatal("Could not append. Requesting close of log", e);
- requestLogRoll();
- throw e;
- }
+ doWrite(logKey, logEdit);
+
this.numEntries++;
}
}
@@ -411,6 +410,63 @@
this.listener.logRollRequested();
}
}
+
+ private void doWrite(HLogKey logKey, HLogEdit logEdit) throws IOException {
+ try {
+ this.writer.append(logKey, logEdit);
+ } catch (IOException e) {
+ LOG.fatal("Could not append. Requesting close of log", e);
+ requestLogRoll();
+ throw e;
+ }
+ }
+
+ /** Append an entry without a row to the log.
+ *
+ * @param regionInfo
+ * @param logEdit
+ * @throws IOException
+ */
+ public void append(HRegionInfo regionInfo, HLogEdit logEdit) throws IOException {
+ this.append(regionInfo, new byte[0], logEdit);
+ }
+
+ /** Append an entry to the log.
+ *
+ * @param regionInfo region the edit belongs to; supplies the region name
+ * and table name used for the log key
+ * @param row
+ * @param logEdit
+ * @throws IOException
+ */
+ public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit) throws IOException {
+ if (closed) {
+ throw new IOException("Cannot append; log is closed");
+ }
+ byte [] regionName = regionInfo.getRegionName();
+ byte [] tableName = regionInfo.getTableDesc().getName();
+
+ synchronized (updateLock) {
+ long seqNum = obtainSeqNum();
+ // The 'lastSeqWritten' map holds the sequence number of the oldest
+ // write for each region. When the cache is flushed, the entry for the
+ // region being flushed is removed if the sequence number of the flush
+ // is greater than or equal to the value in lastSeqWritten.
+ if (!this.lastSeqWritten.containsKey(regionName)) {
+ this.lastSeqWritten.put(regionName, Long.valueOf(seqNum));
+ }
+
+ HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum);
+ doWrite(logKey, logEdit);
+ this.numEntries++;
+ }
+
+ if (this.numEntries > this.maxlogentries) {
+ if (listener != null) {
+ listener.logRollRequested();
+ }
+ }
+ }
/** @return How many items have been added to the log */
int getNumEntries() {
@@ -508,6 +564,10 @@
this.cacheFlushLock.unlock();
}
+ public static boolean isMetaColumn(byte [] column) {
+ return Bytes.equals(METACOLUMN, column);
+ }
+
/**
* Split up a bunch of log files, that are no longer being written to, into
* new files, one per region. Delete the old log files when finished.
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java Sun Aug 17 15:03:42 2008
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
@@ -62,11 +63,20 @@
public static boolean isDeleted(final byte [] value) {
return (value == null)? false: deleteBytes.compareTo(value) == 0;
}
+
+ public enum TransactionalOperation {
+ START, WRITE, COMMIT, ABORT
+ }
private byte [] column;
private byte [] val;
private long timestamp;
private static final int MAX_VALUE_LEN = 128;
+
+ private boolean isTransactionEntry;
+ private Long transactionId = null;
+ private TransactionalOperation operation;
+
/**
* Default constructor used by Writable
@@ -85,6 +95,34 @@
this.column = c;
this.val = bval;
this.timestamp = timestamp;
+ this.isTransactionEntry = false;
+ }
+
+ /** Construct a WRITE transaction.
+ *
+ * @param transactionId
+ * @param op
+ * @param timestamp
+ */
+ public HLogEdit(long transactionId, BatchOperation op, long timestamp) {
+ this(op.getColumn(), op.getValue(), timestamp);
+ // This covers delete ops too...
+ this.transactionId = transactionId;
+ this.operation = TransactionalOperation.WRITE;
+ this.isTransactionEntry = true;
+ }
+
+ /** Construct a transactional operation (START, ABORT, or COMMIT).
+ *
+ * @param transactionId
+ * @param op
+ */
+ public HLogEdit(long transactionId, TransactionalOperation op) {
+ this.column = new byte[0];
+ this.val = new byte[0];
+ this.transactionId = transactionId;
+ this.operation = op;
+ this.isTransactionEntry = true;
}
/** @return the column */
@@ -101,6 +139,28 @@
public long getTimestamp() {
return this.timestamp;
}
+
+ public boolean isTransactionEntry() {
+ return isTransactionEntry;
+ }
+
+ /**
+ * Get the transactionId, or null if this is not a transactional edit.
+ *
+ * @return Return the transactionId.
+ */
+ public Long getTransactionId() {
+ return transactionId;
+ }
+
+ /**
+ * Get the operation.
+ *
+ * @return Return the operation.
+ */
+ public TransactionalOperation getOperation() {
+ return operation;
+ }
/**
* @return First column name, timestamp, and first 128 bytes of the value
@@ -117,8 +177,13 @@
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("UTF8 encoding not present?", e);
}
- return "(" + Bytes.toString(getColumn()) + "/" + getTimestamp() + "/" +
- value + ")";
+ return "("
+ + Bytes.toString(getColumn())
+ + "/"
+ + getTimestamp()
+ + "/"
+ + (isTransactionEntry ? "tran: " + transactionId + " op "
+ + operation.toString() +"/": "") + value + ")";
}
// Writable
@@ -126,9 +191,18 @@
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.column);
- out.writeInt(this.val.length);
- out.write(this.val);
+ if (this.val == null) {
+ out.writeInt(0);
+ } else {
+ out.writeInt(this.val.length);
+ out.write(this.val);
+ }
out.writeLong(timestamp);
+ out.writeBoolean(isTransactionEntry);
+ if (isTransactionEntry) {
+ out.writeLong(transactionId);
+ out.writeUTF(operation.name());
+ }
}
/** {@inheritDoc} */
@@ -137,5 +211,10 @@
this.val = new byte[in.readInt()];
in.readFully(this.val);
this.timestamp = in.readLong();
+ isTransactionEntry = in.readBoolean();
+ if (isTransactionEntry) {
+ transactionId = in.readLong();
+ operation = TransactionalOperation.valueOf(in.readUTF());
+ }
}
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java Sun Aug 17 15:03:42 2008
@@ -30,6 +30,8 @@
* The log intermingles edits to many tables and rows, so each log entry
* identifies the appropriate table and row. Within a table and row, they're
* also sorted.
+ *
+ * Some Transactional edits (START, COMMIT, ABORT) will not have an associated row.
*/
public class HLogKey implements WritableComparable {
private byte [] regionName;
@@ -64,19 +66,19 @@
// A bunch of accessors
//////////////////////////////////////////////////////////////////////////////
- byte [] getRegionName() {
+ public byte [] getRegionName() {
return regionName;
}
- byte [] getTablename() {
+ public byte [] getTablename() {
return tablename;
}
- byte [] getRow() {
+ public byte [] getRow() {
return row;
}
- long getLogSeqNum() {
+ public long getLogSeqNum() {
return logSeqNum;
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Aug 17 15:03:42 2008
@@ -243,8 +243,8 @@
LOG.debug("Files for new region");
listPaths(fs, newRegionDir);
}
- HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo,
- null, null);
+ HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null);
+ dstRegion.initialize(null, null);
dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
@@ -318,7 +318,7 @@
private final Map<Integer, TreeMap<HStoreKey, byte []>> targetColumns =
new ConcurrentHashMap<Integer, TreeMap<HStoreKey, byte []>>();
// Default access because read by tests.
- final Map<Integer, HStore> stores = new ConcurrentHashMap<Integer, HStore>();
+ protected final Map<Integer, HStore> stores = new ConcurrentHashMap<Integer, HStore>();
final AtomicLong memcacheSize = new AtomicLong(0);
final Path basedir;
@@ -376,7 +376,7 @@
private final ReentrantReadWriteLock updatesLock =
new ReentrantReadWriteLock();
private final Integer splitLock = new Integer(0);
- private final long minSequenceId;
+ private long minSequenceId;
final AtomicInteger activeScannerCount = new AtomicInteger(0);
//////////////////////////////////////////////////////////////////////////////
@@ -395,31 +395,6 @@
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
- * @param fs is the filesystem.
- * @param conf is global configuration settings.
- * @param regionInfo - HRegionInfo that describes the region
- * @param initialFiles If there are initial files (implying that the HRegion
- * is new), then read them from the supplied path.
- * @param flushListener an object that implements CacheFlushListener or null
- * or null
- * @throws IOException
- */
- public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
- HRegionInfo regionInfo, Path initialFiles,
- FlushRequester flushListener) throws IOException {
- this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null);
- }
-
- /**
- * HRegion constructor.
- *
- * @param log The HLog is the outbound log for any updates to the HRegion
- * (There's a single HLog for all the HRegions on a single HRegionServer.)
- * The log file is a logfile from the previous execution that's
- * custom-computed for this HRegion. The HRegionServer computes and sorts the
- * appropriate log info for this HRegion. If there is a previous log file
- * (implying that the HRegion has been written-to before), then read it from
- * the supplied path.
* @param basedir qualified path of directory where region should be located,
* usually the table directory.
* @param fs is the filesystem.
@@ -434,10 +409,8 @@
* @throws IOException
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
- HRegionInfo regionInfo, Path initialFiles,
- FlushRequester flushListener, final Progressable reporter)
- throws IOException {
-
+ HRegionInfo regionInfo,
+ FlushRequester flushListener) {
this.basedir = basedir;
this.log = log;
this.fs = fs;
@@ -447,7 +420,6 @@
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
String encodedNameStr = Integer.toString(this.regionInfo.getEncodedName());
this.regiondir = new Path(basedir, encodedNameStr);
- Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
this.historian = RegionHistorian.getInstance();
if (LOG.isDebugEnabled()) {
@@ -457,6 +429,27 @@
this.regionCompactionDir =
new Path(getCompactionDir(basedir), encodedNameStr);
+
+ int flushSize = regionInfo.getTableDesc().getMemcacheFlushSize();
+ if (flushSize == HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE) {
+ flushSize = conf.getInt("hbase.hregion.memcache.flush.size",
+ HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE);
+ }
+ this.memcacheFlushSize = flushSize;
+
+ this.blockingMemcacheSize = this.memcacheFlushSize *
+ conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
+ }
+
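+ /** Initialize this region and get it ready to roll.
+ *
+ * @param initialFiles if there are initial files (implying that the HRegion is new), read them from this path; may be null
+ * @param reporter progress reporter passed on to HStore instantiation and log reconstruction; may be null
+ * @throws IOException
+ */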
+ public void initialize( Path initialFiles,
+ final Progressable reporter) throws IOException {
+ Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
// Move prefab HStore files into place (if any). This picks up split files
// and any merges from splits and merges dirs.
@@ -466,16 +459,20 @@
// Load in all the HStores.
long maxSeqId = -1;
+ long minSeqId = Integer.MAX_VALUE;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
HStore store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
stores.put(Bytes.mapKey(c.getName()), store);
long storeSeqId = store.getMaxSequenceId();
if (storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
+ }
+ if (storeSeqId < minSeqId) {
+ minSeqId = storeSeqId;
}
}
- doReconstructionLog(oldLogFile, maxSeqId, reporter);
+ doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
if (fs.exists(oldLogFile)) {
if (LOG.isDebugEnabled()) {
@@ -501,17 +498,6 @@
if (fs.exists(merges)) {
fs.delete(merges, true);
}
-
- int flushSize = regionInfo.getTableDesc().getMemcacheFlushSize();
- if (flushSize == HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE) {
- flushSize = conf.getInt("hbase.hregion.memcache.flush.size",
- HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE);
- }
- this.memcacheFlushSize = flushSize;
-
- this.blockingMemcacheSize = this.memcacheFlushSize *
- conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
-
// See if region is meant to run read-only.
if (this.regionInfo.getTableDesc().isReadOnly()) {
this.writestate.setReadOnly(true);
@@ -797,10 +783,12 @@
// Opening the region copies the splits files from the splits directory
// under each region.
HRegion regionA =
- new HRegion(basedir, log, fs, conf, regionAInfo, dirA, null);
+ new HRegion(basedir, log, fs, conf, regionAInfo, null);
+ regionA.initialize(dirA, null);
regionA.close();
HRegion regionB =
- new HRegion(basedir, log, fs, conf, regionBInfo, dirB, null);
+ new HRegion(basedir, log, fs, conf, regionBInfo, null);
+ regionB.initialize(dirB, null);
regionB.close();
// Cleanup
@@ -1029,12 +1017,14 @@
// again so its value will represent the size of the updates received
// during the flush
long sequenceId = -1L;
+ long completeSequenceId = -1L;
this.updatesLock.writeLock().lock();
try {
for (HStore s: stores.values()) {
s.snapshot();
}
sequenceId = log.startCacheFlush();
+ completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
this.memcacheSize.set(0);
} finally {
this.updatesLock.writeLock().unlock();
@@ -1050,7 +1040,7 @@
// Keep running vector of all store files that includes both old and the
// just-made new flush store file.
for (HStore hstore: stores.values()) {
- boolean needsCompaction = hstore.flushCache(sequenceId);
+ boolean needsCompaction = hstore.flushCache(completeSequenceId);
if (needsCompaction) {
compactionRequested = true;
}
@@ -1077,7 +1067,7 @@
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(getRegionName(),
- regionInfo.getTableDesc().getName(), sequenceId);
+ regionInfo.getTableDesc().getName(), completeSequenceId);
// C. Finally notify anyone waiting on memcache to clear:
// e.g. checkResources().
@@ -1099,6 +1089,18 @@
return compactionRequested;
}
+ /**
+ * Get the sequence number to be associated with this cache flush. Used by
+ * TransactionalRegion to not complete pending transactions.
+ *
+ *
+ * @param currentSequenceId sequence id obtained from the log when the cache flush started
+ * @return sequence id to complete the cache flush with
+ */
+ protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
+ return currentSequenceId;
+ }
+
//////////////////////////////////////////////////////////////////////////////
// get() methods for client use.
//////////////////////////////////////////////////////////////////////////////
@@ -1346,7 +1348,33 @@
* @param b
* @throws IOException
*/
- public void batchUpdate(BatchUpdate b, Integer lockid)
+ public void batchUpdate(BatchUpdate b) throws IOException {
+ this.batchUpdate(b, null, true);
+ }
+
+ /**
+ * @param b
+ * @throws IOException
+ */
+ public void batchUpdate(BatchUpdate b, boolean writeToWAL) throws IOException {
+ this.batchUpdate(b, null, writeToWAL);
+ }
+
+
+ /**
+ * @param b
+ * @throws IOException
+ */
+ public void batchUpdate(BatchUpdate b, Integer lockid) throws IOException {
+ this.batchUpdate(b, lockid, true);
+ }
+
+ /**
+ * @param b
+ * @param writeToWAL if true, then we write this update to the log
+ * @throws IOException
+ */
+ public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL)
throws IOException {
checkReadOnly();
@@ -1395,7 +1423,7 @@
this.targetColumns.remove(lid);
if (edits != null && edits.size() > 0) {
- update(edits);
+ update(edits, writeToWAL);
}
if (deletes != null && deletes.size() > 0) {
@@ -1597,16 +1625,25 @@
}
targets.put(key, val);
}
-
- /*
+
+ /**
* Add updates first to the hlog and then add values to memcache.
* Warning: Assumption is caller has lock on passed in row.
- * @param row Row to update.
- * @param timestamp Timestamp to record the updates against
* @param updatesByColumn Cell updates by column
* @throws IOException
*/
- private void update(final TreeMap<HStoreKey, byte []> updatesByColumn)
+ private void update(final TreeMap<HStoreKey, byte []> updatesByColumn) throws IOException {
+ this.update(updatesByColumn, true);
+ }
+
+ /**
+ * Add updates first to the hlog (if writeToWAL is true) and then add values to memcache.
+ * Warning: Assumption is caller has lock on passed in row.
+ * @param writeToWAL if true, then we should write to the log
+ * @param updatesByColumn Cell updates by column
+ * @throws IOException
+ */
+ private void update(final TreeMap<HStoreKey, byte []> updatesByColumn, boolean writeToWAL)
throws IOException {
if (updatesByColumn == null || updatesByColumn.size() <= 0) {
return;
@@ -1615,8 +1652,10 @@
boolean flush = false;
this.updatesLock.readLock().lock();
try {
- this.log.append(regionInfo.getRegionName(),
- regionInfo.getTableDesc().getName(), updatesByColumn);
+ if (writeToWAL) {
+ this.log.append(regionInfo.getRegionName(), regionInfo.getTableDesc()
+ .getName(), updatesByColumn);
+ }
long size = 0;
for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
HStoreKey key = e.getKey();
@@ -1660,7 +1699,7 @@
// Do any reconstruction needed from the log
@SuppressWarnings("unused")
- protected void doReconstructionLog(Path oldLogFile, long maxSeqId,
+ protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId,
Progressable reporter)
throws UnsupportedEncodingException, IOException {
// Nothing to do (Replaying is done in HStores)
@@ -2105,9 +2144,11 @@
if (!info.isMetaRegion()) {
RegionHistorian.getInstance().addRegionCreation(info);
}
- return new HRegion(tableDir,
- new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
- fs, conf, info, null, null);
+ HRegion region = new HRegion(tableDir,
+ new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
+ fs, conf, info, null);
+ region.initialize(null, null);
+ return region;
}
/**
@@ -2134,7 +2175,8 @@
}
HRegion r = new HRegion(
HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()),
- log, FileSystem.get(conf), conf, info, null, null);
+ log, FileSystem.get(conf), conf, info, null);
+ r.initialize(null, null);
if (log != null) {
log.setSequenceNumber(r.getMinSequenceId());
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Aug 17 15:03:42 2008
@@ -900,13 +900,15 @@
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
- return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
- .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null,
- this.cacheFlusher, new Progressable() {
- public void progress() {
- addProcessingMessage(regionInfo);
- }
- });
+ HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
+ .getTableDesc().getName()), this.log, this.fs, conf, regionInfo,
+ this.cacheFlusher);
+ r.initialize(null, new Progressable() {
+ public void progress() {
+ addProcessingMessage(regionInfo);
+ }
+ });
+ return r;
}
/*
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=686650&r1=686649&r2=686650&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Sun Aug 17 15:03:42 2008
@@ -311,7 +311,7 @@
// Check this edit is for me. Also, guard against writing
// METACOLUMN info such as HBASE::CACHEFLUSH entries
byte [] column = val.getColumn();
- if (Bytes.equals(column, HLog.METACOLUMN)
+ if (val.isTransactionEntry() || Bytes.equals(column, HLog.METACOLUMN)
|| !Bytes.equals(key.getRegionName(), info.getRegionName())
|| !HStoreKey.matchingFamily(family.getName(), column)) {
continue;
@@ -1316,8 +1316,7 @@
* @return Matching keys.
* @throws IOException
*/
- List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
- final long now)
+ public List<HStoreKey> getKeys(final HStoreKey origin, final int versions, final long now)
throws IOException {
// This code below is very close to the body of the get method. Any
// changes in the flow below should also probably be done in get. TODO:
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java?rev=686650&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java Sun Aug 17 15:03:42 2008
@@ -0,0 +1,53 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.transactional;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * Cleans up committed transactions when they are no longer needed to verify
+ * pending transactions.
+ */
+class CleanOldTransactionsChore extends Chore {
+
+ private static final String SLEEP_CONF = "hbase.transaction.clean.sleep";
+ private static final int DEFAULT_SLEEP = 60 * 1000;
+
+ private final TransactionalRegionServer regionServer;
+
+ public CleanOldTransactionsChore(
+ final TransactionalRegionServer regionServer,
+ final AtomicBoolean stopRequest) {
+ super(regionServer.getConfiguration().getInt(SLEEP_CONF, DEFAULT_SLEEP),
+ stopRequest);
+ this.regionServer = regionServer;
+ }
+
+ @Override
+ protected void chore() {
+ for (HRegion region : regionServer.getOnlineRegions()) {
+ ((TransactionalRegion) region).removeUnNeededCommitedTransactions();
+ }
+ }
+
+}