You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@hbase.apache.org by Ryan Rawson <ry...@gmail.com> on 2010/03/23 03:12:56 UTC
Tests with HDFS-200 on trunk issues
Interesting story here, testing with dfs.support.append=true might be
difficult/impossible using unit tests because of this code:
//
// We found the lease for this file. And surprisingly the original
// holder is trying to recreate this file. This should never occur.
//
if (lease != null) {
Lease leaseFile = leaseManager.getLeaseByPath(src);
if (leaseFile != null && leaseFile.equals(lease)) {
throw new AlreadyBeingCreatedException(
"failed to create
file " + src + " for " + holder +
" on client " +
clientMachine +
" because current
leaseholder is trying to recreate file.");
}
}
I ran into this with the following patch on trunk:
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index d2b01fe..8c81aed 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
@@ -1023,7 +1025,8 @@ public class HLog implements HConstants, Syncable {
* @param rootDir qualified root directory of the HBase instance
* @param srcDir Directory of log files to split: e.g.
* <code>${ROOTDIR}/log_HOST_PORT</code>
- * @param oldLogDir
+ * @param oldLogDir the system-wide archival directory for
non-split HLogs after either
+ * (a) recovery of a RS or (b) pruning of HLogs in a RS due to flushes.
* @param fs FileSystem
* @param conf HBaseConfiguration
* @throws IOException
@@ -1121,6 +1124,7 @@ public class HLog implements HConstants, Syncable {
int concurrentLogReads =
conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
// Is append supported?
+ boolean append = isAppend(conf);
try {
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
concurrentLogReads)).intValue();
@@ -1139,6 +1143,7 @@ public class HLog implements HConstants, Syncable {
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logfiles[i].getPath() + ", length=" +
logfiles[i].getLen());
}
+ recoverLog(fs, logfiles[i].getPath(), append);
Reader in = null;
int count = 0;
try {
@@ -1290,6 +1295,65 @@ public class HLog implements HConstants, Syncable {
return splits;
}
+ /*
+ * @param conf
+ * @return True if append enabled and we have the syncFs in our path.
+ */
+ static boolean isAppend(final Configuration conf) {
+ boolean append = conf.getBoolean("dfs.support.append", false);
+ if (append) {
+ try {
+ SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
+ append = true;
+ } catch (SecurityException ignored) {
+ } catch (NoSuchMethodException e) {
+ append = false;
+ }
+ }
+ return append;
+ }
+
+ /*
+ * Recover log.
+ * Try and open log in append mode.
+ * Doing this, we get a hold of the file that crashed writer
+ * was writing to. Once we have it, close it. This will
+ * allow subsequent reader to see up to last sync.
+ * @param fs
+ * @param p
+ * @param append
+ */
+ public static void recoverLog(final FileSystem fs, final Path p,
+ final boolean append) {
+ if (!append) {
+ return;
+ }
+
+ // lease recovery not needed for local file system case.
+ // currently, local file system doesn't implement append either.
+ if (!(fs instanceof DistributedFileSystem)) {
+ return;
+ }
+
+ // Trying recovery
+ boolean recovered = false;
+ while (!recovered) {
+ try {
+ FSDataOutputStream out = fs.append(p);
+ out.close();
+ recovered = true;
+ } catch (IOException e) {
+ LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ // ignore it and try again
+ }
+ }
+ }
+ LOG.info("Past out lease recovery");
+ }
+
/**
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
index 05dd38d..7f6e869 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
@@ -66,6 +66,7 @@ public class TestStoreReconstruction {
public void setUp() throws Exception {
conf = TEST_UTIL.getConfiguration();
+ conf.set("dfs.support.append", "true");
cluster = new MiniDFSCluster(conf, 3, true, (String[])null);
// Set the hbase.rootdir to be the home directory in mini dfs.
TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR,
@@ -99,7 +100,7 @@ public class TestStoreReconstruction {
HRegionInfo info = new HRegionInfo(htd, null, null, false);
Path oldLogDir = new Path(this.dir, HConstants.HREGION_OLDLOGDIR_NAME);
HLog log = new HLog(cluster.getFileSystem(),
- this.dir, oldLogDir, conf, null);
+ new Path(this.dir, "rs"), oldLogDir, conf, null);
HRegion region = new HRegion(dir, log,
cluster.getFileSystem(),conf, info, null);
List<KeyValue> result = new ArrayList<KeyValue>();
@@ -134,11 +135,11 @@ public class TestStoreReconstruction {
log.sync();
// TODO dont close the file here.
- log.close();
+ //log.close();
List<Path> splits =
HLog.splitLog(new Path(conf.get(HConstants.HBASE_DIR)),
- this.dir, oldLogDir, cluster.getFileSystem(), conf);
+ new Path(this.dir, "rs"), oldLogDir,
cluster.getFileSystem(), conf);
// Split should generate only 1 file since there's only 1 region
assertEquals(1, splits.size());
diff --git a/pom.xml b/pom.xml
index 9e2ed37..a53f484 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,7 +158,7 @@
<compileSource>1.6</compileSource>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <hadoop.version>0.20.2</hadoop.version>
+ <hadoop.version>0.20.2+228+200a</hadoop.version>
<log4j.version>1.2.15</log4j.version>
<jetty.version>6.1.14</jetty.version>
Re: Tests with HDFS-200 on trunk issues
Posted by Todd Lipcon <to...@cloudera.com>.
I think what you've gotta do in the test is get a new FileSystem instance,
so that you get a new DFSClient (with a new client ID). That way it won't
trigger the checks for "original holder recreating file".
-Todd
On Mon, Mar 22, 2010 at 7:12 PM, Ryan Rawson <ry...@gmail.com> wrote:
> Interesting story here, testing with dfs.support.append=true might be
> difficult/impossible using unit tests because of this code:
>
> //
> // We found the lease for this file. And surprisingly the original
> // holder is trying to recreate this file. This should never occur.
> //
> if (lease != null) {
> Lease leaseFile = leaseManager.getLeaseByPath(src);
> if (leaseFile != null && leaseFile.equals(lease)) {
> throw new AlreadyBeingCreatedException(
> "failed to create
> file " + src + " for " + holder +
> " on client " +
> clientMachine +
> " because current
> leaseholder is trying to recreate file.");
> }
> }
>
>
> I ran into this with the following patch on trunk:
>
> diff --git
> a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
> b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
> index d2b01fe..8c81aed 100644
> --- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
> +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
> @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.conf.Configuration;
> +import org.apache.hadoop.fs.FSDataOutputStream;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> @@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
> import org.apache.hadoop.hbase.util.FSUtils;
> import org.apache.hadoop.hbase.util.Threads;
> import org.apache.hadoop.hdfs.DistributedFileSystem;
> +import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Writable;
>
> import java.io.DataInput;
> @@ -1023,7 +1025,8 @@ public class HLog implements HConstants, Syncable {
> * @param rootDir qualified root directory of the HBase instance
> * @param srcDir Directory of log files to split: e.g.
> * <code>${ROOTDIR}/log_HOST_PORT</code>
> - * @param oldLogDir
> + * @param oldLogDir the system-wide archival directory for
> non-split HLogs after either
> + * (a) recovery of a RS or (b) pruning of HLogs in a RS due to flushes.
> * @param fs FileSystem
> * @param conf HBaseConfiguration
> * @throws IOException
> @@ -1121,6 +1124,7 @@ public class HLog implements HConstants, Syncable {
> int concurrentLogReads =
> conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
> // Is append supported?
> + boolean append = isAppend(conf);
> try {
> int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
> concurrentLogReads)).intValue();
> @@ -1139,6 +1143,7 @@ public class HLog implements HConstants, Syncable {
> LOG.debug("Splitting hlog " + (i + 1) + " of " +
> logfiles.length +
> ": " + logfiles[i].getPath() + ", length=" +
> logfiles[i].getLen());
> }
> + recoverLog(fs, logfiles[i].getPath(), append);
> Reader in = null;
> int count = 0;
> try {
> @@ -1290,6 +1295,65 @@ public class HLog implements HConstants, Syncable {
> return splits;
> }
>
> + /*
> + * @param conf
> + * @return True if append enabled and we have the syncFs in our path.
> + */
> + static boolean isAppend(final Configuration conf) {
> + boolean append = conf.getBoolean("dfs.support.append", false);
> + if (append) {
> + try {
> + SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
> + append = true;
> + } catch (SecurityException ignored) {
> + } catch (NoSuchMethodException e) {
> + append = false;
> + }
> + }
> + return append;
> + }
> +
> + /*
> + * Recover log.
> + * Try and open log in append mode.
> + * Doing this, we get a hold of the file that crashed writer
> + * was writing to. Once we have it, close it. This will
> + * allow subsequent reader to see up to last sync.
> + * @param fs
> + * @param p
> + * @param append
> + */
> + public static void recoverLog(final FileSystem fs, final Path p,
> + final boolean append) {
> + if (!append) {
> + return;
> + }
> +
> + // lease recovery not needed for local file system case.
> + // currently, local file system doesn't implement append either.
> + if (!(fs instanceof DistributedFileSystem)) {
> + return;
> + }
> +
> + // Trying recovery
> + boolean recovered = false;
> + while (!recovered) {
> + try {
> + FSDataOutputStream out = fs.append(p);
> + out.close();
> + recovered = true;
> + } catch (IOException e) {
> + LOG.info("Failed open for append, waiting on lease recovery: " +
> p, e);
> + try {
> + Thread.sleep(1000);
> + } catch (InterruptedException ex) {
> + // ignore it and try again
> + }
> + }
> + }
> + LOG.info("Past out lease recovery");
> + }
> +
> /**
> * Utility class that lets us keep track of the edit with it's key
> * Only used when splitting logs
> diff --git
> a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
>
> b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
> index 05dd38d..7f6e869 100644
> ---
> a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
> +++
> b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
> @@ -66,6 +66,7 @@ public class TestStoreReconstruction {
> public void setUp() throws Exception {
>
> conf = TEST_UTIL.getConfiguration();
> + conf.set("dfs.support.append", "true");
> cluster = new MiniDFSCluster(conf, 3, true, (String[])null);
> // Set the hbase.rootdir to be the home directory in mini dfs.
> TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR,
> @@ -99,7 +100,7 @@ public class TestStoreReconstruction {
> HRegionInfo info = new HRegionInfo(htd, null, null, false);
> Path oldLogDir = new Path(this.dir, HConstants.HREGION_OLDLOGDIR_NAME);
> HLog log = new HLog(cluster.getFileSystem(),
> - this.dir, oldLogDir, conf, null);
> + new Path(this.dir, "rs"), oldLogDir, conf, null);
> HRegion region = new HRegion(dir, log,
> cluster.getFileSystem(),conf, info, null);
> List<KeyValue> result = new ArrayList<KeyValue>();
> @@ -134,11 +135,11 @@ public class TestStoreReconstruction {
> log.sync();
>
> // TODO dont close the file here.
> - log.close();
> + //log.close();
>
> List<Path> splits =
> HLog.splitLog(new Path(conf.get(HConstants.HBASE_DIR)),
> - this.dir, oldLogDir, cluster.getFileSystem(), conf);
> + new Path(this.dir, "rs"), oldLogDir,
> cluster.getFileSystem(), conf);
>
> // Split should generate only 1 file since there's only 1 region
> assertEquals(1, splits.size());
> diff --git a/pom.xml b/pom.xml
> index 9e2ed37..a53f484 100644
> --- a/pom.xml
> +++ b/pom.xml
> @@ -158,7 +158,7 @@
> <compileSource>1.6</compileSource>
> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>
> - <hadoop.version>0.20.2</hadoop.version>
> + <hadoop.version>0.20.2+228+200a</hadoop.version>
>
> <log4j.version>1.2.15</log4j.version>
> <jetty.version>6.1.14</jetty.version>
>
--
Todd Lipcon
Software Engineer, Cloudera