You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2013/04/27 00:05:34 UTC
svn commit: r1476414 [2/2] - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/
hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/...
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java?rev=1476414&r1=1476413&r2=1476414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java Fri Apr 26 22:05:33 2013
@@ -28,27 +28,21 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-
public class HLogUtil {
static final Log LOG = LogFactory.getLog(HLogUtil.class);
- /**
- * @param family
- * @return true if the column is a meta column
- */
- public static boolean isMetaFamily(byte[] family) {
- return Bytes.equals(HLog.METAFAMILY, family);
- }
-
@SuppressWarnings("unchecked")
public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
return (Class<? extends HLogKey>) conf.getClass(
@@ -69,7 +63,7 @@ public class HLogUtil {
/**
* Pattern used to validate a HLog file name
*/
- private static final Pattern pattern =
+ private static final Pattern pattern =
Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
/**
@@ -84,40 +78,40 @@ public class HLogUtil {
/*
* Get a reader for the WAL.
- *
+ *
* @param fs
- *
+ *
* @param path
- *
+ *
* @param conf
- *
+ *
* @return A WAL reader. Close when done with it.
- *
+ *
* @throws IOException
- *
+ *
* public static HLog.Reader getReader(final FileSystem fs, final Path path,
* Configuration conf) throws IOException { try {
- *
+ *
* if (logReaderClass == null) {
- *
+ *
* logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
* SequenceFileLogReader.class, Reader.class); }
- *
- *
+ *
+ *
* HLog.Reader reader = logReaderClass.newInstance(); reader.init(fs, path,
* conf); return reader; } catch (IOException e) { throw e; } catch (Exception
* e) { throw new IOException("Cannot get log reader", e); } }
- *
+ *
* * Get a writer for the WAL.
- *
+ *
* @param path
- *
+ *
* @param conf
- *
+ *
* @return A WAL writer. Close when done with it.
- *
+ *
* @throws IOException
- *
+ *
* public static HLog.Writer createWriter(final FileSystem fs, final Path
* path, Configuration conf) throws IOException { try { if (logWriterClass ==
* null) { logWriterClass =
@@ -130,7 +124,7 @@ public class HLogUtil {
/**
* Construct the HLog directory name
- *
+ *
* @param serverName
* Server name formatted as described in {@link ServerName}
* @return the relative HLog directory name, e.g.
@@ -157,7 +151,7 @@ public class HLogUtil {
/**
* Move aside a bad edits file.
- *
+ *
* @param fs
* @param edits
* Edits file to move aside.
@@ -239,7 +233,7 @@ public class HLogUtil {
/**
* Returns sorted set of edit files made by wal-log splitter, excluding files
* with '.temp' suffix.
- *
+ *
* @param fs
* @param regiondir
* @return Files in passed <code>regiondir</code> as a sorted set.
@@ -287,4 +281,18 @@ public class HLogUtil {
}
return false;
}
+
+ /**
+ * Write the marker that a compaction has succeeded and is about to be committed.
+ * This provides info to the HMaster to allow it to recover the compaction if
+ * this regionserver dies in the middle (This part is not yet implemented). It also prevents the compaction from
+ * finishing if this regionserver has already lost its lease on the log.
+ */
+ public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info, final CompactionDescriptor c)
+ throws IOException {
+ WALEdit e = WALEdit.createCompaction(c);
+ log.append(info, c.getTableName().toByteArray(), e,
+ EnvironmentEdgeManager.currentTimeMillis(), htd);
+ LOG.info("Appended compaction marker " + c);
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java?rev=1476414&r1=1476413&r2=1476414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java Fri Apr 26 22:05:33 2013
@@ -27,8 +27,9 @@ import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.Writable;
@@ -69,6 +70,11 @@ import org.apache.hadoop.io.Writable;
*/
@InterfaceAudience.Private
public class WALEdit implements Writable, HeapSize {
+ // TODO: Make it so user cannot make a cf w/ this name. Make the illegal cf names. Ditto for row.
+ public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
+ static final byte [] METAROW = Bytes.toBytes("METAROW");
+ static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
+ static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
private final int VERSION_2 = -1;
@@ -80,12 +86,21 @@ public class WALEdit implements Writable
public WALEdit() {
}
+ /**
+ * @param f
+ * @return True is <code>f</code> is {@link #METAFAMILY}
+ */
+ public static boolean isMetaEditFamily(final byte [] f) {
+ return Bytes.equals(METAFAMILY, f);
+ }
+
public void setCompressionContext(final CompressionContext compressionContext) {
this.compressionContext = compressionContext;
}
- public void add(KeyValue kv) {
+ public WALEdit add(KeyValue kv) {
this.kvs.add(kv);
+ return this;
}
public boolean isEmpty() {
@@ -197,4 +212,26 @@ public class WALEdit implements Writable
return sb.toString();
}
-}
+ /**
+ * Create a compacion WALEdit
+ * @param c
+ * @return A WALEdit that has <code>c</code> serialized as its value
+ */
+ public static WALEdit createCompaction(final CompactionDescriptor c) {
+ byte [] pbbytes = c.toByteArray();
+ KeyValue kv = new KeyValue(METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes);
+ return new WALEdit().add(kv); //replication scope null so that this won't be replicated
+ }
+
+ /**
+ * Deserialized and returns a CompactionDescriptor is the KeyValue contains one.
+ * @param kv the key value
+ * @return deserialized CompactionDescriptor or null.
+ */
+ public static CompactionDescriptor getCompaction(KeyValue kv) throws IOException {
+ if (kv.matchingRow(METAROW) && kv.matchingColumn(METAFAMILY, COMPACTION)) {
+ return CompactionDescriptor.parseFrom(kv.getValue());
+ }
+ return null;
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1476414&r1=1476413&r2=1476414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Fri Apr 26 22:05:33 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
@@ -65,7 +66,6 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
import org.apache.hadoop.hbase.exceptions.TableExistsException;
import org.apache.hadoop.hbase.exceptions.TableNotEnabledException;
@@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -1322,6 +1321,15 @@ public class HBaseTestingUtility extends
return rowCount;
}
+ public void loadNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException {
+ for (int i = startRow; i < endRow; i++) {
+ byte[] data = Bytes.toBytes(String.valueOf(i));
+ Put put = new Put(data);
+ put.add(f, null, data);
+ t.put(put);
+ }
+ }
+
/**
* Return the number of rows in the given table.
*/
@@ -1937,7 +1945,7 @@ public class HBaseTestingUtility extends
/*
* Retrieves a splittable region randomly from tableName
- *
+ *
* @param tableName name of table
* @param maxAttempts maximum number of attempts, unlimited for value of -1
* @return the HRegion chosen, null if none was found within limit of maxAttempts
@@ -1956,7 +1964,7 @@ public class HBaseTestingUtility extends
}
regCount = regions.size();
// There are chances that before we get the region for the table from an RS the region may
- // be going for CLOSE. This may be because online schema change is enabled
+ // be going for CLOSE. This may be because online schema change is enabled
if (regCount > 0) {
idx = random.nextInt(regCount);
// if we have just tried this region, there is no need to try again
@@ -1974,7 +1982,7 @@ public class HBaseTestingUtility extends
} while (maxAttempts == -1 || attempts < maxAttempts);
return null;
}
-
+
public MiniZooKeeperCluster getZkCluster() {
return zkCluster;
}
@@ -2252,10 +2260,10 @@ public class HBaseTestingUtility extends
scanner.close();
return result;
}
-
+
/**
* Create region split keys between startkey and endKey
- *
+ *
* @param startKey
* @param endKey
* @param numRegions the number of regions to be created. it has to be greater than 3.
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1476414&r1=1476413&r2=1476414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Fri Apr 26 22:05:33 2013
@@ -676,6 +676,20 @@ public class MiniHBaseCluster extends HB
this.hbaseCluster.join();
}
+ public List<HRegion> findRegionsForTable(byte[] tableName) {
+ ArrayList<HRegion> ret = new ArrayList<HRegion>();
+ for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
+ HRegionServer hrs = rst.getRegionServer();
+ for (HRegion region : hrs.getOnlineRegions(tableName)) {
+ if (Bytes.equals(region.getTableDesc().getName(), tableName)) {
+ ret.add(region);
+ }
+ }
+ }
+ return ret;
+ }
+
+
protected int getRegionServerIndex(ServerName serverName) {
//we have a small number of region servers, this should be fine for now.
List<RegionServerThread> servers = getRegionServerThreads();
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java?rev=1476414&r1=1476413&r2=1476414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestFullLogReconstruction.java Fri Apr 26 22:05:33 2013
@@ -23,9 +23,6 @@ import static org.junit.Assert.assertEqu
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
@@ -99,13 +96,7 @@ public class TestFullLogReconstruction {
// Load up the table with simple rows and count them
int initialCount = TEST_UTIL.loadTable(table, FAMILY);
- Scan scan = new Scan();
- ResultScanner results = table.getScanner(scan);
- int count = 0;
- for (Result res : results) {
- count++;
- }
- results.close();
+ int count = TEST_UTIL.countRows(table);
assertEquals(initialCount, count);
@@ -114,15 +105,8 @@ public class TestFullLogReconstruction {
}
TEST_UTIL.expireRegionServerSession(0);
- scan = new Scan();
- results = table.getScanner(scan);
- int newCount = 0;
- for (Result res : results) {
- newCount++;
- }
+ int newCount = TEST_UTIL.countRows(table);
assertEquals(count, newCount);
- results.close();
table.close();
}
-
-}
+}
\ No newline at end of file
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java?rev=1476414&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java Fri Apr 26 22:05:33 2013
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.log4j.Level;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test for the case where a regionserver going down has enough cycles to do damage to regions
+ * that have actually been assigned elsehwere.
+ *
+ * <p>If we happen to assign a region before it fully done with in its old location -- i.e. it is on two servers at the
+ * same time -- all can work fine until the case where the region on the dying server decides to compact or otherwise
+ * change the region file set. The region in its new location will then get a surprise when it tries to do something
+ * w/ a file removed by the region in its old location on dying server.
+ *
+ * <p>Making a test for this case is a little tough in that even if a file is deleted up on the namenode,
+ * if the file was opened before the delete, it will continue to let reads happen until something changes the
+ * state of cached blocks in the dfsclient that was already open (a block from the deleted file is cleaned
+ * from the datanode by NN).
+ *
+ * <p>What we will do below is do an explicit check for existence on the files listed in the region that
+ * has had some files removed because of a compaction. This sort of hurry's along and makes certain what is a chance
+ * occurance.
+ */
+@Category(MediumTests.class)
+public class TestIOFencing {
+ static final Log LOG = LogFactory.getLog(TestIOFencing.class);
+ static {
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem")).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ public abstract static class CompactionBlockerRegion extends HRegion {
+ volatile int compactCount = 0;
+ volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
+ volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);
+
+ public CompactionBlockerRegion(Path tableDir, HLog log,
+ FileSystem fs, Configuration confParam, HRegionInfo info,
+ HTableDescriptor htd, RegionServerServices rsServices) {
+ super(tableDir, log, fs, confParam, info, htd, rsServices);
+ }
+
+ public void stopCompactions() {
+ compactionsBlocked = new CountDownLatch(1);
+ compactionsWaiting = new CountDownLatch(1);
+ }
+
+ public void allowCompactions() {
+ LOG.debug("allowing compactions");
+ compactionsBlocked.countDown();
+ }
+ public void waitForCompactionToBlock() throws IOException {
+ try {
+ LOG.debug("waiting for compaction to block");
+ compactionsWaiting.await();
+ LOG.debug("compaction block reached");
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ }
+ @Override
+ public boolean compact(CompactionContext compaction, Store store) throws IOException {
+ try {
+ return super.compact(compaction, store);
+ } finally {
+ compactCount++;
+ }
+ }
+ public int countStoreFiles() {
+ int count = 0;
+ for (Store store : stores.values()) {
+ count += store.getStorefilesCount();
+ }
+ return count;
+ }
+ }
+
+ /**
+ * An override of HRegion that allows us park compactions in a holding pattern and
+ * then when appropriate for the test, allow them proceed again.
+ */
+ public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
+
+ public BlockCompactionsInPrepRegion(Path tableDir, HLog log,
+ FileSystem fs, Configuration confParam, HRegionInfo info,
+ HTableDescriptor htd, RegionServerServices rsServices) {
+ super(tableDir, log, fs, confParam, info, htd, rsServices);
+ }
+ @Override
+ protected void doRegionCompactionPrep() throws IOException {
+ compactionsWaiting.countDown();
+ try {
+ compactionsBlocked.await();
+ } catch (InterruptedException ex) {
+ throw new IOException();
+ }
+ super.doRegionCompactionPrep();
+ }
+ }
+
+ /**
+ * An override of HRegion that allows us park compactions in a holding pattern and
+ * then when appropriate for the test, allow them proceed again. This allows the compaction
+ * entry to go the WAL before blocking, but blocks afterwards
+ */
+ public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
+ public BlockCompactionsInCompletionRegion(Path tableDir, HLog log,
+ FileSystem fs, Configuration confParam, HRegionInfo info,
+ HTableDescriptor htd, RegionServerServices rsServices) {
+ super(tableDir, log, fs, confParam, info, htd, rsServices);
+ }
+ protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
+ return new BlockCompactionsInCompletionHStore(this, family, this.conf);
+ }
+ }
+
+ public static class BlockCompactionsInCompletionHStore extends HStore {
+ CompactionBlockerRegion r;
+ protected BlockCompactionsInCompletionHStore(HRegion region, HColumnDescriptor family,
+ Configuration confParam) throws IOException {
+ super(region, family, confParam);
+ r = (CompactionBlockerRegion) region;
+ }
+
+ @Override
+ protected void completeCompaction(Collection<StoreFile> compactedFiles,
+ Collection<StoreFile> result) throws IOException {
+ try {
+ r.compactionsWaiting.countDown();
+ r.compactionsBlocked.await();
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ super.completeCompaction(compactedFiles, result);
+ }
+ }
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static byte[] TABLE_NAME = Bytes.toBytes("tabletest");
+ private final static byte[] FAMILY = Bytes.toBytes("family");
+ private static final int FIRST_BATCH_COUNT = 4000;
+ private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;
+
+ /**
+ * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
+ * compaction until after we have killed the server and the region has come up on
+ * a new regionserver altogether. This fakes the double assignment case where region in one
+ * location changes the files out from underneath a region being served elsewhere.
+ */
+ @Test
+ public void testFencingAroundCompaction() throws Exception {
+ doTest(BlockCompactionsInPrepRegion.class);
+ }
+
+ /**
+ * Test that puts up a regionserver, starts a compaction on a loaded region but holds the
+ * compaction completion until after we have killed the server and the region has come up on
+ * a new regionserver altogether. This fakes the double assignment case where region in one
+ * location changes the files out from underneath a region being served elsewhere.
+ */
+ @Test
+ public void testFencingAroundCompactionAfterWALSync() throws Exception {
+ doTest(BlockCompactionsInCompletionRegion.class);
+ }
+
+ public void doTest(Class<?> regionClass) throws Exception {
+ Configuration c = TEST_UTIL.getConfiguration();
+ // Insert our custom region
+ c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
+ c.setBoolean("dfs.support.append", true);
+ // Encourage plenty of flushes
+ c.setLong("hbase.hregion.memstore.flush.size", 200000);
+ c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
+ // Only run compaction when we tell it to
+ c.setInt("hbase.hstore.compactionThreshold", 1000);
+ c.setLong("hbase.hstore.blockingStoreFiles", 1000);
+ // Compact quickly after we tell it to!
+ c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
+ LOG.info("Starting mini cluster");
+ TEST_UTIL.startMiniCluster(1);
+ CompactionBlockerRegion compactingRegion = null;
+ HBaseAdmin admin = null;
+ try {
+ LOG.info("Creating admin");
+ admin = new HBaseAdmin(c);
+ LOG.info("Creating table");
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ HTable table = new HTable(c, TABLE_NAME);
+ LOG.info("Loading test table");
+ // Load some rows
+ TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
+ // Find the region
+ List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
+ assertEquals(1, testRegions.size());
+ compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
+ assertTrue(compactingRegion.countStoreFiles() > 1);
+ final byte REGION_NAME[] = compactingRegion.getRegionName();
+ LOG.info("Blocking compactions");
+ compactingRegion.stopCompactions();
+ LOG.info("Asking for compaction");
+ admin.majorCompact(TABLE_NAME);
+ LOG.info("Waiting for compaction to be about to start");
+ compactingRegion.waitForCompactionToBlock();
+ LOG.info("Starting a new server");
+ RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
+ HRegionServer newServer = newServerThread.getRegionServer();
+ LOG.info("Killing region server ZK lease");
+ TEST_UTIL.expireRegionServerSession(0);
+ CompactionBlockerRegion newRegion = null;
+ long startWaitTime = System.currentTimeMillis();
+ while (newRegion == null) {
+ LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
+ Thread.sleep(100);
+ newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
+ assertTrue("Timed out waiting for new server to open region",
+ System.currentTimeMillis() - startWaitTime < 60000);
+ }
+ LOG.info("Allowing compaction to proceed");
+ compactingRegion.allowCompactions();
+ while (compactingRegion.compactCount == 0) {
+ Thread.sleep(1000);
+ }
+ // The server we killed stays up until the compaction that was started before it was killed completes. In logs
+ // you should see the old regionserver now going down.
+ LOG.info("Compaction finished");
+ // After compaction of old region finishes on the server that was going down, make sure that
+ // all the files we expect are still working when region is up in new location.
+ FileSystem fs = newRegion.getFilesystem();
+ for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) {
+ assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
+ }
+ // If we survive the split keep going...
+ // Now we make sure that the region isn't totally confused. Load up more rows.
+ TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
+ admin.majorCompact(TABLE_NAME);
+ startWaitTime = System.currentTimeMillis();
+ while (newRegion.compactCount == 0) {
+ Thread.sleep(1000);
+ assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 30000);
+ }
+ assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
+ } finally {
+ if (compactingRegion != null) {
+ compactingRegion.allowCompactions();
+ }
+ admin.close();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java?rev=1476414&r1=1476413&r2=1476414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java Fri Apr 26 22:05:33 2013
@@ -350,7 +350,7 @@ public class TestRowProcessorEndpoint {
// We can also inject some meta data to the walEdit
KeyValue metaKv = new KeyValue(
- row, HLog.METAFAMILY,
+ row, WALEdit.METAFAMILY,
Bytes.toBytes("I just increment counter"),
Bytes.toBytes(counter));
walEdit.add(metaKv);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1476414&r1=1476413&r2=1476414&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Fri Apr 26 22:05:33 2013
@@ -35,18 +35,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
-import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.Multithre
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
@@ -65,7 +66,7 @@ import org.apache.hadoop.hbase.client.Mu
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.exceptions.WrongRegionException;
@@ -82,6 +83,8 @@ import org.apache.hadoop.hbase.filter.Si
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@@ -382,6 +385,95 @@ public class TestHRegion extends HBaseTe
}
}
+ @Test
+ public void testRecoveredEditsReplayCompaction() throws Exception {
+ String method = "testRecoveredEditsReplayCompaction";
+ byte[] tableName = Bytes.toBytes(method);
+ byte[] family = Bytes.toBytes("family");
+ this.region = initHRegion(tableName, method, conf, family);
+ try {
+ Path regiondir = region.getRegionFileSystem().getRegionDir();
+ FileSystem fs = region.getRegionFileSystem().getFileSystem();
+ byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+ long maxSeqId = 3;
+ long minSeqId = 0;
+
+ for (long i = minSeqId; i < maxSeqId; i++) {
+ Put put = new Put(Bytes.toBytes(i));
+ put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
+ region.put(put);
+ region.flushcache();
+ }
+
+ //this will create a region with 3 files
+ assertEquals(3, region.getStore(family).getStorefilesCount());
+ List<Path> storeFiles = new ArrayList<Path>(3);
+ for (StoreFile sf : region.getStore(family).getStorefiles()) {
+ storeFiles.add(sf.getPath());
+ }
+
+ //disable compaction completion
+ conf.setBoolean("hbase.hstore.compaction.complete",false);
+ region.compactStores();
+
+ //ensure that nothing changed
+ assertEquals(3, region.getStore(family).getStorefilesCount());
+
+ //now find the compacted file, and manually add it to the recovered edits
+ Path tmpDir = region.getRegionFileSystem().getTempDir();
+ FileStatus[] files = region.getRegionFileSystem().getFileSystem().listStatus(tmpDir);
+ assertEquals(1, files.length);
+ //move the file inside region dir
+ Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family), files[0].getPath());
+
+ CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(
+ this.region.getRegionInfo(), family,
+ storeFiles, Lists.newArrayList(newFile),
+ region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
+
+ HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
+ this.region.getRegionInfo(), compactionDescriptor);
+
+ Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+
+ Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
+ fs.create(recoveredEdits);
+ HLog.Writer writer = HLogFactory.createWriter(fs, recoveredEdits, conf);
+
+ long time = System.nanoTime();
+
+ writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time, HConstants.DEFAULT_CLUSTER_ID),
+ WALEdit.createCompaction(compactionDescriptor)));
+ writer.close();
+
+ //close the region now, and reopen again
+ HTableDescriptor htd = region.getTableDesc();
+ HRegionInfo info = region.getRegionInfo();
+ region.close();
+ region = HRegion.openHRegion(conf, fs, regiondir.getParent().getParent(), info, htd, null);
+
+ //now check whether we have only one store file, the compacted one
+ Collection<StoreFile> sfs = region.getStore(family).getStorefiles();
+ for (StoreFile sf : sfs) {
+ LOG.info(sf.getPath());
+ }
+ assertEquals(1, region.getStore(family).getStorefilesCount());
+ files = region.getRegionFileSystem().getFileSystem().listStatus(tmpDir);
+ assertEquals(0, files.length);
+
+ for (long i = minSeqId; i < maxSeqId; i++) {
+ Get get = new Get(Bytes.toBytes(i));
+ Result result = region.get(get);
+ byte[] value = result.getValue(family, Bytes.toBytes(i));
+ assertEquals(Bytes.toBytes(i), value);
+ }
+ } finally {
+ HRegion.closeHRegion(this.region);
+ this.region = null;
+ }
+ }
+
public void testGetWhileRegionClose() throws IOException {
Configuration hc = initSplit();
int numRows = 100;