Posted to commits@hbase.apache.org by en...@apache.org on 2013/04/27 00:12:50 UTC

svn commit: r1476419 [2/2] - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org...

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java Fri Apr 26 22:12:49 2013
@@ -28,27 +28,21 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-
 public class HLogUtil {
   static final Log LOG = LogFactory.getLog(HLogUtil.class);
 
-  /**
-   * @param family
-   * @return true if the column is a meta column
-   */
-  public static boolean isMetaFamily(byte[] family) {
-    return Bytes.equals(HLog.METAFAMILY, family);
-  }
-
   @SuppressWarnings("unchecked")
   public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
     return (Class<? extends HLogKey>) conf.getClass(
@@ -69,7 +63,7 @@ public class HLogUtil {
   /**
    * Pattern used to validate a HLog file name
    */
-  private static final Pattern pattern = 
+  private static final Pattern pattern =
       Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
 
   /**
@@ -84,40 +78,40 @@ public class HLogUtil {
 
   /*
    * Get a reader for the WAL.
-   * 
+   *
    * @param fs
-   * 
+   *
    * @param path
-   * 
+   *
    * @param conf
-   * 
+   *
    * @return A WAL reader. Close when done with it.
-   * 
+   *
    * @throws IOException
-   * 
+   *
    * public static HLog.Reader getReader(final FileSystem fs, final Path path,
    * Configuration conf) throws IOException { try {
-   * 
+   *
    * if (logReaderClass == null) {
-   * 
+   *
    * logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
    * SequenceFileLogReader.class, Reader.class); }
-   * 
-   * 
+   *
+   *
    * HLog.Reader reader = logReaderClass.newInstance(); reader.init(fs, path,
    * conf); return reader; } catch (IOException e) { throw e; } catch (Exception
    * e) { throw new IOException("Cannot get log reader", e); } }
-   * 
+   *
    * * Get a writer for the WAL.
-   * 
+   *
    * @param path
-   * 
+   *
    * @param conf
-   * 
+   *
    * @return A WAL writer. Close when done with it.
-   * 
+   *
    * @throws IOException
-   * 
+   *
    * public static HLog.Writer createWriter(final FileSystem fs, final Path
    * path, Configuration conf) throws IOException { try { if (logWriterClass ==
    * null) { logWriterClass =
@@ -130,7 +124,7 @@ public class HLogUtil {
 
   /**
    * Construct the HLog directory name
-   * 
+   *
    * @param serverName
    *          Server name formatted as described in {@link ServerName}
    * @return the relative HLog directory name, e.g.
@@ -157,7 +151,7 @@ public class HLogUtil {
 
   /**
    * Move aside a bad edits file.
-   * 
+   *
    * @param fs
    * @param edits
    *          Edits file to move aside.
@@ -239,7 +233,7 @@ public class HLogUtil {
   /**
    * Returns sorted set of edit files made by wal-log splitter, excluding files
    * with '.temp' suffix.
-   * 
+   *
    * @param fs
    * @param regiondir
    * @return Files in passed <code>regiondir</code> as a sorted set.
@@ -287,4 +281,18 @@ public class HLogUtil {
     }
     return false;
   }
+
+  /**
+   * Write the marker that a compaction has succeeded and is about to be committed.
+   * This provides info to the HMaster to allow it to recover the compaction if
+   * this regionserver dies in the middle (this part is not yet implemented). It also
+   * prevents the compaction from finishing if this regionserver has already lost its lease on the log.
+   */
+  public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info, final CompactionDescriptor c)
+  throws IOException {
+    WALEdit e = WALEdit.createCompaction(c);
+    log.append(info, c.getTableName().toByteArray(), e,
+        EnvironmentEdgeManager.currentTimeMillis(), htd);
+    LOG.info("Appended compaction marker " + c);
+  }
 }
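
The writeCompactionMarker() addition above is the heart of this commit: the append fails if this regionserver has lost its lease on the log, which fences a compaction commit off from a server that is on its way down. A minimal sketch of the intended call pattern, pieced together from APIs this commit touches (the variable names are illustrative and error handling is elided):

    // Describe the compaction: which store files it consumed and produced.
    CompactionDescriptor desc = ProtobufUtil.toCompactionDescriptor(
        region.getRegionInfo(), family, compactedInputPaths, newOutputPaths,
        region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
    // Append the marker to the WAL before committing the new file set; if the
    // append throws because the log lease is gone, the commit must not proceed.
    HLogUtil.writeCompactionMarker(region.getLog(), region.getTableDesc(),
        region.getRegionInfo(), desc);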

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Fri Apr 26 22:12:49 2013
@@ -27,8 +27,9 @@ import java.util.NavigableMap;
 import java.util.TreeMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.Writable;
@@ -69,6 +70,11 @@ import org.apache.hadoop.io.Writable;
  */
 @InterfaceAudience.Private
 public class WALEdit implements Writable, HeapSize {
+  // TODO: Make it so a user cannot create a cf with this name.  Make these illegal cf names.  Ditto for row.
+  public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
+  static final byte [] METAROW = Bytes.toBytes("METAROW");
+  static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
+  static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
 
   private final int VERSION_2 = -1;
 
@@ -80,12 +86,21 @@ public class WALEdit implements Writable
   public WALEdit() {
   }
 
+  /**
+   * @param f
+   * @return True if <code>f</code> is {@link #METAFAMILY}
+   */
+  public static boolean isMetaEditFamily(final byte [] f) {
+    return Bytes.equals(METAFAMILY, f);
+  }
+
   public void setCompressionContext(final CompressionContext compressionContext) {
     this.compressionContext = compressionContext;
   }
 
-  public void add(KeyValue kv) {
+  public WALEdit add(KeyValue kv) {
     this.kvs.add(kv);
+    return this;
   }
 
   public boolean isEmpty() {
@@ -197,4 +212,26 @@ public class WALEdit implements Writable
     return sb.toString();
   }
 
-}
+  /**
+   * Create a compaction WALEdit
+   * @param c
+   * @return A WALEdit that has <code>c</code> serialized as its value
+   */
+  public static WALEdit createCompaction(final CompactionDescriptor c) {
+    byte [] pbbytes = c.toByteArray();
+    KeyValue kv = new KeyValue(METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes);
+    return new WALEdit().add(kv); //replication scope null so that this won't be replicated
+  }
+
+  /**
+   * Deserializes and returns a CompactionDescriptor if the KeyValue contains one.
+   * @param kv the key value
+   * @return deserialized CompactionDescriptor or null.
+   */
+  public static CompactionDescriptor getCompaction(KeyValue kv) throws IOException {
+    if (kv.matchingRow(METAROW) && kv.matchingColumn(METAFAMILY, COMPACTION)) {
+      return CompactionDescriptor.parseFrom(kv.getValue());
+    }
+    return null;
+  }
+}
\ No newline at end of file
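
createCompaction()/getCompaction() above are a straight protobuf round trip through a KeyValue keyed on METAROW/METAFAMILY/COMPACTION. A minimal sketch of that round trip, assuming WALEdit's existing getKeyValues() accessor (the descriptor 'desc' is illustrative):

    WALEdit edit = WALEdit.createCompaction(desc);
    for (KeyValue kv : edit.getKeyValues()) {
      // Returns null for ordinary edits; non-null only for compaction markers.
      CompactionDescriptor roundTripped = WALEdit.getCompaction(kv);
      if (roundTripped != null) {
        // roundTripped.equals(desc) holds: protobuf messages define structural equality.
      }
    }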

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Fri Apr 26 22:12:49 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -65,7 +66,6 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
 import org.apache.hadoop.hbase.exceptions.TableExistsException;
 import org.apache.hadoop.hbase.exceptions.TableNotEnabledException;
@@ -1320,6 +1320,15 @@ public class HBaseTestingUtility extends
     return rowCount;
   }
 
+  public void loadNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException {
+    for (int i = startRow; i < endRow; i++) {
+      byte[] data = Bytes.toBytes(String.valueOf(i));
+      Put put = new Put(data);
+      put.add(f, null, data);
+      t.put(put);
+    }
+  }
+
   /**
    * Return the number of rows in the given table.
    */
@@ -1935,7 +1944,7 @@ public class HBaseTestingUtility extends
 
   /*
    * Retrieves a splittable region randomly from tableName
-   * 
+   *
    * @param tableName name of table
    * @param maxAttempts maximum number of attempts, unlimited for value of -1
    * @return the HRegion chosen, null if none was found within limit of maxAttempts
@@ -1954,7 +1963,7 @@ public class HBaseTestingUtility extends
       }
       regCount = regions.size();
       // There are chances that before we get the region for the table from an RS the region may
-      // be going for CLOSE.  This may be because online schema change is enabled 
+      // be going for CLOSE.  This may be because online schema change is enabled
       if (regCount > 0) {
         idx = random.nextInt(regCount);
         // if we have just tried this region, there is no need to try again
@@ -1972,7 +1981,7 @@ public class HBaseTestingUtility extends
     } while (maxAttempts == -1 || attempts < maxAttempts);
     return null;
   }
-  
+
   public MiniZooKeeperCluster getZkCluster() {
     return zkCluster;
   }
@@ -2253,10 +2262,10 @@ public class HBaseTestingUtility extends
     scanner.close();
     return result;
   }
-  
+
   /**
    * Create region split keys between startkey and endKey
-   * 
+   *
    * @param startKey
    * @param endKey
    * @param numRegions the number of regions to be created. it has to be greater than 3.
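
The loadNumericRows() helper added above writes one Put per integer in [startRow, endRow), using the stringified index as both row key and value (with a null qualifier). TestIOFencing below pairs it with the existing countRows(), roughly:

    TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
    // ... fail over servers as the test requires ...
    assertEquals(FIRST_BATCH_COUNT, TEST_UTIL.countRows(table));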

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Fri Apr 26 22:12:49 2013
@@ -676,6 +676,20 @@ public class MiniHBaseCluster extends HB
     this.hbaseCluster.join();
   }
 
+  public List<HRegion> findRegionsForTable(byte[] tableName) {
+    ArrayList<HRegion> ret = new ArrayList<HRegion>();
+    for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
+      HRegionServer hrs = rst.getRegionServer();
+      for (HRegion region : hrs.getOnlineRegions(tableName)) {
+        if (Bytes.equals(region.getTableDesc().getName(), tableName)) {
+          ret.add(region);
+        }
+      }
+    }
+    return ret;
+  }
+
+
   protected int getRegionServerIndex(ServerName serverName) {
     //we have a small number of region servers, this should be fine for now.
     List<RegionServerThread> servers = getRegionServerThreads();
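
findRegionsForTable() walks every live regionserver thread and collects the online HRegions whose table descriptor name matches; TestIOFencing below uses it to grab the single region of its test table:

    List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
    assertEquals(1, regions.size());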

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java Fri Apr 26 22:12:49 2013
@@ -23,9 +23,6 @@ import static org.junit.Assert.assertEqu
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -99,13 +96,7 @@ public class TestFullLogReconstruction {
 
     // Load up the table with simple rows and count them
     int initialCount = TEST_UTIL.loadTable(table, FAMILY);
-    Scan scan = new Scan();
-    ResultScanner results = table.getScanner(scan);
-    int count = 0;
-    for (Result res : results) {
-      count++;
-    }
-    results.close();
+    int count = TEST_UTIL.countRows(table);
 
     assertEquals(initialCount, count);
 
@@ -114,15 +105,8 @@ public class TestFullLogReconstruction {
     }
 
     TEST_UTIL.expireRegionServerSession(0);
-    scan = new Scan();
-    results = table.getScanner(scan);
-    int newCount = 0;
-    for (Result res : results) {
-      newCount++;
-    }
+    int newCount = TEST_UTIL.countRows(table);
     assertEquals(count, newCount);
-    results.close();
     table.close();
   }
-
-}
+}
\ No newline at end of file

Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java?rev=1476419&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java Fri Apr 26 22:12:49 2013
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.log4j.Level;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test for the case where a regionserver going down has enough cycles to do damage to regions
+ * that have actually been assigned elsewhere.
+ *
+ * <p>If we happen to assign a region before it is fully done in its old location -- i.e. it is on two servers at the
+ * same time -- all can work fine until the case where the region on the dying server decides to compact or otherwise
+ * change the region file set.  The region in its new location will then get a surprise when it tries to do something
+ * w/ a file removed by the region in its old location on dying server.
+ *
+ * <p>Making a test for this case is a little tough because even if a file is deleted on the namenode,
+ * a client that opened the file before the delete can keep reading until something changes the
+ * state of the cached blocks in the dfsclient that was already open (i.e. until the NN has a
+ * datanode clean up a block from the deleted file).
+ *
+ * <p>What we do below is an explicit existence check on the files listed by the region that
+ * has had files removed by a compaction.  This hurries along and makes certain what would
+ * otherwise be a chance occurrence.
+ */
+@Category(MediumTests.class)
+public class TestIOFencing {
+  static final Log LOG = LogFactory.getLog(TestIOFencing.class);
+  static {
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem")).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  public abstract static class CompactionBlockerRegion extends HRegion {
+    volatile int compactCount = 0;
+    volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
+    volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);
+
+    public CompactionBlockerRegion(Path tableDir, HLog log,
+        FileSystem fs, Configuration confParam, HRegionInfo info,
+        HTableDescriptor htd, RegionServerServices rsServices) {
+      super(tableDir, log, fs, confParam, info, htd, rsServices);
+    }
+
+    public void stopCompactions() {
+      compactionsBlocked = new CountDownLatch(1);
+      compactionsWaiting = new CountDownLatch(1);
+    }
+
+    public void allowCompactions() {
+      LOG.debug("allowing compactions");
+      compactionsBlocked.countDown();
+    }
+    public void waitForCompactionToBlock() throws IOException {
+      try {
+        LOG.debug("waiting for compaction to block");
+        compactionsWaiting.await();
+        LOG.debug("compaction block reached");
+      } catch (InterruptedException ex) {
+        throw new IOException(ex);
+      }
+    }
+    @Override
+    public boolean compact(CompactionContext compaction, Store store) throws IOException {
+      try {
+        return super.compact(compaction, store);
+      } finally {
+        compactCount++;
+      }
+    }
+    public int countStoreFiles() {
+      int count = 0;
+      for (Store store : stores.values()) {
+        count += store.getStorefilesCount();
+      }
+      return count;
+    }
+  }
+
+  /**
+   * An override of HRegion that allows us to park compactions in a holding pattern and
+   * then, when appropriate for the test, allow them to proceed again.
+   */
+  public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
+
+    public BlockCompactionsInPrepRegion(Path tableDir, HLog log,
+        FileSystem fs, Configuration confParam, HRegionInfo info,
+        HTableDescriptor htd, RegionServerServices rsServices) {
+      super(tableDir, log, fs, confParam, info, htd, rsServices);
+    }
+    @Override
+    protected void doRegionCompactionPrep() throws IOException {
+      compactionsWaiting.countDown();
+      try {
+        compactionsBlocked.await();
+      } catch (InterruptedException ex) {
+        throw new IOException(ex);  // preserve the interrupt as the cause
+      }
+      super.doRegionCompactionPrep();
+    }
+  }
+
+  /**
+   * An override of HRegion that allows us to park compactions in a holding pattern and
+   * then, when appropriate for the test, allow them to proceed again. This allows the compaction
+   * entry to go to the WAL before blocking, but blocks afterwards.
+   */
+  public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
+    public BlockCompactionsInCompletionRegion(Path tableDir, HLog log,
+        FileSystem fs, Configuration confParam, HRegionInfo info,
+        HTableDescriptor htd, RegionServerServices rsServices) {
+      super(tableDir, log, fs, confParam, info, htd, rsServices);
+    }
+    protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
+      return new BlockCompactionsInCompletionHStore(this, family, this.conf);
+    }
+  }
+
+  public static class BlockCompactionsInCompletionHStore extends HStore {
+    CompactionBlockerRegion r;
+    protected BlockCompactionsInCompletionHStore(HRegion region, HColumnDescriptor family,
+        Configuration confParam) throws IOException {
+      super(region, family, confParam);
+      r = (CompactionBlockerRegion) region;
+    }
+
+    @Override
+    protected void completeCompaction(Collection<StoreFile> compactedFiles,
+        Collection<StoreFile> result) throws IOException {
+      try {
+        r.compactionsWaiting.countDown();
+        r.compactionsBlocked.await();
+      } catch (InterruptedException ex) {
+        throw new IOException(ex);
+      }
+      super.completeCompaction(compactedFiles, result);
+    }
+  }
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static byte[] TABLE_NAME = Bytes.toBytes("tabletest");
+  private final static byte[] FAMILY = Bytes.toBytes("family");
+  private static final int FIRST_BATCH_COUNT = 4000;
+  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;
+
+  /**
+   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
+   * compaction until after we have killed the server and the region has come up on
+   * a new regionserver altogether.  This fakes the double assignment case where a region in one
+   * location changes the files out from underneath a region being served elsewhere.
+   */
+  @Test
+  public void testFencingAroundCompaction() throws Exception {
+    doTest(BlockCompactionsInPrepRegion.class);
+  }
+
+  /**
+   * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
+   * compaction completion until after we have killed the server and the region has come up on
+   * a new regionserver altogether.  This fakes the double assignment case where a region in one
+   * location changes the files out from underneath a region being served elsewhere.
+   */
+  @Test
+  public void testFencingAroundCompactionAfterWALSync() throws Exception {
+    doTest(BlockCompactionsInCompletionRegion.class);
+  }
+
+  public void doTest(Class<?> regionClass) throws Exception {
+    Configuration c = TEST_UTIL.getConfiguration();
+    // Insert our custom region
+    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
+    c.setBoolean("dfs.support.append", true);
+    // Encourage plenty of flushes
+    c.setLong("hbase.hregion.memstore.flush.size", 200000);
+    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
+    // Only run compaction when we tell it to
+    c.setInt("hbase.hstore.compactionThreshold", 1000);
+    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
+    // Compact quickly after we tell it to!
+    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
+    LOG.info("Starting mini cluster");
+    TEST_UTIL.startMiniCluster(1);
+    CompactionBlockerRegion compactingRegion = null;
+    HBaseAdmin admin = null;
+    try {
+      LOG.info("Creating admin");
+      admin = new HBaseAdmin(c);
+      LOG.info("Creating table");
+      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+      HTable table = new HTable(c, TABLE_NAME);
+      LOG.info("Loading test table");
+      // Load some rows
+      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
+      // Find the region
+      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
+      assertEquals(1, testRegions.size());
+      compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
+      assertTrue(compactingRegion.countStoreFiles() > 1);
+      final byte[] REGION_NAME = compactingRegion.getRegionName();
+      LOG.info("Blocking compactions");
+      compactingRegion.stopCompactions();
+      LOG.info("Asking for compaction");
+      admin.majorCompact(TABLE_NAME);
+      LOG.info("Waiting for compaction to be about to start");
+      compactingRegion.waitForCompactionToBlock();
+      LOG.info("Starting a new server");
+      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
+      HRegionServer newServer = newServerThread.getRegionServer();
+      LOG.info("Killing region server ZK lease");
+      TEST_UTIL.expireRegionServerSession(0);
+      CompactionBlockerRegion newRegion = null;
+      long startWaitTime = System.currentTimeMillis();
+      while (newRegion == null) {
+        LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
+        Thread.sleep(100);
+        newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
+        assertTrue("Timed out waiting for new server to open region",
+          System.currentTimeMillis() - startWaitTime < 60000);
+      }
+      LOG.info("Allowing compaction to proceed");
+      compactingRegion.allowCompactions();
+      while (compactingRegion.compactCount == 0) {
+        Thread.sleep(1000);
+      }
+      // The server we killed stays up until the compaction that was started before it was killed completes.  In logs
+      // you should see the old regionserver now going down.
+      LOG.info("Compaction finished");
+      // After compaction of old region finishes on the server that was going down, make sure that
+      // all the files we expect are still working when region is up in new location.
+      FileSystem fs = newRegion.getFilesystem();
+      for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) {
+        assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
+      }
+      // If we survive the split keep going...
+      // Now we make sure that the region isn't totally confused.  Load up more rows.
+      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
+      admin.majorCompact(TABLE_NAME);
+      startWaitTime = System.currentTimeMillis();
+      while (newRegion.compactCount == 0) {
+        Thread.sleep(1000);
+        assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 30000);
+      }
+      assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
+    } finally {
+      if (compactingRegion != null) {
+        compactingRegion.allowCompactions();
+      }
+      if (admin != null) admin.close();  // admin is null if HBaseAdmin construction failed
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java Fri Apr 26 22:12:49 2013
@@ -350,7 +350,7 @@ public class TestRowProcessorEndpoint {
 
         // We can also inject some meta data to the walEdit
         KeyValue metaKv = new KeyValue(
-            row, HLog.METAFAMILY,
+            row, WALEdit.METAFAMILY,
             Bytes.toBytes("I just increment counter"),
             Bytes.toBytes(counter));
         walEdit.add(metaKv);
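
This is the caller-side fallout of moving METAFAMILY from HLog to WALEdit: code that injects meta edits now references WALEdit.METAFAMILY, and replay code can recognize them with the new predicate. A one-line sketch, assuming KeyValue's getFamily() accessor (kv is illustrative):

    if (WALEdit.isMetaEditFamily(kv.getFamily())) {
      // A meta edit such as the marker above; skip it during normal replay.
    }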

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1476419&r1=1476418&r2=1476419&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Fri Apr 26 22:12:49 2013
@@ -35,18 +35,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
-import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.Multithre
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
@@ -65,7 +66,7 @@ import org.apache.hadoop.hbase.client.Mu
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
 import org.apache.hadoop.hbase.exceptions.WrongRegionException;
@@ -82,6 +83,8 @@ import org.apache.hadoop.hbase.filter.Si
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@@ -382,6 +385,95 @@ public class TestHRegion extends HBaseTe
     }
   }
 
+  @Test
+  public void testRecoveredEditsReplayCompaction() throws Exception {
+    String method = "testRecoveredEditsReplayCompaction";
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Bytes.toBytes("family");
+    this.region = initHRegion(tableName, method, conf, family);
+    try {
+      Path regiondir = region.getRegionFileSystem().getRegionDir();
+      FileSystem fs = region.getRegionFileSystem().getFileSystem();
+      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+      long maxSeqId = 3;
+      long minSeqId = 0;
+
+      for (long i = minSeqId; i < maxSeqId; i++) {
+        Put put = new Put(Bytes.toBytes(i));
+        put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
+        region.put(put);
+        region.flushcache();
+      }
+
+      //this will create a region with 3 files
+      assertEquals(3, region.getStore(family).getStorefilesCount());
+      List<Path> storeFiles = new ArrayList<Path>(3);
+      for (StoreFile sf : region.getStore(family).getStorefiles()) {
+        storeFiles.add(sf.getPath());
+      }
+
+      //disable compaction completion
+      conf.setBoolean("hbase.hstore.compaction.complete", false);
+      region.compactStores();
+
+      //ensure that nothing changed
+      assertEquals(3, region.getStore(family).getStorefilesCount());
+
+      //now find the compacted file, and manually add it to the recovered edits
+      Path tmpDir = region.getRegionFileSystem().getTempDir();
+      FileStatus[] files = region.getRegionFileSystem().getFileSystem().listStatus(tmpDir);
+      assertEquals(1, files.length);
+      //move the file inside region dir
+      Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family), files[0].getPath());
+
+      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(
+          this.region.getRegionInfo(), family,
+          storeFiles, Lists.newArrayList(newFile),
+          region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
+
+      HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
+          this.region.getRegionInfo(), compactionDescriptor);
+
+      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+
+      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
+      fs.create(recoveredEdits);
+      HLog.Writer writer = HLogFactory.createWriter(fs, recoveredEdits, conf);
+
+      long time = System.nanoTime();
+
+      writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time, HConstants.DEFAULT_CLUSTER_ID),
+          WALEdit.createCompaction(compactionDescriptor)));
+      writer.close();
+
+      //close the region now, and reopen again
+      HTableDescriptor htd = region.getTableDesc();
+      HRegionInfo info = region.getRegionInfo();
+      region.close();
+      region = HRegion.openHRegion(conf, fs, regiondir.getParent().getParent(), info, htd, null);
+
+      //now check whether we have only one store file, the compacted one
+      Collection<StoreFile> sfs = region.getStore(family).getStorefiles();
+      for (StoreFile sf : sfs) {
+        LOG.info(sf.getPath());
+      }
+      assertEquals(1, region.getStore(family).getStorefilesCount());
+      files = region.getRegionFileSystem().getFileSystem().listStatus(tmpDir);
+      assertEquals(0, files.length);
+
+      for (long i = minSeqId; i < maxSeqId; i++) {
+        Get get = new Get(Bytes.toBytes(i));
+        Result result = region.get(get);
+        byte[] value = result.getValue(family, Bytes.toBytes(i));
+        assertTrue(Bytes.equals(Bytes.toBytes(i), value));  // byte[] content equality; assertEquals compares references
+      }
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
   public void testGetWhileRegionClose() throws IOException {
     Configuration hc = initSplit();
     int numRows = 100;