Posted to commits@hbase.apache.org by te...@apache.org on 2014/03/10 17:05:48 UTC

svn commit: r1575985 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/regionserver/Store.java main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java

Author: tedyu
Date: Mon Mar 10 16:05:48 2014
New Revision: 1575985

URL: http://svn.apache.org/r1575985
Log:
HBASE-10712 Backport HBASE-8304 to 0.94 and 0.96 (Haosdent)


Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1575985&r1=1575984&r2=1575985&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Mon Mar 10 16:05:48 2014
@@ -72,12 +72,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ChecksumType;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.*;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
@@ -639,7 +634,7 @@ public class Store extends SchemaConfigu
     //equals() includes UGI instance as part of the comparison
     //and won't work when doing SecureBulkLoad
     //TODO deal with viewFS
-    if (!srcFs.getUri().equals(desFs.getUri())) {
+    if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
       LOG.info("File " + srcPath + " on different filesystem than " +
           "destination store - moving to this filesystem.");
       Path tmpPath = getTmpPath();
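
The switch from raw URI equality to FSHDFSUtils.isSameHdfs matters under HDFS high
availability: the same cluster can be reached through its logical nameservice URI or
through the active namenode's host:port, so getUri().equals() reports two different
filesystems and bulk load falls back to a needless cross-filesystem copy. A minimal
sketch of the difference (paths and hostnames are hypothetical, not from this patch):

    Path src = new Path("hdfs://active-nn.example.com:8020/bulk/cf/hfile"); // hypothetical
    Path dst = new Path("hdfs://my-nameservice/hbase/mytable");             // hypothetical HA URI
    FileSystem srcFs = src.getFileSystem(conf);
    FileSystem desFs = dst.getFileSystem(conf);
    srcFs.getUri().equals(desFs.getUri());        // false: the URIs differ textually
    FSHDFSUtils.isSameHdfs(conf, srcFs, desFs);   // true when the resolved namenode
                                                  // addresses overlap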

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java?rev=1575985&r1=1575984&r2=1575985&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java Mon Mar 10 16:05:48 2014
@@ -22,7 +22,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +43,82 @@ import org.apache.hadoop.hdfs.server.nam
  */
 public class FSHDFSUtils extends FSUtils{
   private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
+  private static Class dfsUtilClazz;
+  private static Method getNNAddressesMethod;
+
+  /**
+   * @param fs
+   * @param conf
+   * @return A set containing all namenode addresses of fs
+   */
+  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
+                                                      Configuration conf) {
+    Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
+    String serviceName = fs.getCanonicalServiceName();
+
+    if (serviceName.startsWith("ha-hdfs")) {
+      try {
+        if (dfsUtilClazz == null) {
+          dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
+        }
+        if (getNNAddressesMethod == null) {
+          getNNAddressesMethod =
+                  dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
+        }
+
+        Map<String, Map<String, InetSocketAddress>> addressMap =
+                (Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod
+                        .invoke(null, conf);
+        for (Map.Entry<String, Map<String, InetSocketAddress>> entry : addressMap.entrySet()) {
+          Map<String, InetSocketAddress> nnMap = entry.getValue();
+          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
+            InetSocketAddress addr = e2.getValue();
+            addresses.add(addr);
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
+      }
+    } else {
+      URI uri = fs.getUri();
+      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), uri.getPort());
+      addresses.add(addr);
+    }
+
+    return addresses;
+  }
+
+  /**
+   * @param conf the Configuration of HBase
+   * @param srcFs
+   * @param desFs
+   * @return Whether srcFs and desFs are on same hdfs or not
+   */
+  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
+    // By getCanonicalServiceName, we could make sure both srcFs and desFs
+    // show a unified format which contains scheme, host and port.
+    String srcServiceName = srcFs.getCanonicalServiceName();
+    String desServiceName = desFs.getCanonicalServiceName();
+
+    if (srcServiceName == null || desServiceName == null) {
+      return false;
+    }
+    if (srcServiceName.equals(desServiceName)) {
+      return true;
+    }
+    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
+      //If one serviceName is an HA format while the other is a non-HA format,
+      // maybe they refer to the same FileSystem.
+      //For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
+      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
+      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
+      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
+        return true;
+      }
+    }
+
+    return false;
+  }
 
   /**
    * Recover the lease from HDFS, retrying multiple times.
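
The new isSameHdfs first compares the canonical service names of the two filesystems
(for HA HDFS this looks like "ha-hdfs:<nameservice>", otherwise typically the namenode's
host:port). If the names match it answers true; if both sides are DistributedFileSystem
it additionally resolves the namenode RPC addresses, using reflection on
DFSUtil.getNNServiceRpcAddresses so the class still loads on Hadoop builds without HA
support, and treats any overlap as "same cluster". A hedged usage sketch (the cluster
addresses are hypothetical, not from this patch):

    Configuration conf = HBaseConfiguration.create();
    FileSystem srcFs = new Path("hdfs://nn1.example.com:8020/").getFileSystem(conf);
    FileSystem desFs = new Path("hdfs://my-nameservice/").getFileSystem(conf);
    if (FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
      // same underlying HDFS: a rename is enough, no cross-cluster copy needed
    }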

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java?rev=1575985&r1=1575984&r2=1575985&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java Mon Mar 10 16:05:48 2014
@@ -21,8 +21,12 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -36,6 +40,7 @@ import org.mockito.Mockito;
  */
 @Category(MediumTests.class)
 public class TestFSHDFSUtils {
+  private static final Log LOG = LogFactory.getLog(TestFSHDFSUtils.class);
   private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
   static {
     Configuration conf = HTU.getConfiguration();
@@ -90,6 +95,52 @@ public class TestFSHDFSUtils {
     Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
   }
 
+  @Test
+  public void testIsSameHdfs() throws IOException {
+    try {
+      Class dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
+      dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
+    } catch (Exception e) {
+      LOG.info("Skip testIsSameHdfs test case because of the no-HA hadoop version.");
+      return;
+    }
+
+    Configuration conf = HBaseConfiguration.create();
+    Path srcPath = new Path("hdfs://localhost:8020/");
+    Path desPath = new Path("hdfs://127.0.0.1/");
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    FileSystem desFs = desPath.getFileSystem(conf);
+
+    assertTrue(FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    desPath = new Path("hdfs://127.0.0.1:8070/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    desPath = new Path("hdfs://127.0.1.1:8020/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
+    conf.set("dfs.nameservices", "haosong-hadoop");
+    conf.set("dfs.federation.nameservices", "haosong-hadoop");
+    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
+    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
+            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:8020");
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
+    desPath = new Path("/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:8020");
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
+    desPath = new Path("/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+  }
+
   /**
    * Version of DFS that has HDFS-4525 in it.
    */
@@ -102,4 +153,4 @@ public class TestFSHDFSUtils {
       return false;
     }
   }
-}
\ No newline at end of file
+}
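
The added testIsSameHdfs only runs when the Hadoop on the classpath exposes
DFSUtil.getNNServiceRpcAddresses; on older non-HA Hadoop builds it logs a message and
returns early. It wires up a logical nameservice ("haosong-hadoop") whose
dfs.namenode.rpc-address.* entries either do or do not include the source namenode
address, and asserts that isSameHdfs answers true or false accordingly. Assuming the
standard Maven surefire setup of the 0.94 branch, the test can be run on its own with
something like:

    mvn test -Dtest=TestFSHDFSUtils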