Posted to commits@hbase.apache.org by st...@apache.org on 2017/11/28 22:50:56 UTC
hbase git commit: HBASE-18090 Improve TableSnapshotInputFormat to allow multiple mappers per region
Repository: hbase
Updated Branches:
refs/heads/branch-1.3 dca65353d -> b1912790f
HBASE-18090 Improve TableSnapshotInputFormat to allow multiple mappers per region
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b1912790
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b1912790
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b1912790
Branch: refs/heads/branch-1.3
Commit: b1912790f9c057e51fd7e6dde924bfd0f6e784cc
Parents: dca6535
Author: libisthanks <li...@gmail.com>
Authored: Thu Nov 9 10:53:22 2017 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Tue Nov 28 14:50:40 2017 -0800
----------------------------------------------------------------------
...IntegrationTestTableSnapshotInputFormat.java | 4 +-
.../hbase/client/ClientSideRegionScanner.java | 2 +
.../hadoop/hbase/mapred/TableMapReduceUtil.java | 38 +++++++
.../hbase/mapred/TableSnapshotInputFormat.java | 18 +++
.../hbase/mapreduce/TableMapReduceUtil.java | 40 +++++++
.../mapreduce/TableSnapshotInputFormat.java | 24 +++-
.../mapreduce/TableSnapshotInputFormatImpl.java | 112 +++++++++++++++++--
.../hadoop/hbase/util/RegionSplitter.java | 70 ++++++++++++
.../mapred/TestTableSnapshotInputFormat.java | 41 ++++---
.../TableSnapshotInputFormatTestBase.java | 23 ++--
.../mapreduce/TestTableSnapshotInputFormat.java | 35 ++++--
.../hadoop/hbase/util/TestRegionSplitter.java | 20 ++++
12 files changed, 380 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
index 1a152e8..2df1c4b 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestTableSnapshotInputFormat.java
@@ -151,7 +151,7 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase
int expectedNumSplits = numRegions > 2 ? numRegions - 2 : numRegions;
org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
- tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions,
+ tableName, snapshotName, START_ROW, END_ROW, tableDir, numRegions, 1,
expectedNumSplits, false);
} else if (mr.equalsIgnoreCase(MAPRED_IMPLEMENTATION)) {
/*
@@ -165,7 +165,7 @@ public class IntegrationTestTableSnapshotInputFormat extends IntegrationTestBase
int expectedNumSplits = numRegions;
org.apache.hadoop.hbase.mapred.TestTableSnapshotInputFormat.doTestWithMapReduce(util,
- tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions,
+ tableName, snapshotName, MAPRED_START_ROW, MAPRED_END_ROW, tableDir, numRegions, 1,
expectedNumSplits, false);
} else {
throw new IllegalArgumentException("Unrecognized mapreduce implementation: " + mr +".");
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index dde2f10..ef89c32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -56,6 +56,8 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
// region is immutable, set isolation level
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+ // mark the table descriptor read-only: the region is backed by immutable snapshot files
+ htd.setReadOnly(true);
+
// open region from the snapshot directory
this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
index d5f225f..476c1a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableMapReduceUtil.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
@@ -194,6 +195,43 @@ public class TableMapReduceUtil {
}
/**
+ * Sets up the job for reading from a table snapshot. It bypasses HBase servers
+ * and reads directly from snapshot files.
+ *
+ * @param snapshotName The name of the snapshot (of a table) to read from.
+ * @param columns The columns to scan.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param jobConf The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
+ * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+ * After the job is finished, the restore directory can be deleted.
+ * @param splitAlgo split algorithm to use when generating input splits within each region
+ * @param numSplitsPerRegion how many input splits to generate per region
+ * @throws IOException When setting up the details fails.
+ * @see TableSnapshotInputFormat
+ */
+ public static void initTableSnapshotMapJob(String snapshotName, String columns,
+ Class<? extends TableMap> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, JobConf jobConf,
+ boolean addDependencyJars, Path tmpRestoreDir,
+ RegionSplitter.SplitAlgorithm splitAlgo,
+ int numSplitsPerRegion)
+ throws IOException {
+ TableSnapshotInputFormat.setInput(jobConf, snapshotName, tmpRestoreDir, splitAlgo,
+ numSplitsPerRegion);
+ initTableMapJob(snapshotName, columns, mapper, outputKeyClass, outputValueClass, jobConf,
+ addDependencyJars, TableSnapshotInputFormat.class);
+ org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.resetCacheConfig(jobConf);
+ }
+
+ /**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
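For orientation, a minimal sketch (not part of this commit) of how the new mapred overload might be driven. The snapshot name, column spec, restore path, and the stub TableMap are all hypothetical:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SnapshotMapJobSketch {
  // Stub mapper that only re-emits row keys; purely illustrative.
  public static class RowKeyMap extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, NullWritable> {
    public void map(ImmutableBytesWritable key, Result value,
        OutputCollector<ImmutableBytesWritable, NullWritable> out,
        Reporter reporter) throws IOException {
      out.collect(key, NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf(HBaseConfiguration.create());
    // Ask for 4 input splits (hence 4 mappers) per region, with split points
    // computed by UniformSplit.
    TableMapReduceUtil.initTableSnapshotMapJob("my_snapshot", "f1:q1",
        RowKeyMap.class, ImmutableBytesWritable.class, NullWritable.class,
        jobConf, true, new Path("/tmp/snapshot-restore"),
        new RegionSplitter.UniformSplit(), 4);
  }
}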
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
index a5c62b2..b9e0a03 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableSnapshotInputFormat.java
@@ -27,11 +27,13 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import java.io.DataInput;
import java.io.DataOutput;
@@ -165,4 +167,20 @@ public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWrita
throws IOException {
TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
}
+
+ /**
+ * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+ * @param job the job to configure
+ * @param snapshotName the name of the snapshot to read from
+ * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+ * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+ * After the job is finished, restoreDir can be deleted.
+ * @param splitAlgo split algorithm used to generate input splits within each region
+ * @param numSplitsPerRegion how many input splits to generate per region
+ * @throws IOException if an error occurs
+ */
+ public static void setInput(JobConf job, String snapshotName, Path restoreDir,
+ RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
+ TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir, splitAlgo, numSplitsPerRegion);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 9ba1f07..8533ed0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hbase.util.RegionSplitter;
/**
* Utility for {@link TableMapper} and {@link TableReducer}
@@ -369,6 +370,45 @@ public class TableMapReduceUtil {
}
/**
+ * Sets up the job for reading from a table snapshot. It bypasses HBase servers
+ * and reads directly from snapshot files.
+ *
+ * @param snapshotName The name of the snapshot (of a table) to read from.
+ * @param scan The scan instance with the columns, time range etc.
+ * @param mapper The mapper class to use.
+ * @param outputKeyClass The class of the output key.
+ * @param outputValueClass The class of the output value.
+ * @param job The current job to adjust. Make sure the passed job is
+ * carrying all necessary HBase configuration.
+ * @param addDependencyJars upload HBase jars and jars for any of the configured
+ * job classes via the distributed cache (tmpjars).
+ *
+ * @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
+ * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+ * After the job is finished, the restore directory can be deleted.
+ * @param splitAlgo split algorithm to use when generating input splits within each region
+ * @param numSplitsPerRegion how many input splits to generate per region
+ * @throws IOException When setting up the details fails.
+ * @see TableSnapshotInputFormat
+ */
+ public static void initTableSnapshotMapperJob(String snapshotName, Scan scan,
+ Class<? extends TableMapper> mapper,
+ Class<?> outputKeyClass,
+ Class<?> outputValueClass, Job job,
+ boolean addDependencyJars, Path tmpRestoreDir,
+ RegionSplitter.SplitAlgorithm splitAlgo,
+ int numSplitsPerRegion)
+ throws IOException {
+ TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir, splitAlgo,
+ numSplitsPerRegion);
+ initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
+ outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
+ resetCacheConfig(job.getConfiguration());
+ }
+
+ /**
* Use this before submitting a Multi TableMap job. It will appropriately set
* up the job.
*
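The mapreduce counterpart, again as a hedged sketch with hypothetical names; it runs map-only and simply emits row keys:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

public class SnapshotMapperJobSketch {
  public static class RowKeyMapper
      extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      context.write(key, NullWritable.get()); // emit row keys only
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(HBaseConfiguration.create(), "snapshot-scan");
    TableMapReduceUtil.initTableSnapshotMapperJob("my_snapshot", new Scan(),
        RowKeyMapper.class, ImmutableBytesWritable.class, NullWritable.class,
        job, true, new Path("/tmp/snapshot-restore"),
        new RegionSplitter.UniformSplit(), 8); // 8 splits per region
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}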
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index c40396f..dce311d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -66,8 +67,10 @@ import java.util.List;
* }
* </pre>
* <p>
- * Internally, this input format restores the snapshot into the given tmp directory. Similar to
- * {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading
+ * Internally, this input format restores the snapshot into the given tmp directory. By default,
+ * and similar to {@link TableInputFormat}, an InputSplit is created per region, but optionally
+ * you can run N mapper tasks per region, in which case the region key range is split into
+ * N sub-ranges and an InputSplit is created per sub-range. The region is opened for reading
* from each RecordReader. An internal RegionScanner is used to execute the
* {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
* <p>
@@ -204,4 +207,21 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
throws IOException {
TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
}
+
+ /**
+ * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+ * @param job the job to configure
+ * @param snapshotName the name of the snapshot to read from
+ * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+ * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+ * After the job is finished, restoreDir can be deleted.
+ * @param splitAlgo split algorithm used to generate input splits within each region
+ * @param numSplitsPerRegion how many input splits to generate per region
+ * @throws IOException if an error occurs
+ */
+ public static void setInput(Job job, String snapshotName, Path restoreDir,
+ RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
+ TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
+ splitAlgo, numSplitsPerRegion);
+ }
}
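For jobs configured by hand, the new static setInput overload can be called directly instead; a sketch under the same hypothetical names as above (initTableSnapshotMapperJob additionally ships dependency jars and resets the cache config, which this fragment does not):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.mapreduce.Job;

public class SnapshotSetInputSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(HBaseConfiguration.create(), "snapshot-scan");
    job.setInputFormatClass(TableSnapshotInputFormat.class);
    TableSnapshotInputFormat.setInput(job, "my_snapshot",
        new Path("/tmp/snapshot-restore"),     // writable, outside the HBase rootdir
        new RegionSplitter.UniformSplit(), 8); // 8 splits (mappers) per region
  }
}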
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index c182556..4da20d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import java.io.ByteArrayOutputStream;
@@ -76,6 +77,17 @@ public class TableSnapshotInputFormatImpl {
private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
/**
+ * For MapReduce jobs running multiple mappers per region, determines
+ * what split algorithm we should be using to find split points for scanners.
+ */
+ public static final String SPLIT_ALGO = "hbase.mapreduce.split.algorithm";
+ /**
+ * For MapReduce jobs running multiple mappers per region, determines
+ * number of splits to generate per region.
+ */
+ public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";
+
+ /**
* Implementation class for InputSplit logic common between mapred and mapreduce.
*/
public static class InputSplit implements Writable {
@@ -262,7 +274,30 @@ public class TableSnapshotInputFormatImpl {
// the temp dir where the snapshot is restored
Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
- return getSplits(scan, manifest, regionInfos, restoreDir, conf);
+ RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);
+
+ int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);
+
+ return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
+ }
+
+ public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
+ String splitAlgoClassName = conf.get(SPLIT_ALGO);
+ if (splitAlgoClassName == null) {
+ return null;
+ }
+ try {
+ return ((Class<? extends RegionSplitter.SplitAlgorithm>)
+ Class.forName(splitAlgoClassName)).newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new IOException("SplitAlgo class " + splitAlgoClassName +
+ " is not found", e);
+ } catch (InstantiationException e) {
+ throw new IOException("SplitAlgo class " + splitAlgoClassName +
+ " is not instantiable", e);
+ } catch (IllegalAccessException e) {
+ throw new IOException("SplitAlgo class " + splitAlgoClassName +
+ " is not accessible", e);
+ }
}
public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
@@ -305,6 +340,12 @@ public class TableSnapshotInputFormatImpl {
public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
+ return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1);
+ }
+
+ public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
+ List<HRegionInfo> regionManifests, Path restoreDir,
+ Configuration conf, RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException {
// load table descriptor
HTableDescriptor htd = manifest.getTableDescriptor();
@@ -317,16 +358,36 @@ public class TableSnapshotInputFormatImpl {
continue;
}
- if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
- hri.getEndKey())) {
- // compute HDFS locations from snapshot files (which will get the locations for
- // referred hfiles)
- List<String> hosts = getBestLocations(conf,
- HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
-
- int len = Math.min(3, hosts.size());
- hosts = hosts.subList(0, len);
- splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
+ if (numSplits > 1) {
+ byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
+ for (int i = 0; i < sp.length - 1; i++) {
+ if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
+ sp[i + 1])) {
+ // compute HDFS locations from snapshot files (which will get the locations for
+ // referred hfiles)
+ List<String> hosts = getBestLocations(conf,
+ HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+
+ int len = Math.min(3, hosts.size());
+ hosts = hosts.subList(0, len);
+ Scan boundedScan = new Scan(scan);
+ boundedScan.setStartRow(sp[i]);
+ boundedScan.setStopRow(sp[i + 1]);
+ splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
+ }
+ }
+ } else {
+ if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
+ hri.getEndKey())) {
+ // compute HDFS locations from snapshot files (which will get the locations for
+ // referred hfiles)
+ List<String> hosts = getBestLocations(conf,
+ HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+
+ int len = Math.min(3, hosts.size());
+ hosts = hosts.subList(0, len);
+ splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
+ }
}
}
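To make the numSplits > 1 branch above concrete, a standalone sketch (region boundaries invented) of turning one region's key range into bounded per-sub-range scans:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;

public class SubRangeSketch {
  public static void main(String[] args) {
    RegionSplitter.SplitAlgorithm sa = new RegionSplitter.UniformSplit();
    byte[] regionStart = Bytes.toBytes("aaa");
    byte[] regionEnd = Bytes.toBytes("zzz");
    // inclusive=true returns numSplits + 1 boundaries (start and end included),
    // so consecutive pairs tile the region exactly.
    byte[][] sp = sa.split(regionStart, regionEnd, 4, true);
    for (int i = 0; i < sp.length - 1; i++) {
      Scan bounded = new Scan();      // one bounded scan per sub-range
      bounded.setStartRow(sp[i]);
      bounded.setStopRow(sp[i + 1]);
      System.out.println(Bytes.toStringBinary(sp[i]) + " .. "
          + Bytes.toStringBinary(sp[i + 1]));
    }
  }
}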
@@ -395,6 +456,35 @@ public class TableSnapshotInputFormatImpl {
*/
public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
throws IOException {
+ setInput(conf, snapshotName, restoreDir, null, 1);
+ }
+
+ /**
+ * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+ * @param conf the job to configure
+ * @param snapshotName the name of the snapshot to read from
+ * @param restoreDir a temporary directory to restore the snapshot into. Current user should
+ * have write permissions to this directory, and this should not be a subdirectory of rootdir.
+ * After the job is finished, restoreDir can be deleted.
+ * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
+ * @param numSplitsPerRegion how many input splits to generate per region
+ * @throws IOException if an error occurs
+ */
+ public static void setInput(Configuration conf, String snapshotName, Path restoreDir,
+ RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion)
+ throws IOException {
+ if (numSplitsPerRegion < 1) {
+ throw new IllegalArgumentException("numSplits must be >= 1, " +
+ "illegal numSplits : " + numSplitsPerRegion);
+ }
+ if (splitAlgo == null && numSplitsPerRegion > 1) {
+ throw new IllegalArgumentException("Split algo can't be null when numSplits > 1");
+ }
+ if (splitAlgo != null) {
+ conf.set(SPLIT_ALGO, splitAlgo.getClass().getName());
+ }
+ conf.setInt(NUM_SPLITS_PER_REGION, numSplitsPerRegion);
conf.set(SNAPSHOT_NAME_KEY, snapshotName);
Path rootDir = FSUtils.getRootDir(conf);
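Since SPLIT_ALGO and NUM_SPLITS_PER_REGION are plain Configuration keys, the split-related effect of the overload above can be shown directly; this is for illustration only, as real callers should go through setInput, which also records the snapshot name and restores the snapshot:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.RegionSplitter;

public class SplitConfSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // What setInput(conf, snapshot, restoreDir, new UniformSplit(), 4)
    // records for getSplits() to read back:
    conf.set("hbase.mapreduce.split.algorithm",
        RegionSplitter.UniformSplit.class.getName());
    conf.setInt("hbase.mapreduce.splits.per.region", 4);
  }
}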
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
index ea704f8..2ba1673 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -179,6 +180,17 @@ public class RegionSplitter {
byte[][] split(int numRegions);
/**
+ * Some MapReduce jobs may want to run multiple mappers per region;
+ * this method is intended for such a use case.
+ *
+ * @param start first row (inclusive)
+ * @param end last row (exclusive)
+ * @param numSplits number of splits to generate
+ * @param inclusive whether to include start and end among the returned split points
+ */
+ byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive);
+
+ /**
* In HBase, the first row is represented by an empty byte array. This might
* cause problems with your split algorithm or row printing. All your APIs
* will be passed firstRow() instead of empty array.
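A quick illustration of the inclusive flag's contract, using HexStringSplit with values chosen to divide evenly (expected contents shown in comments):

import org.apache.hadoop.hbase.util.RegionSplitter;

public class SplitContractSketch {
  public static void main(String[] args) {
    RegionSplitter.SplitAlgorithm algo = new RegionSplitter.HexStringSplit();
    // inclusive=false: only the numSplits - 1 interior points are returned.
    byte[][] interior =
        algo.split("00000000".getBytes(), "40000000".getBytes(), 4, false);
    // interior = { "10000000", "20000000", "30000000" }
    // inclusive=true: start and end are added, giving numSplits + 1 points.
    byte[][] full =
        algo.split("00000000".getBytes(), "40000000".getBytes(), 4, true);
    // full = { "00000000", "10000000", "20000000", "30000000", "40000000" }
  }
}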
@@ -921,6 +933,39 @@ public class RegionSplitter {
return convertToBytes(splits);
}
+ @Override
+ public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
+ BigInteger s = convertToBigInteger(start);
+ BigInteger e = convertToBigInteger(end);
+
+ Preconditions.checkArgument(e.compareTo(s) > 0,
+ "last row (%s) is configured less than first row (%s)", rowToStr(end),
+ rowToStr(start));
+ // +1 to range because the last row is inclusive
+ BigInteger range = e.subtract(s).add(BigInteger.ONE);
+ Preconditions.checkState(range.compareTo(BigInteger.valueOf(numSplits)) >= 0,
+ "split granularity (%s) is greater than the range (%s)", numSplits, range);
+
+ BigInteger[] splits = new BigInteger[numSplits - 1];
+ BigInteger sizeOfEachSplit = range.divide(BigInteger.valueOf(numSplits));
+ for (int i = 1; i < numSplits; i++) {
+ // NOTE: this means the last region gets all the slop.
+ // This is not a big deal if we're assuming n << MAXHEX
+ splits[i - 1] = s.add(sizeOfEachSplit.multiply(BigInteger
+ .valueOf(i)));
+ }
+
+ if (inclusive) {
+ BigInteger[] inclusiveSplitPoints = new BigInteger[numSplits + 1];
+ inclusiveSplitPoints[0] = convertToBigInteger(start);
+ inclusiveSplitPoints[numSplits] = convertToBigInteger(end);
+ System.arraycopy(splits, 0, inclusiveSplitPoints, 1, splits.length);
+ return convertToBytes(inclusiveSplitPoints);
+ } else {
+ return convertToBytes(splits);
+ }
+ }
+
public byte[] firstRow() {
return convertToByte(firstRowInt);
}
@@ -1063,6 +1108,31 @@ public class RegionSplitter {
return Arrays.copyOfRange(splits, 1, splits.length - 1);
}
+ public byte[][] split(byte[] start, byte[] end, int numSplits, boolean inclusive) {
+ if (Arrays.equals(start, HConstants.EMPTY_BYTE_ARRAY)) {
+ start = firstRowBytes;
+ }
+ if (Arrays.equals(end, HConstants.EMPTY_BYTE_ARRAY)) {
+ end = lastRowBytes;
+ }
+ Preconditions.checkArgument(
+ Bytes.compareTo(end, start) > 0,
+ "last row (%s) is configured less than first row (%s)",
+ Bytes.toStringBinary(end),
+ Bytes.toStringBinary(start));
+
+ byte[][] splits = Bytes.split(start, end, true,
+ numSplits - 1);
+ Preconditions.checkState(splits != null,
+ "Could not calculate input splits with given user input: " + this);
+ if (inclusive) {
+ return splits;
+ } else {
+ // remove endpoints, which are included in the splits list
+ return Arrays.copyOfRange(splits, 1, splits.length - 1);
+ }
+ }
+
@Override
public byte[] firstRow() {
return firstRowBytes;
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
index 60f19a2..ed98eb3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
@@ -131,20 +132,20 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
@Test
@Override
public void testWithMockedMapReduceMultiRegion() throws Exception {
- testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 10);
+ testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 10);
}
@Test
@Override
public void testWithMapReduceMultiRegion() throws Exception {
- testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 10, false);
+ testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 10, false);
}
@Test
@Override
// run the MR job while HBase is offline
public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
- testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 10, true);
+ testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 10, true);
}
@Override
@@ -158,7 +159,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
@Override
protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
- int numRegions, int expectedNumSplits) throws Exception {
+ int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception {
setupCluster();
TableName tableName = TableName.valueOf("testWithMockedMapReduce");
try {
@@ -168,9 +169,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
JobConf job = new JobConf(util.getConfiguration());
Path tmpTableDir = util.getRandomDir();
- TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
- COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
- NullWritable.class, job, false, tmpTableDir);
+ if (numSplitsPerRegion > 1) {
+ TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
+ COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
+ numSplitsPerRegion);
+ } else {
+ TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
+ COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir);
+ }
// mapred doesn't support start and end keys? o.O
verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
@@ -219,16 +227,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
@Override
protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
- String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+ String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits,
boolean shutdownCluster) throws Exception {
doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
- numRegions, expectedNumSplits, shutdownCluster);
+ numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
}
// this is also called by the IntegrationTestTableSnapshotInputFormat
public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
- int expectedNumSplits, boolean shutdownCluster) throws Exception {
+ int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {
//create the table and snapshot
createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
@@ -245,9 +253,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf,
TestTableSnapshotInputFormat.class);
- TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
- TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
- NullWritable.class, jobConf, true, tableDir);
+ if (numSplitsPerRegion > 1) {
+ TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
+ TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, jobConf, true, tableDir, new RegionSplitter.UniformSplit(),
+ numSplitsPerRegion);
+ } else {
+ TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
+ TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, jobConf, true, tableDir);
+ }
jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
jobConf.setNumReduceTasks(1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index 3df4a8f..9402612 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -76,10 +76,10 @@ public abstract class TableSnapshotInputFormatTestBase {
}
protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
- int numRegions, int expectedNumSplits) throws Exception;
+ int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception;
protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
- String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+ String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits,
boolean shutdownCluster) throws Exception;
protected abstract byte[] getStartRow();
@@ -88,28 +88,33 @@ public abstract class TableSnapshotInputFormatTestBase {
@Test
public void testWithMockedMapReduceSingleRegion() throws Exception {
- testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
+ testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1, 1);
}
@Test
public void testWithMockedMapReduceMultiRegion() throws Exception {
- testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
+ testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 1, 8);
}
@Test
public void testWithMapReduceSingleRegion() throws Exception {
- testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
+ testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, 1, false);
}
@Test
public void testWithMapReduceMultiRegion() throws Exception {
- testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
+ testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 1, 8, false);
+ }
+
+ @Test
+ public void testWithMapReduceMultipleMappersPerRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 5, 50, false);
}
@Test
// run the MR job while HBase is offline
public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
- testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
+ testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 1, 8, true);
}
// Test that snapshot restore does not create back references in the HBase root dir.
@@ -157,13 +162,13 @@ public abstract class TableSnapshotInputFormatTestBase {
String snapshotName, Path tmpTableDir) throws Exception;
protected void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
- int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
+ int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {
setupCluster();
try {
Path tableDir = util.getRandomDir();
TableName tableName = TableName.valueOf("testWithMapReduce");
testWithMapReduceImpl(util, tableName, snapshotName, tableDir, numRegions,
- expectedNumSplits, shutdownCluster);
+ numSplitsPerRegion, expectedNumSplits, shutdownCluster);
} finally {
tearDownCluster();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 3531dd7..ed24495 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -195,7 +196,7 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
@Override
public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
- int numRegions, int expectedNumSplits) throws Exception {
+ int numRegions, int numSplitsPerRegion, int expectedNumSplits) throws Exception {
setupCluster();
TableName tableName = TableName.valueOf("testWithMockedMapReduce");
try {
@@ -206,9 +207,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
Path tmpTableDir = util.getRandomDir();
Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan
- TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
- scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
- NullWritable.class, job, false, tmpTableDir);
+ if (numSplitsPerRegion > 1) {
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
+ numSplitsPerRegion);
+ } else {
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir);
+ }
verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
@@ -321,16 +329,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
@Override
protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
- String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+ String snapshotName, Path tableDir, int numRegions, int numSplitsPerRegion, int expectedNumSplits,
boolean shutdownCluster) throws Exception {
doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
- numRegions, expectedNumSplits, shutdownCluster);
+ numRegions, numSplitsPerRegion, expectedNumSplits, shutdownCluster);
}
// this is also called by the IntegrationTestTableSnapshotInputFormat
public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
- int expectedNumSplits, boolean shutdownCluster) throws Exception {
+ int numSplitsPerRegion, int expectedNumSplits, boolean shutdownCluster) throws Exception {
//create the table and snapshot
createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
@@ -348,9 +356,16 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
TestTableSnapshotInputFormat.class);
- TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
- scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
- NullWritable.class, job, true, tableDir);
+ if (numSplitsPerRegion > 1) {
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, true, tableDir, new RegionSplitter.UniformSplit(),
+ numSplitsPerRegion);
+ } else {
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, true, tableDir);
+ }
job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
job.setNumReduceTasks(1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b1912790/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java
index e343588..e8600a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestRegionSplitter.java
@@ -162,6 +162,16 @@ public class TestRegionSplitter {
// Halfway between df... and ff... should be ef....
splitPoint = splitter.split("dfffffff".getBytes(), lastRow);
assertArrayEquals(splitPoint,"efffffff".getBytes());
+
+ // Check splitting region with multiple mappers per region
+ byte[][] splits = splitter.split("00000000".getBytes(), "30000000".getBytes(), 3, false);
+ assertEquals(2, splits.length);
+ assertArrayEquals(splits[0], "10000000".getBytes());
+ assertArrayEquals(splits[1], "20000000".getBytes());
+
+ splits = splitter.split("00000000".getBytes(), "20000000".getBytes(), 2, true);
+ assertEquals(3, splits.length);
+ assertArrayEquals(splits[1], "10000000".getBytes());
}
/**
@@ -210,6 +220,16 @@ public class TestRegionSplitter {
splitPoint = splitter.split(new byte[] {'a', 'a', 'a'}, new byte[] {'a', 'a', 'b'});
assertArrayEquals(splitPoint, new byte[] {'a', 'a', 'a', (byte)0x80 });
+
+ // Check splitting region with multiple mappers per region
+ byte[][] splits = splitter.split(new byte[] {'a', 'a', 'a'}, new byte[] {'a', 'a', 'd'}, 3, false);
+ assertEquals(2, splits.length);
+ assertArrayEquals(splits[0], new byte[]{'a', 'a', 'b'});
+ assertArrayEquals(splits[1], new byte[]{'a', 'a', 'c'});
+
+ splits = splitter.split(new byte[] {'a', 'a', 'a'}, new byte[] {'a', 'a', 'e'}, 2, true);
+ assertEquals(3, splits.length);
+ assertArrayEquals(splits[1], new byte[] { 'a', 'a', 'c'});
}
@Test