You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2014/03/08 19:48:25 UTC
svn commit: r1575588 - in /hbase/branches/0.98/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
Author: tedyu
Date: Sat Mar 8 18:48:25 2014
New Revision: 1575588
URL: http://svn.apache.org/r1575588
Log:
HBASE-8304 Bulkload fails to remove files if fs.default.name / fs.defaultFS is configured without default port (Haosdent)
Modified:
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java?rev=1575588&r1=1575587&r2=1575588&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java Sat Mar 8 18:48:25 2014
@@ -48,8 +48,8 @@ import org.apache.hadoop.hbase.backup.HF
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
/**
* View to an on-disk Region.
@@ -402,7 +402,7 @@ public class HRegionFileSystem {
// We can't compare FileSystem instances as equals() includes UGI instance
// as part of the comparison and won't work when doing SecureBulkLoad
// TODO deal with viewFS
- if (!srcFs.getUri().equals(desFs.getUri())) {
+ if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
"the destination store. Copying file over to destination filesystem.");
Path tmpPath = createTempName();
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java?rev=1575588&r1=1575587&r2=1575588&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java Sat Mar 8 18:48:25 2014
@@ -22,7 +22,13 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -41,6 +47,82 @@ import org.apache.hadoop.hdfs.server.nam
@InterfaceStability.Evolving
public class FSHDFSUtils extends FSUtils {
private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
+  private static Class<?> dfsUtilClazz;
+  private static Method getNNAddressesMethod;
+
+  /**
+   * Collects the NameNode RPC addresses behind a {@link DistributedFileSystem}.
+   * <p>
+   * For an HA filesystem (canonical service name starting with {@code ha-hdfs}) the addresses
+   * of all configured NameNodes are obtained from
+   * {@code DFSUtil.getNNServiceRpcAddresses(Configuration)}, invoked reflectively because that
+   * method may be absent in non-HA Hadoop versions. Otherwise the single address is taken from
+   * the filesystem URI; when that URI carries no explicit port (e.g. {@code hdfs://host/}) the
+   * port is recovered from the canonical service name instead.
+   *
+   * @param fs the filesystem whose NameNode addresses are wanted
+   * @param conf configuration used to resolve the HA NameNode addresses
+   * @return a set containing all namenode addresses of fs; empty if they could not be
+   *         determined (a warning is logged in that case)
+   */
+  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
+      Configuration conf) {
+    Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
+    String serviceName = fs.getCanonicalServiceName();
+    if (serviceName == null) {
+      // Nothing to compare against; callers treat an empty set as "no shared namenode".
+      return addresses;
+    }
+
+    if (serviceName.startsWith("ha-hdfs")) {
+      try {
+        // Resolved lazily via reflection and cached in the static fields above.
+        if (dfsUtilClazz == null) {
+          dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
+        }
+        if (getNNAddressesMethod == null) {
+          getNNAddressesMethod =
+              dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
+        }
+
+        @SuppressWarnings("unchecked")
+        Map<String, Map<String, InetSocketAddress>> addressMap =
+            (Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod
+                .invoke(null, conf);
+        for (Map<String, InetSocketAddress> nnMap : addressMap.values()) {
+          addresses.addAll(nnMap.values());
+        }
+      } catch (Exception e) {
+        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
+      }
+    } else {
+      URI uri = fs.getUri();
+      int port = uri.getPort();
+      if (port < 0) {
+        // The URI has no explicit port (fs.defaultFS configured without one). InetSocketAddress
+        // rejects -1, so take the effective port from the canonical service name, which is
+        // assumed to end in ":port" (NOTE(review): confirm for all supported Hadoop versions).
+        int colon = serviceName.lastIndexOf(':');
+        if (colon >= 0) {
+          try {
+            port = Integer.parseInt(serviceName.substring(colon + 1));
+          } catch (NumberFormatException e) {
+            port = -1;
+          }
+        }
+      }
+      if (port >= 0 && port <= 0xFFFF) {
+        addresses.add(new InetSocketAddress(uri.getHost(), port));
+      } else {
+        // Returning an empty set makes isSameHdfs answer "different filesystem", which is the
+        // safe fallback (the bulk-load file gets copied) instead of throwing.
+        LOG.warn("Cannot determine namenode port. uri=" + uri + ", serviceName=" + serviceName);
+      }
+    }
+
+    return addresses;
+  }
+
+ /**
+ * @param conf the Configuration of HBase
+ * @param srcFs
+ * @param desFs
+ * @return Whether srcFs and desFs are on same hdfs or not
+ */
+ public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
+ // By getCanonicalServiceName, we could make sure both srcFs and desFs
+ // show a unified format which contains scheme, host and port.
+ String srcServiceName = srcFs.getCanonicalServiceName();
+ String desServiceName = desFs.getCanonicalServiceName();
+
+ if (srcServiceName == null || desServiceName == null) {
+ return false;
+ }
+ if (srcServiceName.equals(desServiceName)) {
+ return true;
+ }
+ if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
+ //If one serviceName is an HA format while the other is a non-HA format,
+ // maybe they refer to the same FileSystem.
+ //For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
+ Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
+ Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
+ if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
+ return true;
+ }
+ }
+
+ return false;
+ }
/**
* Recover the lease from HDFS, retrying multiple times.
Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java?rev=1575588&r1=1575587&r2=1575588&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java Sat Mar 8 18:48:25 2014
@@ -21,8 +21,12 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -36,6 +40,7 @@ import org.mockito.Mockito;
*/
@Category(MediumTests.class)
public class TestFSHDFSUtils {
+ private static final Log LOG = LogFactory.getLog(TestFSHDFSUtils.class);
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
static {
Configuration conf = HTU.getConfiguration();
@@ -94,6 +99,51 @@ public class TestFSHDFSUtils {
Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
}
+ @Test
+ public void testIsSameHdfs() throws IOException {
+ try {
+ Class dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
+ dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
+ } catch (Exception e) {
+ LOG.info("Skip testIsSameHdfs test case because of the no-HA hadoop version.");
+ return;
+ }
+
+ Configuration conf = HBaseConfiguration.create();
+ Path srcPath = new Path("hdfs://localhost:8020/");
+ Path desPath = new Path("hdfs://127.0.0.1/");
+ FileSystem srcFs = srcPath.getFileSystem(conf);
+ FileSystem desFs = desPath.getFileSystem(conf);
+
+ assertTrue(FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+ desPath = new Path("hdfs://127.0.0.1:8070/");
+ desFs = desPath.getFileSystem(conf);
+ assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+ desPath = new Path("hdfs://127.0.1.1:8020/");
+ desFs = desPath.getFileSystem(conf);
+ assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+ conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
+ conf.set("dfs.nameservices", "haosong-hadoop");
+ conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
+ conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
+ "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+
+ conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:8020");
+ conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
+ desPath = new Path("/");
+ desFs = desPath.getFileSystem(conf);
+ assertTrue(FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+ conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:8020");
+ conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
+ desPath = new Path("/");
+ desFs = desPath.getFileSystem(conf);
+ assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+ }
+
/**
* Version of DFS that has HDFS-4525 in it.
*/