Posted to commits@hbase.apache.org by ap...@apache.org on 2009/09/26 03:26:56 UTC

svn commit: r819085 - in /hadoop/hbase/trunk: ./ src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/ src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/ src/contrib/transactional/src/test/...

Author: apurtell
Date: Sat Sep 26 01:26:55 2009
New Revision: 819085

URL: http://svn.apache.org/viewvc?rev=819085&view=rev
Log:
HBASE-1858 Master can't split logs written by THBase

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/package.html
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
    hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat Sep 26 01:26:55 2009
@@ -49,6 +49,8 @@
                cacheBlocks
    HBASE-1869  IndexedTable delete fails when used in conjunction with RowLock()
                (Keith Thomas via Stack)
+   HBASE-1858  Master can't split logs created by THBase (Clint Morgan via
+               Andrew Purtell)
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java Sat Sep 26 01:26:55 2009
@@ -24,6 +24,7 @@
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java Sat Sep 26 01:26:55 2009
@@ -44,16 +44,22 @@
    */
   long createNewTransactionLog();
 
-  /**
+  /** Get the status of a transaction.
    * @param transactionId
    * @return transaction status
    */
   TransactionStatus getStatusForTransaction(long transactionId);
 
-  /**
+  /** Set the status for a transaction.
    * @param transactionId
    * @param status
    */
   void setStatusForTransaction(long transactionId, TransactionStatus status);
+  
+  /** Notify the logger that this transaction's state is no longer
+   * needed and may be discarded.
+   * @param transactionId
+   */
+  void forgetTransaction(long transactionId);
 
 }
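
For illustration, a minimal in-memory implementation of the extended
interface might look like the following. This is a hypothetical class, not
part of this commit, and it assumes it lives in the same package as
TransactionLogger so the nested TransactionStatus enum is in scope; the
HBase-backed logger in this commit persists the same state to a table
instead.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical in-memory TransactionLogger showing where
    // forgetTransaction() fits: once a transaction's outcome no longer
    // matters for recovery, its entry can simply be dropped.
    public class InMemoryTransactionLogger implements TransactionLogger {
      private final AtomicLong nextId = new AtomicLong(0);
      private final ConcurrentMap<Long, TransactionStatus> statusById =
          new ConcurrentHashMap<Long, TransactionStatus>();

      public long createNewTransactionLog() {
        long id = nextId.incrementAndGet();
        statusById.put(id, TransactionStatus.PENDING);
        return id;
      }

      public TransactionStatus getStatusForTransaction(long transactionId) {
        return statusById.get(transactionId);
      }

      public void setStatusForTransaction(long transactionId,
          TransactionStatus status) {
        statusById.put(transactionId, status);
      }

      public void forgetTransaction(long transactionId) {
        statusById.remove(transactionId); // state no longer needed
      }
    }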

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java Sat Sep 26 01:26:55 2009
@@ -152,6 +152,8 @@
     
     if (status == TransactionalRegionInterface.COMMIT_OK) {
       doCommit(transactionState);
+    } else if (status == TransactionalRegionInterface.COMMIT_OK_READ_ONLY) {
+      transactionLogger.forgetTransaction(transactionState.getTransactionId());
     }
     LOG.debug("Committed transaction ["+transactionState.getTransactionId()+"] in ["+((System.currentTimeMillis()-startTime))+"]ms");
   }
@@ -187,7 +189,7 @@
       }
       throw new CommitUnsuccessfulException(e);
     }
-    // TODO: Transaction log can be deleted now ...
+    transactionLogger.forgetTransaction(transactionState.getTransactionId());
   }
 
   /**
@@ -230,6 +232,7 @@
                 + "]. Ignoring.");
       }
     }
+    transactionLogger.forgetTransaction(transactionState.getTransactionId());
   }
   
   public synchronized JtaXAResource getXAResource() {

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/package.html?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/package.html (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/package.html Sat Sep 26 01:26:55 2009
@@ -43,6 +43,10 @@
 <i>org.apache.hadoop.hbase.ipc.TransactionalRegionInterface</i> and
 <i>hbase.regionserver.impl </i> to
 <i>org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer</i>
+Additionally, to properly recover from the write-ahead log, the transactional
+log key class must be registered by setting <i>hbase.regionserver.hlog.keyclass</i>
+to <i>org.apache.hadoop.hbase.regionserver.transactional.THLogKey</i>.
+
 
 <p> 
 The read set claimed by a transactional scanner is determined from the start and
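
For example, the settings this package documentation describes could be
applied programmatically as below. This is a sketch only; the first
property name is assumed from the surrounding package docs, which are
truncated in this hunk, and the equivalent <property> entries can go in
hbase-site.xml instead.

    // Sketch: transactional contrib settings, including the new WAL
    // key class registration added by this commit.
    HBaseConfiguration conf = new HBaseConfiguration();
    conf.set("hbase.regionserver.class",
        "org.apache.hadoop.hbase.ipc.TransactionalRegionInterface");
    conf.set("hbase.regionserver.impl",
        "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer");
    conf.set("hbase.regionserver.hlog.keyclass",
        "org.apache.hadoop.hbase.regionserver.transactional.THLogKey");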

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java Sat Sep 26 01:26:55 2009
@@ -21,8 +21,8 @@
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
 import org.apache.hadoop.hbase.client.transactional.TransactionLogger;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.SequenceFile;
@@ -74,6 +75,10 @@
   
 
   /**
+   * Go through the WAL and look for transactions that were started but
+   * never completed. If such a transaction was committed, then its edits
+   * will need to be applied.
+   * 
    * @param reconstructionLog
    * @param maxSeqID
    * @param reporter
@@ -98,7 +103,7 @@
     }
 
     SortedMap<Long, List<KeyValue>> pendingTransactionsById = new TreeMap<Long, List<KeyValue>>();
-    SortedMap<Long, List<KeyValue>> commitedTransactionsById = new TreeMap<Long, List<KeyValue>>();
+    Set<Long> commitedTransactions = new HashSet<Long>();
     Set<Long> abortedTransactions = new HashSet<Long>();
 
     SequenceFile.Reader logReader = new SequenceFile.Reader(fileSystem,
@@ -118,8 +123,10 @@
           2000);
 
       while (logReader.next(key, val)) {
-        LOG.debug("Processing edit: key: " + key.toString() + " val: "
-            + val.toString());
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Processing edit: key: " + key.toString() + " val: "
+              + val.toString());
+        }
         if (key.getLogSeqNum() < maxSeqID) {
           skippedEdits++;
           continue;
@@ -135,12 +142,12 @@
 
         case START:
           if (updates != null || abortedTransactions.contains(transactionId)
-              || commitedTransactionsById.containsKey(transactionId)) {
+              || commitedTransactions.contains(transactionId)) {
             LOG.error("Processing start for transaction: " + transactionId
                 + ", but have already seen start message");
             throw new IOException("Corrupted transaction log");
           }
-          updates = new LinkedList<KeyValue>();
+          updates = new ArrayList<KeyValue>();
           pendingTransactionsById.put(transactionId, updates);
           startCount++;
           break;
@@ -179,18 +186,13 @@
                 + ", but also have abort message");
             throw new IOException("Corrupted transaction log");
           }
-          if (updates.size() == 0) {
-            LOG
-                .warn("Transaciton " + transactionId
-                    + " has no writes in log. ");
-          }
-          if (commitedTransactionsById.containsKey(transactionId)) {
+          if (commitedTransactions.contains(transactionId)) {
             LOG.error("Processing commit for transaction: " + transactionId
                 + ", but have already commited transaction with that id");
             throw new IOException("Corrupted transaction log");
           }
           pendingTransactionsById.remove(transactionId);
-          commitedTransactionsById.put(transactionId, updates);
+          commitedTransactions.add(transactionId);
           commitCount++;
           break;
           default:
@@ -213,23 +215,28 @@
     }
 
     if (pendingTransactionsById.size() > 0) {
-      resolvePendingTransaction(pendingTransactionsById, commitedTransactionsById);
+      return resolvePendingTransaction(pendingTransactionsById);
     }
      
 
-    return commitedTransactionsById;
+    return null;
   }
   
-  private void resolvePendingTransaction(
-      SortedMap<Long, List<KeyValue>> pendingTransactionsById,
-      SortedMap<Long, List<KeyValue>> commitedTransactionsById) {
+  private SortedMap<Long, List<KeyValue>> resolvePendingTransaction(
+      SortedMap<Long, List<KeyValue>> pendingTransactionsById
+      ) {
+    SortedMap<Long, List<KeyValue>> commitedTransactionsById = new TreeMap<Long, List<KeyValue>>();
+    
     LOG.info("Region log has " + pendingTransactionsById.size()
         + " unfinished transactions. Going to the transaction log to resolve");
 
-    for (Entry<Long, List<KeyValue>> entry : pendingTransactionsById
-        .entrySet()) {
+    for (Entry<Long, List<KeyValue>> entry : pendingTransactionsById.entrySet()) {
+      if (entry.getValue().isEmpty()) {
+        LOG.debug("Skipping resolution of trx [" + entry.getKey()
+            + "]; it has no writes to recover.");
+        continue;
+      }
       TransactionLogger.TransactionStatus transactionStatus = getGlobalTransactionLog()
           .getStatusForTransaction(entry.getKey());
+      
       if (transactionStatus == null) {
         throw new RuntimeException("Cannot resolve tranasction ["
             + entry.getKey() + "] from global tx log.");
@@ -241,12 +248,28 @@
         commitedTransactionsById.put(entry.getKey(), entry.getValue());
         break;
       case PENDING:
+        LOG.warn("Transaction [" + entry.getKey()
+            + "] is still pending. Assuming it will not commit."
+            + " If it eventually does commit, then we lose transactional semantics.");
+        // TODO this could possibly be handled by waiting and seeing what happens.  
         break;
       }
     }
+    return commitedTransactionsById;
   }
 
-  private TransactionLogger getGlobalTransactionLog() {
-    return null;
+  private TransactionLogger globalTransactionLog = null;
+  
+  private synchronized TransactionLogger getGlobalTransactionLog() {
+    if (globalTransactionLog == null) {
+      try {
+        globalTransactionLog = new HBaseBackedTransactionLogger();
+      } catch (IOException e) {
+        throw new RuntimeException(e); 
+      }
+    }
+    return globalTransactionLog;
   }
 }
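
In outline, the reworked recovery pass is a small state machine over the
WAL: START registers a pending transaction, write entries accumulate its
edits, and ABORT or COMMIT retires it. Committed transactions need no
replay here, because (after the TransactionalRegion change below) their
edits reach the WAL before the commit record. A condensed sketch follows;
accessor names like getTrxOp() are paraphrased stand-ins, not the literal
THLogKey API.

    // Paraphrased sketch of getCommitsFromLog(), not the literal code.
    SortedMap<Long, List<KeyValue>> pending = new TreeMap<Long, List<KeyValue>>();
    Set<Long> committed = new HashSet<Long>();
    Set<Long> aborted = new HashSet<Long>();
    while (logReader.next(key, val)) {
      long id = key.getTransactionId();
      switch (key.getTrxOp()) {          // corruption checks elided
        case START:  pending.put(id, new ArrayList<KeyValue>()); break;
        case OP:     pending.get(id).add(val);                   break;
        case ABORT:  pending.remove(id); aborted.add(id);        break;
        case COMMIT: pending.remove(id); committed.add(id);      break;
      }
    }
    // Only transactions still pending at the end of the log are resolved
    // against the global transaction log; of those, only the ones it
    // reports as committed are returned for reapplication.
    return pending.isEmpty() ? null : resolvePendingTransaction(pending);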

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Sat Sep 26 01:26:55 2009
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.regionserver.transactional;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -163,9 +165,10 @@
     
     // TODO take deletes into account as well
     
-    List<KeyValue> localKVs = new LinkedList<KeyValue>();
-    
-    for (Put put : puts) {
+    List<KeyValue> localKVs = new ArrayList<KeyValue>();
+    List<Put> reversedPuts = new ArrayList<Put>(puts);
+    Collections.reverse(reversedPuts);
+    for (Put put : reversedPuts) {
       if (!Bytes.equals(get.getRow(), put.getRow())) {
         continue;
       }
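
The switch to iterating a reversed copy of the put list matters for
read-your-own-writes: the scan now encounters the newest put first, so a
transaction that writes the same cell twice reads back its latest value. A
hedged illustration, with hypothetical row, family, and values:

    // Hypothetical: one transaction writes the same cell twice.
    Put first = new Put(Bytes.toBytes("row1"));
    first.add(Bytes.toBytes("fam"), Bytes.toBytes("col"), Bytes.toBytes("v1"));
    Put second = new Put(Bytes.toBytes("row1"));
    second.add(Bytes.toBytes("fam"), Bytes.toBytes("col"), Bytes.toBytes("v2"));
    // Iterating the puts oldest-first, a read of row1/fam:col inside the
    // transaction could surface "v1"; iterating the reversed list, the
    // first match is the latest write, so the read returns "v2".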

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Sat Sep 26 01:26:55 2009
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
 import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
@@ -130,14 +131,19 @@
       final long minSeqId, final long maxSeqId, final Progressable reporter)
       throws UnsupportedEncodingException, IOException {
     super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
-
+    
+    // We can skip the transaction log table itself; it is not transactional.
+    if (super.getTableDesc().getNameAsString().equals(HBaseBackedTransactionLogger.TABLE_NAME)) {
+      return;
+    }
+        
     THLogRecoveryManager recoveryManager = new THLogRecoveryManager(this);
     Map<Long, List<KeyValue>> commitedTransactionsById = recoveryManager
         .getCommitsFromLog(oldLogFile, minSeqId, reporter);
 
     if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
       LOG.debug("found " + commitedTransactionsById.size()
-          + " COMMITED transactions");
+          + " COMMITED transactions to recover.");
 
       for (Entry<Long, List<KeyValue>> entry : commitedTransactionsById
           .entrySet()) {
@@ -150,12 +156,11 @@
         }
       }
 
-      // LOG.debug("Flushing cache"); // We must trigger a cache flush,
-      // otherwise
-      // we will would ignore the log on subsequent failure
-      // if (!super.flushcache()) {
-      // LOG.warn("Did not flush cache");
-      // }
+       LOG.debug("Flushing cache"); // We must trigger a cache flush,
+       //otherwise we will would ignore the log on subsequent failure
+       if (!super.flushcache()) {
+         LOG.warn("Did not flush cache");
+       }
     }
   }
 
@@ -362,6 +367,7 @@
       }
       // Otherwise we were read-only and committable, so we can forget it.
       state.setStatus(Status.COMMITED);
+      this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId());
       retireTransaction(state);
       return TransactionalRegionInterface.COMMIT_OK_READ_ONLY;
     }
@@ -459,15 +465,19 @@
     LOG.debug("Commiting transaction: " + state.toString() + " to "
         + super.getRegionInfo().getRegionNameAsString());
 
-    this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId());
 
+    // FIXME: potential mix-up here if some deletes should come before the puts.
     for (Put update : state.getPuts()) {
-      this.put(update, false); // Don't need to WAL these
+      this.put(update, true);
     }
     
     for (Delete delete : state.getDeleteSet()) {
-      this.delete(delete, null, false);
+      this.delete(delete, null, true);
     }
+    
+    // Now that the transaction's edits live in the WAL, we can write a
+    // commit record to the log so we don't have to recover it.
+    this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId());
 
     state.setStatus(Status.COMMITED);
     if (state.hasWrite()
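
The reordering in doCommit() is the heart of the fix: edits are now written
through the WAL first, and the commit record is appended only afterwards. A
condensed sketch of the sequence, with comments on what recovery sees if
the server dies at each point (paraphrasing the code above):

    for (Put update : state.getPuts()) {
      this.put(update, true);          // edits reach the WAL first
    }
    for (Delete delete : state.getDeleteSet()) {
      this.delete(delete, null, true);
    }
    // Crash here: no commit record, so the transaction is still pending
    // at recovery time and is resolved against the global transaction
    // log rather than being silently replayed or dropped.
    this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId());
    // Crash here: the edits and the commit record are all in the WAL,
    // so ordinary log splitting and replay restore them with no extra work.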

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java Sat Sep 26 01:26:55 2009
@@ -105,23 +105,7 @@
     Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
         filename, -1, null);
 
-    assertEquals(1, commits.size());
-    assertTrue(commits.containsKey(transactionId));
-    assertEquals(3, commits.get(transactionId).size());
-
-    List<KeyValue> updates = commits.get(transactionId);
-
-    KeyValue update1 = updates.get(0);
-    assertTrue(Bytes.equals(row1, update1.getRow()));
-    assertTrue(Bytes.equals(val1, update1.getValue()));
-
-    KeyValue update2 = updates.get(1);
-    assertTrue(Bytes.equals(row2, update2.getRow()));
-    assertTrue(Bytes.equals(val2, update2.getValue()));
-
-    KeyValue update3 = updates.get(2);
-    assertTrue(Bytes.equals(row3, update3.getRow()));
-    assertTrue(Bytes.equals(val3, update3.getValue()));
+    assertNull(commits);
 
   }
 
@@ -153,7 +137,7 @@
     Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
         filename, -1, null);
 
-    assertEquals(0, commits.size());
+    assertNull(commits);
   }
 
   /**
@@ -190,9 +174,7 @@
     Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
         -1, null);
 
-    assertEquals(2, commits.size());
-    assertEquals(2, commits.get(transaction1Id).size());
-    assertEquals(1, commits.get(transaction2Id).size());
+    assertNull(commits);
   }
 
   /**
@@ -229,8 +211,7 @@
     Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
         -1, null);
 
-    assertEquals(1, commits.size());
-    assertEquals(2, commits.get(transaction1Id).size());
+    assertNull(commits);
   }
 
   /**
@@ -267,8 +248,7 @@
     Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
         -1, null);
 
-    assertEquals(1, commits.size());
-    assertEquals(1, commits.get(transaction2Id).size());
+    assertNull(commits);
   }
 
   // FIXME Cannot do this test without a global transaction manager

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/KeyValue.java Sat Sep 26 01:26:55 2009
@@ -997,6 +997,9 @@
    * @return True if matching families.
    */
   public boolean matchingFamily(final byte [] family) {
+    if (this.length == 0 || this.bytes.length == 0) {
+      return false;
+    }
     int o = getFamilyOffset();
     int l = getFamilyLength(o);
     return Bytes.compareTo(family, 0, family.length, this.bytes, o, l) == 0;
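
The new guard makes matchingFamily() safe on an empty KeyValue, such as the
freshly allocated instances used while replaying logs, where computing the
family offset against an empty byte array would otherwise fail. A small
hedged illustration:

    KeyValue empty = new KeyValue();       // zero-length, as during replay
    byte [] fam = Bytes.toBytes("fam");
    // Previously this could fail on the empty backing array; with the
    // guard it simply reports no match.
    boolean m = empty.matchingFamily(fam); // m == false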

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sat Sep 26 01:26:55 2009
@@ -823,6 +823,22 @@
       this.w = w;
     }
   }
+  
+  static Class<? extends HLogKey> getKeyClass(HBaseConfiguration conf) {
+    return (Class<? extends HLogKey>) conf
+        .getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
+  }
+  
+  static HLogKey newKey(HBaseConfiguration conf) throws IOException {
+    Class<? extends HLogKey> keyClass = getKeyClass(conf);
+    try {
+      return keyClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IOException("cannot create hlog key: " + keyClass.getName());
+    } catch (IllegalAccessException e) {
+      throw new IOException("cannot create hlog key: " + keyClass.getName());
+    }
+  }
+  }
 
   /*
    * @param rootDir
@@ -875,7 +891,7 @@
           try {
             in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
             try {
-              HLogKey key = new HLogKey();
+              HLogKey key = newKey(conf);
               KeyValue val = new KeyValue();
               while (in.next(key, val)) {
                 byte [] regionName = key.getRegionName();
@@ -890,7 +906,7 @@
                 count++;
                 // Make the key and value new each time; otherwise same instance
                 // is used over and over.
-                key = new HLogKey();
+                key = newKey(conf);
                 val = new KeyValue();
               }
               LOG.debug("Pushed=" + count + " entries from " +
@@ -960,7 +976,7 @@
                     }
                     SequenceFile.Writer w =
                       SequenceFile.createWriter(fs, conf, logfile,
-                        HLogKey.class, KeyValue.class, getCompressionType(conf));
+                        getKeyClass(conf), KeyValue.class, getCompressionType(conf));
                     wap = new WriterAndPath(logfile, w);
                     logWriters.put(key, wap);
                     if (LOG.isDebugEnabled()) {
@@ -970,7 +986,7 @@
 
                     if (old != null) {
                       // Copy from existing log file
-                      HLogKey oldkey = new HLogKey();
+                      HLogKey oldkey = newKey(conf);
                       KeyValue oldval = new KeyValue();
                       for (; old.next(oldkey, oldval); count++) {
                         if (LOG.isDebugEnabled() && count > 0
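
The new getKeyClass()/newKey() hooks make the WAL key type pluggable
through hbase.regionserver.hlog.keyclass; the only contract is an
accessible no-argument constructor, since newKey() instantiates the class
via reflection. A minimal hypothetical subclass, shown only to illustrate
that contract (THLogKey is the real registrant in this commit):

    // Hypothetical custom key; any extra state it adds must be handled
    // in its own write()/readFields() overrides.
    public class MyLogKey extends HLogKey {
      public MyLogKey() {
        super(); // no-arg constructor required by HLog.newKey(conf)
      }
    }

    // Registered the same way the transactional contrib registers THLogKey:
    //   conf.set("hbase.regionserver.hlog.keyclass", MyLogKey.class.getName());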

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=819085&r1=819084&r2=819085&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Sep 26 01:26:55 2009
@@ -309,7 +309,7 @@
     SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
       reconstructionLog, this.conf);
     try {
-      HLogKey key = new HLogKey();
+      HLogKey key = HLog.newKey(conf);
       KeyValue val = new KeyValue();
       long skippedEdits = 0;
       long editsCount = 0;
@@ -327,8 +327,7 @@
         }
         // Check this edit is for me. Also, guard against writing the special
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
-        if (/* commented out for now - stack via jgray key.isTransactionEntry() || */
-            val.matchingFamily(HLog.METAFAMILY) ||
+        if (val.matchingFamily(HLog.METAFAMILY) ||
           !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
           !val.matchingFamily(family.getName())) {
           continue;