You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/31 20:52:58 UTC

svn commit: r1238774 - /hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java

Author: tedyu
Date: Tue Jan 31 19:52:58 2012
New Revision: 1238774

URL: http://svn.apache.org/viewvc?rev=1238774&view=rev
Log:
HBASE-5259 Normalize the RegionLocation in TableInputFormat by the reverse DNS lookup (Liyin Tang)

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1238774&r1=1238773&r2=1238774&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Tue Jan 31 19:52:58 2012
@@ -20,11 +20,17 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
+import javax.naming.NamingException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -36,6 +42,7 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.DNS;
 
 /**
  * A base for {@link TableInputFormat}s. Receives a {@link HTable}, an
@@ -78,6 +85,13 @@ extends InputFormat<ImmutableBytesWritab
   private TableRecordReader tableRecordReader = null;
 
 
+  /** The reverse DNS lookup cache mapping: IPAddress => HostName */
+  private HashMap<InetAddress, String> reverseDNSCacheMap =
+    new HashMap<InetAddress, String>();
+  
+  /** The NameServer address */
+  private String nameServer = null;
+  
   /**
    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
    * the default.
@@ -128,6 +142,10 @@ extends InputFormat<ImmutableBytesWritab
 	if (table == null) {
 	    throw new IOException("No table was provided.");
 	}
+    // Get the name server address and the default value is null.
+    this.nameServer =
+      context.getConfiguration().get("hbase.nameserver.address", null);
+    
     Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
     if (keys == null || keys.getFirst() == null ||
         keys.getFirst().length == 0) {
@@ -138,13 +156,24 @@ extends InputFormat<ImmutableBytesWritab
       if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
         continue;
       }
-      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
-        getHostname();
-      byte[] startRow = scan.getStartRow();
-      byte[] stopRow = scan.getStopRow();
-      // determine if the given start an stop key fall into the region
+      HServerAddress regionServerAddress = 
+        table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
+      InetAddress regionAddress =
+        regionServerAddress.getInetSocketAddress().getAddress();
+      String regionLocation;
+      try {
+        regionLocation = reverseDNS(regionAddress);
+      } catch (NamingException e) {
+        LOG.error("Cannot resolve the host name for " + regionAddress +
+            " because of " + e);
+        regionLocation = regionServerAddress.getHostname();
+      }
+
+			byte[] startRow = scan.getStartRow();
+			byte[] stopRow = scan.getStopRow();
+			// determine if the given start an stop key fall into the region
       if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
-           Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+					 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
           (stopRow.length == 0 ||
            Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
         byte[] splitStart = startRow.length == 0 ||
@@ -164,6 +193,15 @@ extends InputFormat<ImmutableBytesWritab
     }
     return splits;
   }
+  
+  private String reverseDNS(InetAddress ipAddress) throws NamingException {
+    String hostName = this.reverseDNSCacheMap.get(ipAddress);
+    if (hostName == null) {
+      hostName = DNS.reverseDns(ipAddress, this.nameServer);
+      this.reverseDNSCacheMap.put(ipAddress, hostName);
+    }
+    return hostName;
+  }
 
   /**
    *