You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2021/03/12 05:10:19 UTC

[hbase] branch branch-2.3 updated: HBASE-25566 RoundRobinTableInputFormat (#2947)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 0d78689  HBASE-25566 RoundRobinTableInputFormat (#2947)
0d78689 is described below

commit 0d78689e1318eb4a104d6f3cfc8b4145b2e9151c
Author: sudhir-reddy <sudhir-reddy>
AuthorDate: Thu Mar 11 20:41:26 2021 -0800

    HBASE-25566 RoundRobinTableInputFormat (#2947)
    
    Co-authored-by: stack <st...@apache.org>
    Co-authored-by: sudhir-reddy <sudhir-reddy>
    Co-authored-by: Huaxiang Sun <hu...@apache.org>
---
 .../mapreduce/RoundRobinTableInputFormat.java      | 173 ++++++++++++++++++++
 .../hadoop/hbase/mapreduce/TableMapReduceUtil.java |  16 +-
 .../apache/hadoop/hbase/mapreduce/TableSplit.java  |  14 +-
 .../mapreduce/TestRoundRobinTableInputFormat.java  | 177 +++++++++++++++++++++
 4 files changed, 370 insertions(+), 10 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RoundRobinTableInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RoundRobinTableInputFormat.java
new file mode 100644
index 0000000..2b15e00
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RoundRobinTableInputFormat.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Process the return from super-class {@link TableInputFormat} (TIF) so as to undo any clumping of
+ * {@link InputSplit}s around RegionServers. Spread splits broadly to distribute read-load over
+ * RegionServers in the cluster. The super-class TIF returns splits in hbase:meta table order.
+ * Adjacent or near-adjacent hbase:meta Regions can be hosted on the same RegionServer -- nothing
+ * prevents this. This hbase:meta ordering of InputSplit placement can be lumpy making it so some
+ * RegionServers end up hosting lots of InputSplit scans while contemporaneously other RegionServers
+ * host few or none. This class does a pass over the return from the super-class to better spread
+ * the load. See the below helpful Flipkart blog post for a description and from where the base of
+ * this code comes from (with permission).
+ * @see https://tech.flipkart.com/is-data-locality-always-out-of-the-box-in-hadoop-not-really-2ae9c95163cb
+ */
+@InterfaceAudience.Public
+public class RoundRobinTableInputFormat extends TableInputFormat {
+  private Boolean hbaseRegionsizecalculatorEnableOriginalValue = null;
+  /**
+   * Boolean config for whether superclass should produce InputSplits with 'lengths'. If true, TIF
+   * will query every RegionServer to get the 'size' of all involved Regions and this 'size' will
+   * be used as the InputSplit length. If false, we skip this query and the super-class's
+   * returned InputSplits will have lengths of zero. This override will set the flag to false.
+   * All returned lengths will be zero. Makes it so sorting on 'length' becomes a noop. The sort
+   * returned by this override will prevail. That's what we want.
+   */
+  static final String HBASE_REGIONSIZECALCULATOR_ENABLE = "hbase.regionsizecalculator.enable";
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    try {
+      // Do a round robin on what we get back from the super-class.
+      configure();
+      return roundRobin(getSuperSplits(context));
+    } finally {
+      unconfigure();
+    }
+  }
+
+  /**
+   * Call super-classes' getSplits. Have it out here as its own method so can be overridden.
+   */
+  List<InputSplit> getSuperSplits(JobContext context) throws IOException {
+    return super.getSplits(context);
+  }
+
+  /**
+   * Spread the splits list so as to avoid clumping on RegionServers. Order splits so every server
+   * gets one split before a server gets a second, and so on; i.e. round-robin the splits amongst
+   * the servers in the cluster.
+   */
+  List<InputSplit> roundRobin(List<InputSplit> inputs) throws IOException {
+    if ((inputs == null) || inputs.isEmpty()) {
+      return inputs;
+    }
+    List<InputSplit> result = new ArrayList<>(inputs.size());
+    // Prepare a hashmap with each region server as key and list of Input Splits as value
+    Map<String, List<InputSplit>> regionServerSplits = new HashMap<>();
+    for (InputSplit is: inputs) {
+      if (is instanceof TableSplit) {
+        String regionServer = ((TableSplit)is).getRegionLocation();
+        if (regionServer != null && !regionServer.isEmpty()) {
+          regionServerSplits.computeIfAbsent(regionServer, k -> new LinkedList<>()).add(is);
+          continue;
+        }
+      }
+      // If TableSplit or region server not found, add it anyways.
+      result.add(is);
+    }
+    // Write out splits in a manner that spreads splits for a RegionServer to avoid 'clumping'.
+    while (!regionServerSplits.isEmpty()) {
+      Iterator<String> iterator = regionServerSplits.keySet().iterator();
+      while (iterator.hasNext()) {
+        String regionServer = iterator.next();
+        List<InputSplit> inputSplitListForRegion = regionServerSplits.get(regionServer);
+        if (!inputSplitListForRegion.isEmpty()) {
+          result.add(inputSplitListForRegion.remove(0));
+        }
+        if (inputSplitListForRegion.isEmpty()) {
+          iterator.remove();
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Adds a configuration to the Context disabling remote rpc'ing to figure Region size
+   * when calculating InputSplits. See up in super-class TIF where we rpc to every server to find
+   * the size of all involved Regions. Here we disable this super-class action. This means
+   * InputSplits will have a length of zero. If all InputSplits have zero-length InputSplits, the
+   * ordering done in here will 'pass-through' Hadoop's length-first sort. The superclass TIF will
+   * ask every node for the current size of each of the participating Table Regions. It does this
+   * because it wants to schedule the biggest Regions first (This fixation comes of hadoop itself
+   * -- see JobSubmitter where it sorts inputs by size). This extra diligence takes time and is of
+   * no utility in this RRTIF where spread is of more import than size-first. Also, if a rolling
+   * restart is happening when we go to launch the job, the job launch may fail because the request
+   * for Region size fails -- even after retries -- because rolled RegionServer may take a while to
+   * come online: e.g. it takes java 90 seconds to allocate a 160G. RegionServer is offline during
+   * this time. The job launch will fail with 'Connection rejected'. So, we set
+   * 'hbase.regionsizecalculator.enable' to false here in RRTIF.
+   * @see #unconfigure()
+   */
+  void configure() {
+    // Re-capture every call (null when unset) so a stale value is never restored by unconfigure.
+    this.hbaseRegionsizecalculatorEnableOriginalValue =
+      getConf().get(HBASE_REGIONSIZECALCULATOR_ENABLE) == null ? null :
+        Boolean.valueOf(getConf().getBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE, true));
+    getConf().setBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE, false);
+  }
+
+  /**
+   * @see #configure()
+   */
+  void unconfigure() {
+    if (this.hbaseRegionsizecalculatorEnableOriginalValue == null) {
+      getConf().unset(HBASE_REGIONSIZECALCULATOR_ENABLE);
+    } else {
+      getConf().setBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE,
+        this.hbaseRegionsizecalculatorEnableOriginalValue);
+    }
+  }
+
+  /**
+   * Pass table name as argument. Set the zk ensemble to use with the System property
+   * 'hbase.zookeeper.quorum'
+   */
+  public static void main(String[] args) throws IOException {
+    TableInputFormat tif = new RoundRobinTableInputFormat();
+    final Configuration configuration = HBaseConfiguration.create();
+    configuration.setBoolean("hbase.regionsizecalculator.enable", false);
+    configuration.set(HConstants.ZOOKEEPER_QUORUM,
+      System.getProperty(HConstants.ZOOKEEPER_QUORUM, "localhost"));
+    configuration.set(TableInputFormat.INPUT_TABLE, args[0]);
+    tif.setConf(configuration);
+    List<InputSplit> splits = tif.getSplits(new JobContextImpl(configuration, new JobID()));
+    for (InputSplit split: splits) {
+      System.out.println(split);
+    }
+  }
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index e38ee80..3326f6d 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -71,6 +71,7 @@ import com.codahale.metrics.MetricRegistry;
 @InterfaceAudience.Public
 public class TableMapReduceUtil {
   private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);
+  public static final String TABLE_INPUT_CLASS_KEY = "hbase.table.input.class";
 
   /**
    * Use this before submitting a TableMap job. It will appropriately set up
@@ -264,8 +265,17 @@ public class TableMapReduceUtil {
       Class<?> outputValueClass, Job job,
       boolean addDependencyJars)
   throws IOException {
-      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
-              outputValueClass, job, addDependencyJars, TableInputFormat.class);
+      initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
+        addDependencyJars, getConfiguredInputFormat(job));
+  }
+
+  /**
+   * @return {@link TableInputFormat} .class unless Configuration has something else at
+   *   {@link #TABLE_INPUT_CLASS_KEY}.
+   */
+  private static Class<? extends InputFormat> getConfiguredInputFormat(Job job) {
+    return (Class<? extends InputFormat>)job.getConfiguration().
+      getClass(TABLE_INPUT_CLASS_KEY, TableInputFormat.class);
   }
 
   /**
@@ -290,7 +300,7 @@ public class TableMapReduceUtil {
       boolean addDependencyJars)
   throws IOException {
       initTableMapperJob(table, scan, mapper, outputKeyClass,
-              outputValueClass, job, addDependencyJars, TableInputFormat.class);
+              outputValueClass, job, addDependencyJars, getConfiguredInputFormat(job));
   }
 
   /**
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
index acce55e..93300eb 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
@@ -352,8 +352,8 @@ public class TableSplit extends InputSplit
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("HBase table split(");
-    sb.append("table name: ").append(tableName);
+    sb.append("Split(");
+    sb.append("tablename=").append(tableName);
     // null scan input is represented by ""
     String printScan = "";
     if (!scan.equals("")) {
@@ -364,12 +364,12 @@ public class TableSplit extends InputSplit
       catch (IOException e) {
         printScan = "";
       }
+      sb.append(", scan=").append(printScan);
     }
-    sb.append(", scan: ").append(printScan);
-    sb.append(", start row: ").append(Bytes.toStringBinary(startRow));
-    sb.append(", end row: ").append(Bytes.toStringBinary(endRow));
-    sb.append(", region location: ").append(regionLocation);
-    sb.append(", encoded region name: ").append(encodedRegionName);
+    sb.append(", startrow=").append(Bytes.toStringBinary(startRow));
+    sb.append(", endrow=").append(Bytes.toStringBinary(endRow));
+    sb.append(", regionLocation=").append(regionLocation);
+    sb.append(", regionname=").append(encodedRegionName);
     sb.append(")");
     return sb.toString();
   }
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRoundRobinTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRoundRobinTableInputFormat.java
new file mode 100644
index 0000000..c3abf4d
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRoundRobinTableInputFormat.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Basic test of {@link RoundRobinTableInputFormat}; i.e. RRTIF.
+ */
+@Category({SmallTests.class})
+public class TestRoundRobinTableInputFormat {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRoundRobinTableInputFormat.class);
+
+  private static final int SERVERS_COUNT = 5;
+  private static final String[] KEYS = {
+    "aa", "ab", "ac", "ad", "ae",
+    "ba", "bb", "bc", "bd", "be",
+    "ca", "cb", "cc", "cd", "ce",
+    "da", "db", "dc", "dd", "de",
+    "ea", "eb", "ec", "ed", "ee",
+    "fa", "fb", "fc", "fd", "fe",
+    "ga", "gb", "gc", "gd", "ge",
+    "ha", "hb", "hc", "hd", "he",
+    "ia", "ib", "ic", "id", "ie",
+    "ja", "jb", "jc", "jd", "je", "jf"
+  };
+
+  /**
+   * Test that splits come back spread round-robin across servers and that a sort on split
+   * length, as is done up in Hadoop's JobSubmitter, leaves that order untouched.
+   */
+  @Test
+  public void testRoundRobinSplit() throws IOException, InterruptedException {
+    final List<InputSplit> splits = createSplits();
+    Collections.shuffle(splits);
+    List<InputSplit> sortedSplits = new RoundRobinTableInputFormat().roundRobin(splits);
+    testDistribution(sortedSplits);
+    // Now test that order is preserved even after being passed through the SplitComparator
+    // that sorts InputSplit by length as is done up in Hadoop in JobSubmitter.
+    InputSplit[] copy = sortedSplits.toArray(new InputSplit[0]);
+    Arrays.sort(copy, new SplitComparator());
+    // Compare by identity: sorting the array (not a throwaway copy of it) must not move any
+    // split. createSplits() gives every split the same "" region name so names prove nothing.
+    for (int i = 0; i < copy.length; i++) {
+      assertTrue("Split moved at index " + i, sortedSplits.get(i) == copy[i]);
+    }
+  }
+
+  /**
+   * @return Splits made out of {@link #KEYS}. Splits are for five Servers. Length is ZERO!
+   */
+  private List<InputSplit> createSplits() {
+    List<InputSplit> splits = new ArrayList<>(KEYS.length - 1);
+    for (int i = 0; i < KEYS.length - 1; i++) {
+      InputSplit split = new TableSplit(TableName.valueOf("test"), new Scan(),
+        Bytes.toBytes(KEYS[i]), Bytes.toBytes(KEYS[i + 1]), String.valueOf(i % SERVERS_COUNT + 1),
+        "", 0);
+      splits.add(split);
+    }
+    return splits;
+  }
+
+  private void testDistribution(List<InputSplit> list) throws IOException, InterruptedException {
+    for (int i = 0; i < KEYS.length/SERVERS_COUNT; i++) {
+      int [] counts = new int[SERVERS_COUNT];
+      for (int j = i * SERVERS_COUNT; j < i * SERVERS_COUNT + SERVERS_COUNT; j++) {
+        counts[Integer.parseInt(list.get(j).getLocations()[0]) - 1]++;
+      }
+      for (int value : counts) {
+        assertEquals(value, 1);
+      }
+    }
+  }
+
+  /**
+   * Private comparator copied from private JobSubmitter Hadoop class...
+   * hadoop/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+   * Used so we can run the sort done up in JobSubmitter here in tests.
+   */
+  private static class SplitComparator implements Comparator<InputSplit> {
+    @Override
+    public int compare(InputSplit o1, InputSplit o2) {
+      try {
+        return Long.compare(o1.getLength(), o2.getLength());
+      } catch (IOException|InterruptedException e) {
+        throw new RuntimeException("exception in compare", e);
+      }
+    }
+  }
+
+  /**
+   * Assert that lengths are descending. RRTIF writes lengths in descending order so any
+   * subsequent sort using dumb SplitComparator as is done in JobSubmitter up in Hadoop keeps
+   * our RRTIF ordering.
+   */
+  private void assertLengthDescending(List<InputSplit> list)
+    throws IOException, InterruptedException {
+    long previousLength = Long.MAX_VALUE;
+    for (InputSplit is: list) {
+      long length = is.getLength();
+      assertTrue(previousLength + " " + length, previousLength > length);
+      previousLength = length;
+    }
+  }
+
+  /**
+   * Test that configure/unconfigure set and properly undo the HBASE_REGIONSIZECALCULATOR_ENABLE
+   * configuration.
+   */
+  @Test
+  public void testConfigureUnconfigure() {
+    Configuration configuration = HBaseConfiguration.create();
+    RoundRobinTableInputFormat rrtif = new RoundRobinTableInputFormat();
+    rrtif.setConf(configuration);
+    JobContext jobContext = Mockito.mock(JobContext.class);
+    Mockito.when(jobContext.getConfiguration()).thenReturn(configuration);
+    // Assert when done, HBASE_REGIONSIZECALCULATOR_ENABLE is still unset.
+    configuration.unset(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
+    rrtif.configure();
+    rrtif.unconfigure();
+    String value = configuration.get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
+    assertNull(value);
+    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still false when done.
+    checkRetainsBooleanValue(jobContext, rrtif, false);
+    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still true when done.
+    checkRetainsBooleanValue(jobContext, rrtif, true);
+  }
+
+  private void checkRetainsBooleanValue(JobContext jobContext, RoundRobinTableInputFormat rrtif,
+      final boolean b) {
+    jobContext.getConfiguration().
+      setBoolean(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE, b);
+    rrtif.configure();
+    rrtif.unconfigure();
+    String value = jobContext.getConfiguration().
+      get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
+    assertEquals(b, Boolean.valueOf(value));
+  }
+}