You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/11/06 20:18:20 UTC

svn commit: r1539432 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: client/HTable.java mapred/TableInputFormat.java mapred/TableInputFormatBase.java mapreduce/TableInputFormat.java mapreduce/TableInputFormatBase.java

Author: liyin
Date: Wed Nov  6 19:18:20 2013
New Revision: 1539432

URL: http://svn.apache.org/r1539432
Log:
[HBASE-9896] Adds mappers/job option for HBaseStreaming

Author: rshroff

Summary:
Currently the split algo only supports mappers/region for any MR
job for HBase Streaming. The reason is to have locality of the
region server hosting the region and the mapper task.
However in certain scenarios, we need to have a strict limit on
the number of mappers per job.

This changes adds an additional parameter to control the total
number of mappers/job. The numMappersPerRegion  and numMappersPerJob
are mutually exclusive.

Test Plan: tested with various startRow/stopRow/num_mapper configs on SH004

Reviewers: manukranthk, liyintang

Reviewed By: liyintang

CC: hbase-eng@, chaoyc, nzhang

Differential Revision: https://phabricator.fb.com/D1032630

Task ID: 2381046

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1539432&r1=1539431&r2=1539432&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Nov  6 19:18:20 2013
@@ -394,6 +394,35 @@ public class HTable implements HTableInt
   }
   
   /**
+   * Gets the starting and ending row keys for every region in the currently
+   * open table.
+   * <p>
+   * This is mainly useful for the MapReduce integration.
+   * @return TreeMap of {startKey,endKey} pairs
+   * @throws IOException if a remote or network exception occurs
+   */
+  @SuppressWarnings("unchecked")
+  public TreeMap<byte[], byte[]> getStartEndKeysMap() throws IOException {
+    final TreeMap<byte[], byte[]> startEndKeysMap =
+      new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
+    MetaScannerVisitor visitor = new MetaScannerVisitor() {
+      public boolean processRow(Result rowResult) throws IOException {
+        HRegionInfo info = Writables.getHRegionInfo(
+            rowResult.getValue(HConstants.CATALOG_FAMILY,
+                HConstants.REGIONINFO_QUALIFIER));
+        if (Bytes.equals(info.getTableDesc().getName(), getTableName())) {
+          if (!(info.isOffline() || info.isSplit())) {
+            startEndKeysMap.put(info.getStartKey(), info.getEndKey());
+          }
+        }
+        return true;
+      }
+    };
+    MetaScanner.metaScan(configuration, visitor, this.tableName);
+    return startEndKeysMap;
+  }
+
+  /**
    * Returns the Array of StartKeys along with the favoredNodes 
    * for a particular region. Identifying the the favoredNodes using the 
    * Meta table similar to the 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java?rev=1539432&r1=1539431&r2=1539432&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java Wed Nov  6 19:18:20 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.JobConfi
 import org.apache.hadoop.util.StringUtils;
 
 import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.MAPPERS_PER_REGION;
+import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.NUM_MAPPERS_PER_JOB;
 import static org.apache.hadoop.hbase.mapreduce.TableInputFormat.SPLIT_ALGO;
 
 /**
@@ -70,6 +71,10 @@ public class TableInputFormat extends Ta
       setNumMapperPerRegion(Integer.parseInt(job.get(MAPPERS_PER_REGION)));
     }
 
+    if (job.get(NUM_MAPPERS_PER_JOB) != null) {
+      setNumMappersPerJob(Integer.parseInt(job.get(NUM_MAPPERS_PER_JOB)));
+    }
+
     setSplitAlgorithm(job.get(SPLIT_ALGO, UniformSplit.class.getSimpleName()));
   }
 
@@ -92,4 +97,4 @@ public class TableInputFormat extends Ta
       throw new IOException("expecting at least one column");
     }
   }
-}
\ No newline at end of file
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java?rev=1539432&r1=1539431&r2=1539432&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java Wed Nov  6 19:18:20 2013
@@ -79,7 +79,12 @@ implements InputFormat<ImmutableBytesWri
   private Scan scan;
 
   /** The number of mappers to assign to each region. */
-  private int numMappersPerRegion = 1;
+  private int numMappers = 1;
+
+  /** The total number of mappers. The number of mappers per region is
+   * mutually exclusive with number of mappers per job. In case both
+   * are defined, mapperPerJob will take the precedence.*/
+  private boolean mappersPerJob = false;
 
   /** Splitting algorithm to be used to split the keys */
   private String splitAlgmName; // default to Uniform
@@ -134,7 +139,7 @@ implements InputFormat<ImmutableBytesWri
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =
         org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplitsInternal(
-            table, job, scan, numMappersPerRegion, splitAlgmName, null);
+            table, job, scan, numMappers, mappersPerJob, splitAlgmName, null);
     int n = newStyleSplits.size();
     InputSplit[] result = new InputSplit[n];
     for (int i = 0; i < n; ++i) {
@@ -210,7 +215,30 @@ implements InputFormat<ImmutableBytesWri
       throw new IllegalArgumentException("Expecting at least 1 mapper " +
           "per region; instead got: " + num);
     }
-    numMappersPerRegion = num;
+    if (!mappersPerJob) {
+      numMappers = num;
+    } else {
+      LOG.warn("Ignoring mappersPerRegion config value as mappersPerJob is" +
+        " already set.");
+    }
+  }
+
+  /**
+   * Sets the number of mappers per job.
+   *
+   * @param num
+   * @throws IllegalArgumentException When <code>num</code> <= 0.
+   */
+  public void setNumMappersPerJob(int num) throws IllegalArgumentException {
+    if (num <= 0) {
+      throw new IllegalArgumentException("Expecting at least 1 mapper " +
+          "per region; instead got: " + num);
+    }
+    if (numMappers > 1) {
+      LOG.warn("Overriding num of mappers per region.");
+    }
+    numMappers = num;
+    mappersPerJob = true;
   }
 
   public void setSplitAlgorithm(String name) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java?rev=1539432&r1=1539431&r2=1539432&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java Wed Nov  6 19:18:20 2013
@@ -93,6 +93,10 @@ implements Configurable {
   /** The number of mappers that should be assigned to each region. */
   public static final String MAPPERS_PER_REGION = "hbase.mapreduce.mappersperregion";
 
+  /** The number of mappers that should be assigned per job. */
+  public static final String NUM_MAPPERS_PER_JOB = "hbase.mapreduce.num.mappers";
+
+
   /** The Algorithm used to split each region's keyspace. */
   public static final String SPLIT_ALGO = "hbase.mapreduce.tableinputformat.split.algo";
 
@@ -137,6 +141,10 @@ implements Configurable {
       setNumMapperPerRegion(Integer.parseInt(conf.get(MAPPERS_PER_REGION)));
     }
 
+    if (conf.get(NUM_MAPPERS_PER_JOB) != null) {
+      setNumMappersPerJob(Integer.parseInt(conf.get(NUM_MAPPERS_PER_JOB)));
+    }
+
     setSplitAlgorithm(conf.get(SPLIT_ALGO, UniformSplit.class.getSimpleName()));
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java?rev=1539432&r1=1539431&r2=1539432&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java Wed Nov  6 19:18:20 2013
@@ -21,10 +21,10 @@ package org.apache.hadoop.hbase.mapreduc
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.naming.NamingException;
@@ -67,7 +67,11 @@ extends InputFormat<ImmutableBytesWritab
   /** The reader scanning the table, can be a custom one. */
   private TableRecordReader tableRecordReader = null;
   /** The number of mappers to assign to each region. */
-  private int numMappersPerRegion = 1;
+  private int numMappers = 1;
+  /** The total number of mappers. The number of mappers per region is
+   * mutually exclusive with number of mappers per job. In case both
+   * are defined, mappersPerJob will take the precedence.*/
+  private boolean mappersPerJob = false;
   /** Splitting algorithm to be used to split the keys */
   private String splitAlgmName; // default to Uniform
 
@@ -121,14 +125,15 @@ extends InputFormat<ImmutableBytesWritab
    */
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException {
-    return getSplitsInternal(table, context.getConfiguration(), scan, numMappersPerRegion,
-        splitAlgmName, this);
+    return getSplitsInternal(table, context.getConfiguration(), scan, numMappers,
+      mappersPerJob, splitAlgmName, this);
   }
 
   public static List<InputSplit> getSplitsInternal(HTable table,
       Configuration conf,
       Scan scan,
-      int numMappersPerRegion,
+      int numMappers,
+      boolean mappersPerJob,
       String splitAlgmName,
       TableInputFormatBase tifb) throws IOException {
     if (table == null) {
@@ -136,6 +141,11 @@ extends InputFormat<ImmutableBytesWritab
     }
     determineNameServer(conf);
 
+    if (mappersPerJob) {
+      return getSplitsInternalBasedOnNumMappersPerJob(table, conf, scan, numMappers,
+        splitAlgmName);
+    }
+
     Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
     if (keys == null || keys.getFirst() == null ||
         keys.getFirst().length == 0) {
@@ -144,15 +154,15 @@ extends InputFormat<ImmutableBytesWritab
     Pair<byte[][], byte[][]> splitKeys = null;
     int numRegions = keys.getFirst().length;
     //TODO: Can anything else be done when there are less than 3 regions?
-    if ((numMappersPerRegion == 1) || (numRegions < 3)) {
-      numMappersPerRegion = 1;
+    if ((numMappers == 1) || (numRegions < 3)) {
+      numMappers = 1;
       splitKeys = keys;
     } else {
-      byte[][] startKeys = new byte[numRegions * numMappersPerRegion][];
-      byte[][] stopKeys = new byte[numRegions * numMappersPerRegion][];
+      byte[][] startKeys = new byte[numRegions * numMappers][];
+      byte[][] stopKeys = new byte[numRegions * numMappers][];
       // Insert null keys at edges
       startKeys[0] = HConstants.EMPTY_START_ROW;
-      stopKeys[numRegions * numMappersPerRegion - 1] = HConstants.EMPTY_END_ROW;
+      stopKeys[numRegions * numMappers - 1] = HConstants.EMPTY_END_ROW;
 
       byte[][] originalStartKeys = keys.getFirst();
       byte[][] originalStopKeys = keys.getSecond();
@@ -166,14 +176,14 @@ extends InputFormat<ImmutableBytesWritab
           algmImpl.setFirstRow(algmImpl.rowToStr(originalStartKeys[i]));
         if (originalStopKeys[i].length != 0)
           algmImpl.setLastRow(algmImpl.rowToStr(originalStopKeys[i]));
-        byte[][] dividingKeys = algmImpl.split(numMappersPerRegion);
+        byte[][] dividingKeys = algmImpl.split(numMappers);
 
-        startKeys[i*numMappersPerRegion] = originalStartKeys[i];
-        for (int j = 0; j < numMappersPerRegion - 1; j++) {
-          stopKeys[i * numMappersPerRegion + j] = dividingKeys[j];
-          startKeys[i * numMappersPerRegion + j + 1] = dividingKeys[j];
+        startKeys[i*numMappers] = originalStartKeys[i];
+        for (int j = 0; j < numMappers - 1; j++) {
+          stopKeys[i * numMappers + j] = dividingKeys[j];
+          startKeys[i * numMappers + j + 1] = dividingKeys[j];
         }
-        stopKeys[(i+1)*numMappersPerRegion - 1] = originalStopKeys[i];
+        stopKeys[(i+1)*numMappers - 1] = originalStopKeys[i];
       }
       splitKeys = new Pair<byte[][], byte[][]>();
       splitKeys.setFirst(startKeys);
@@ -181,12 +191,12 @@ extends InputFormat<ImmutableBytesWritab
     }
 
     List<InputSplit> splits =
-        new ArrayList<InputSplit>(numRegions * numMappersPerRegion);
+        new ArrayList<InputSplit>(numRegions * numMappers);
     byte[] startRow = scan.getStartRow();
     byte[] stopRow = scan.getStopRow();
-    for (int i = 0; i < numRegions * numMappersPerRegion; i++) {
-      if (tifb != null && !tifb.includeRegionInSplit(keys.getFirst()[i / numMappersPerRegion],
-          keys.getSecond()[i / numMappersPerRegion])) {
+    for (int i = 0; i < numRegions * numMappers; i++) {
+      if (tifb != null && !tifb.includeRegionInSplit(keys.getFirst()[i / numMappers],
+          keys.getSecond()[i / numMappers])) {
         continue;
       }
       HServerAddress regionServerAddress = 
@@ -226,6 +236,78 @@ extends InputFormat<ImmutableBytesWritab
     return splits;
   }
 
+  public static List<InputSplit> getSplitsInternalBasedOnNumMappersPerJob(
+    HTable table, Configuration conf, Scan scan, int numMappersPerJob,
+    String splitAlgmName) throws IOException {
+    List<InputSplit> splits =
+      new ArrayList<InputSplit>(numMappersPerJob);
+    byte[] startRow = scan.getStartRow();
+    byte[] stopRow = scan.getStopRow();
+
+    SplitAlgorithm algmImpl = RegionSplitter.newSplitAlgoInstance(conf,
+      splitAlgmName);
+    if (startRow.length != 0)
+      algmImpl.setFirstRow(algmImpl.rowToStr(startRow));
+    if (stopRow.length != 0)
+      algmImpl.setLastRow(algmImpl.rowToStr(stopRow));
+
+    // This will return numMappersPerJob - 1 split interval
+    byte[][] dividingKeys = algmImpl.split(numMappersPerJob);
+
+    byte[] splitStartKey =  algmImpl.firstRow();
+    byte[] splitStopKey = algmImpl.lastRow();
+
+    TreeMap<byte[], byte[]> regions = table.getStartEndKeysMap();
+
+    // No splits possible, just default it to 1
+    if (dividingKeys == null || dividingKeys.length == 0) {
+      byte[] startRowOfFullRegion = getNearestFullRegion(regions, splitStartKey,
+        splitStopKey);
+      InputSplit split = new TableSplit(table.getTableName(),
+        splitStartKey, splitStopKey, getRegionServerAddress(table,
+          startRowOfFullRegion));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("getSplits: split -> " + split);
+      }
+      splits.add(split);
+    } else {
+      for (int i = 0; i <= dividingKeys.length; i++) {
+        if (i == dividingKeys.length) {
+          splitStopKey = algmImpl.lastRow();
+        } else {
+          splitStopKey = dividingKeys[i];
+        }
+        byte[] startRowOfFullRegion = getNearestFullRegion(regions, splitStartKey,
+          splitStopKey);
+        InputSplit split = new TableSplit(table.getTableName(),
+          splitStartKey, splitStopKey, getRegionServerAddress(table,
+            startRowOfFullRegion));
+        splits.add(split);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("getSplits: split -> " + i + " -> " + split);
+        }
+        splitStartKey = splitStopKey;
+      }
+    }
+    HConnectionManager.deleteAllZookeeperConnections();
+    return splits;
+  }
+
+  private static byte[] getNearestFullRegion(
+    final TreeMap<byte[], byte[]> regions,
+    final byte[] startKey, final byte[] stopKey) {
+
+    Map.Entry<byte[], byte[]> region = regions.ceilingEntry(startKey);
+    if (region != null) {
+      if (Bytes.compareTo(region.getValue(), stopKey) < 0) {
+        // The given range spans at least one complete region. Return
+        // the start row for this region
+        return region.getKey();
+      }
+    }
+    return startKey;
+  }
+
   private static synchronized void determineNameServer(Configuration conf) {
     // Get the name server address and the default value is null.
     if (nameServer == null) {
@@ -233,6 +315,24 @@ extends InputFormat<ImmutableBytesWritab
     }
   }
 
+  private static String getRegionServerAddress(final HTable table, final byte[] row)
+    throws IOException {
+    String regionLocation = null;
+    HServerAddress regionServerAddress =
+      table.getRegionLocation(row).getServerAddress();
+    InetAddress regionAddress = null;
+    try {
+      regionAddress =
+        regionServerAddress.getInetSocketAddress().getAddress();
+      regionLocation = reverseDNS(regionAddress);
+    } catch (NamingException e) {
+      LOG.error("Cannot resolve the host name for " + regionAddress +
+          " because of " + e);
+      regionLocation = regionServerAddress.getHostname();
+    }
+    return regionLocation;
+  }
+
   private static String reverseDNS(InetAddress ipAddress)
   throws NamingException {
     String hostName = reverseDNSCacheMap.get(ipAddress);
@@ -327,7 +427,30 @@ extends InputFormat<ImmutableBytesWritab
       throw new IllegalArgumentException("Expecting at least 1 mapper " +
           "per region; instead got: " + num);
     }
-    numMappersPerRegion = num;
+    if (!mappersPerJob) {
+      numMappers = num;
+    } else {
+      LOG.warn("Ingoring mappersPerRegion config value as mappersPerJob is" +
+        " already set.");
+    }
+  }
+
+  /**
+   * Sets the number of mappers assigned for the job.
+   *
+   * @param num
+   * @throws IllegalArgumentException When <code>num</code> <= 0.
+   */
+  public void setNumMappersPerJob(int num) throws IllegalArgumentException {
+    if (num <= 0) {
+      throw new IllegalArgumentException("Expecting at least 1 mapper " +
+        "per job; instead got: " + num);
+    }
+    if (numMappers > 1) {
+      LOG.warn("Overriding num of mappers per region.");
+    }
+    numMappers = num;
+    mappersPerJob = true;
   }
 
   public void setSplitAlgorithm(String name) {