You are viewing a plain-text version of this content. The canonical link for it ("here") was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/07/22 01:16:03 UTC
[7/7] hbase git commit: HBASE-16108 RowCounter should support multiple key ranges (Konstantin Ryakhovskiy)
HBASE-16108 RowCounter should support multiple key ranges (Konstantin Ryakhovskiy)
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/10e06ca6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/10e06ca6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/10e06ca6
Branch: refs/heads/0.98
Commit: 10e06ca618e5ef809ea88d04e553d534e2327ec9
Parents: e5d0d48
Author: tedyu <yu...@gmail.com>
Authored: Fri Jul 1 09:32:43 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Thu Jul 21 18:04:29 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/mapreduce/RowCounter.java | 88 ++++++----
.../hadoop/hbase/mapreduce/TestRowCounter.java | 165 ++++++++++++++++---
2 files changed, 198 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/10e06ca6/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
index 82e5f6e..3d4bf2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java
@@ -19,8 +19,8 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.List;
+import java.util.ArrayList;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HConstants;
@@ -30,8 +30,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.FirstKeyValueMatchingQualifiersFilter;
+import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
@@ -88,8 +89,7 @@ public class RowCounter {
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
- String startKey = null;
- String endKey = null;
+ List<MultiRowRangeFilter.RowRange> rowRangeList = null;
long startTime = 0;
long endTime = 0;
@@ -102,14 +102,12 @@ public class RowCounter {
// First argument is table name, starting from second
for (int i = 1; i < args.length; i++) {
if (args[i].startsWith(rangeSwitch)) {
- String[] startEnd = args[i].substring(rangeSwitch.length()).split(",", 2);
- if (startEnd.length != 2 || startEnd[1].contains(",")) {
- printUsage("Please specify range in such format as \"--range=a,b\" " +
- "or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
+ try {
+ rowRangeList = parseRowRangeParameter(args[i], rangeSwitch);
+ } catch (IllegalArgumentException e) {
return null;
}
- startKey = startEnd[0];
- endKey = startEnd[1];
+ continue;
}
if (startTime < endTime) {
printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
@@ -134,14 +132,7 @@ public class RowCounter {
job.setJarByClass(RowCounter.class);
Scan scan = new Scan();
scan.setCacheBlocks(false);
- Set<byte []> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
- if (startKey != null && !startKey.equals("")) {
- scan.setStartRow(Bytes.toBytes(startKey));
- }
- if (endKey != null && !endKey.equals("")) {
- scan.setStopRow(Bytes.toBytes(endKey));
- }
- scan.setFilter(new FirstKeyOnlyFilter());
+ setScanFilter(scan, rowRangeList);
if (sb.length() > 0) {
for (String columnName : sb.toString().trim().split(" ")) {
String family = StringUtils.substringBefore(columnName, ":");
@@ -155,14 +146,6 @@ public class RowCounter {
}
}
}
- // specified column may or may not be part of first key value for the row.
- // Hence do not use FirstKeyOnlyFilter if scan has columns, instead use
- // FirstKeyValueMatchingQualifiersFilter.
- if (qualifiers.size() == 0) {
- scan.setFilter(new FirstKeyOnlyFilter());
- } else {
- scan.setFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
- }
scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(tableName, scan,
@@ -171,6 +154,55 @@ public class RowCounter {
return job;
}
+ private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter( // parses a --range value into RowRanges
+ String arg, String rangeSwitch) { // arg: full switch text, e.g. "--range=a,b;c,d"; rangeSwitch: prefix stripped from arg
+ final String[] ranges = arg.substring(rangeSwitch.length()).split(";"); // split drops trailing empty entries ("a,b;;" -> one range)
+ final List<MultiRowRangeFilter.RowRange> rangeList =
+ new ArrayList<MultiRowRangeFilter.RowRange>();
+ for (String range : ranges) {
+ String[] startEnd = range.split(",", 2); // limit 2: split at the first comma only
+ if (startEnd.length != 2 || startEnd[1].contains(",")) { // require exactly one comma per range
+ printUsage("Please specify range in such format as \"--range=a,b\" " +
+ "or, with only one boundary, \"--range=,b\" or \"--range=a,\"");
+ throw new IllegalArgumentException("Wrong range specification: " + range); // createSubmittableJob catches this and returns null
+ }
+ String startKey = startEnd[0];
+ String endKey = startEnd[1];
+ rangeList.add(new MultiRowRangeFilter.RowRange(
+ Bytes.toBytesBinary(startKey), true, // start inclusive; toBytesBinary decodes escapes such as \x00
+ Bytes.toBytesBinary(endKey), false)); // stop exclusive; either key may be empty (one-sided range, see usage text)
+ }
+ return rangeList;
+ }
+
+ /**
+ * Sets a {@link FilterBase} filter on the {@link Scan} instance.
+ * If rowRangeList has more than one element, a {@link MultiRowRangeFilter} is set.
+ * Otherwise (null, empty, or one element) a {@link FirstKeyOnlyFilter} is set, and
+ * for exactly one element the scan's startRow (inclusive) and stopRow (exclusive) are set.
+ * @param scan the scan to configure; its current filter is replaced
+ * @param rowRangeList parsed --range key ranges, or null when no --range was given
+ * @throws RuntimeException if the MultiRowRangeFilter constructor throws an IOException
+ */
+ private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
+ final int size = rowRangeList == null ? 0 : rowRangeList.size();
+ if (size <= 1) { // multiple ranges use MultiRowRangeFilter alone (set below), without FirstKeyOnlyFilter
+ scan.setFilter(new FirstKeyOnlyFilter());
+ }
+ if (size == 1) {
+ MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
+ scan.setStartRow(range.getStartRow()); //inclusive
+ scan.setStopRow(range.getStopRow()); //exclusive
+ } else if (size > 1) {
+ try {
+ scan.setFilter(new MultiRowRangeFilter(rowRangeList));
+ } catch (IOException e) {
+ // The IOException should never be thrown (see HBASE-16145); keep it as the cause if it is.
+ throw new RuntimeException("Cannot instantiate MultiRowRangeFilter", e);
+ }
+ }
+ }
+
/*
* @param errorMessage Can attach a message when error occurs.
*/
@@ -185,7 +217,7 @@ public class RowCounter {
private static void printUsage() {
System.err.println("Usage: RowCounter [options] <tablename> " +
"[--starttime=[start] --endtime=[end] " +
- "[--range=[startKey],[endKey]] [<column1> <column2>...]");
+ "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]");
System.err.println("For performance consider the following options:\n"
+ "-Dhbase.client.scanner.caching=100\n"
+ "-Dmapred.map.tasks.speculative.execution=false");
http://git-wip-us.apache.org/repos/asf/hbase/blob/10e06ca6/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
index 7b5af6e..2a6a048 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
@@ -69,9 +69,8 @@ public class TestRowCounter {
throws Exception {
TEST_UTIL.startMiniCluster();
TEST_UTIL.startMiniMapReduceCluster();
- HTable table = TEST_UTIL.createTable(Bytes.toBytes(TABLE_NAME),
- Bytes.toBytes(COL_FAM));
- writeRows(table);
+ HTable table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
+ writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL);
table.close();
}
@@ -137,6 +136,100 @@ public class TestRowCounter {
}
/**
+ * Tests a single --range with both a start and an end key.
+ * @throws Exception if the RowCounter job cannot be created or run
+ */
+ @Test
+ public void testRowCounterRowSingleRange() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=row1,row3"
+ };
+ runRowCount(args, 2); // start key inclusive, stop key row3 exclusive
+ }
+
+ /**
+ * Tests a single --range with only an end key (no lower bound).
+ * @throws Exception if the RowCounter job cannot be created or run
+ */
+ @Test
+ public void testRowCounterRowSingleRangeUpperBound() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=,row3"
+ };
+ runRowCount(args, 3); // empty start key: every row sorting before row3
+ }
+
+ /**
+ * Tests two ';'-separated ranges where the first one has only an end key.
+ * @throws Exception if the RowCounter job cannot be created or run
+ */
+ @Test
+ public void testRowCounterRowMultiRangeUpperBound() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=,row3;row5,row7"
+ };
+ runRowCount(args, 5); // total over both ranges; more than one range exercises MultiRowRangeFilter
+ }
+ /**
+ * Tests multiple ';'-separated ranges, each with a start and an end key.
+ * @throws Exception if the RowCounter job cannot be created or run
+ */
+ @Test
+ public void testRowCounterRowMultiRange() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=row1,row3;row5,row8"
+ };
+ runRowCount(args, 5); // total over [row1,row3) and [row5,row8); stop keys are exclusive
+ }
+
+ /**
+ * Tests a --range value with one real range followed by two empty ';'-separated entries;
+ * String.split drops the trailing empty entries, so only [row1,row3) is counted.
+ * @throws Exception if the RowCounter job cannot be created or run
+ */
+ @Test
+ public void testRowCounterRowMultiEmptyRange() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=row1,row3;;"
+ };
+ runRowCount(args, 2);
+ }
+
+ @Test
+ public void testRowCounter10kRowRange() throws Exception { // narrow range over a 10,000-row table
+ String tableName = TABLE_NAME + "10k";
+ HTable table = TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(COL_FAM));
+ try {
+ writeRows(table, 10000, 0); // rowsWithOneCol = 0: every row gets both columns
+ } finally {
+ table.close();
+ }
+ String[] args = new String[] {
+ tableName, "--range=row9872,row9875"
+ };
+ runRowCount(args, 3); // row9872, row9873, row9874; stop key is exclusive
+ }
+
+ /**
+ * Test case for HBASE-15287: a --range whose keys contain escaped binary bytes (\x00).
+ * @throws Exception if the RowCounter job cannot be created or run
+ */
+ @Test
+ public void testRowCounterRowRangeBinary() throws Exception {
+ String tableName = TABLE_NAME + "Binary";
+ HTable table = TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(COL_FAM));
+ try {
+ writeRows(table, 10, 0, true); // writeBinary: row keys start with a 0x00 byte
+ } finally {
+ table.close();
+ }
+ String[] args = new String[] {
+ tableName, "--range=\\x00row5,\\x00row8"
+ };
+ runRowCount(args, 3); // \x00row5, \x00row6, \x00row7; stop key is exclusive
+ }
+
+ /**
* Test a case when the timerange is specified with --starttime and --endtime options
*
* @throws Exception
@@ -144,6 +237,7 @@ public class TestRowCounter {
@Test
public void testRowCounterTimeRange()
throws Exception {
+ final String tableName = TABLE_NAME + "TimeRange";
final byte[] family = Bytes.toBytes(COL_FAM);
final byte[] col1 = Bytes.toBytes(COL1);
Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
@@ -153,32 +247,35 @@ public class TestRowCounter {
long ts;
// clean up content of TABLE_NAME
- HTable table = TEST_UTIL.truncateTable(TableName.valueOf(TABLE_NAME));
- ts = System.currentTimeMillis();
- put1.add(family, col1, ts, Bytes.toBytes("val1"));
- table.put(put1);
- Thread.sleep(100);
-
- ts = System.currentTimeMillis();
- put2.add(family, col1, ts, Bytes.toBytes("val2"));
- put3.add(family, col1, ts, Bytes.toBytes("val3"));
- table.put(put2);
- table.put(put3);
- table.close();
- String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0,
+ HTable table = TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(COL_FAM));
+ try {
+ ts = System.currentTimeMillis();
+ put1.add(family, col1, ts, Bytes.toBytes("val1"));
+ table.put(put1);
+ Thread.sleep(100);
+ ts = System.currentTimeMillis();
+ put2.add(family, col1, ts, Bytes.toBytes("val2"));
+ put3.add(family, col1, ts, Bytes.toBytes("val3"));
+ table.put(put2);
+ table.put(put3);
+ } finally {
+ table.close();
+ }
+
+ String[] args = new String[] {tableName, COL_FAM + ":" + COL1, "--starttime=" + 0,
"--endtime=" + ts};
runRowCount(args, 1);
- args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0,
+ args = new String[] {tableName, COL_FAM + ":" + COL1, "--starttime=" + 0,
"--endtime=" + (ts - 10)};
runRowCount(args, 1);
- args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + ts,
+ args = new String[] {tableName, COL_FAM + ":" + COL1, "--starttime=" + ts,
"--endtime=" + (ts + 1000)};
runRowCount(args, 2);
- args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + (ts - 30 * 1000),
+ args = new String[] {tableName, COL_FAM + ":" + COL1, "--starttime=" + (ts - 30 * 1000),
"--endtime=" + (ts + 30 * 1000),};
runRowCount(args, 3);
}
@@ -196,20 +293,32 @@ public class TestRowCounter {
Configuration conf = opts.getConfiguration();
args = opts.getRemainingArgs();
Job job = RowCounter.createSubmittableJob(conf, args);
+ long start = System.currentTimeMillis();
job.waitForCompletion(true);
+ long duration = System.currentTimeMillis() - start;
+ LOG.debug("row count duration (ms): " + duration);
assertTrue(job.isSuccessful());
Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
assertEquals(expectedCount, counter.getValue());
}
+ private static void writeRows(HTable table, int totalRows, int rowsWithOneCol) // overload: plain "row" key prefix
+ throws IOException {
+ writeRows(table, totalRows, rowsWithOneCol, false); // writeBinary = false
+ }
+
/**
* Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
* two columns, Few have one.
- *
* @param table
+ * @param totalRows total number of rows to be added to the table
+ * @param rowsWithOneCol number of rows with one column to be added to the table
+ * @param writeBinary whether row prefix has to have \x00 in the beginning
* @throws IOException
*/
- private static void writeRows(HTable table) throws IOException {
+ private static void writeRows(HTable table, int totalRows, int rowsWithOneCol, boolean writeBinary)
+ throws IOException {
+ final String rowPrefix = writeBinary ? "\\x00row" : "row";
final byte[] family = Bytes.toBytes(COL_FAM);
final byte[] value = Bytes.toBytes("abcd");
final byte[] col1 = Bytes.toBytes(COL1);
@@ -218,8 +327,8 @@ public class TestRowCounter {
ArrayList<Put> rowsUpdate = new ArrayList<Put>();
// write few rows with two columns
int i = 0;
- for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) {
- byte[] row = Bytes.toBytes("row" + i);
+ for (; i < totalRows - rowsWithOneCol; i++) {
+ byte[] row = Bytes.toBytesBinary(rowPrefix + i);
Put put = new Put(row);
put.add(family, col1, value);
put.add(family, col2, value);
@@ -228,8 +337,8 @@ public class TestRowCounter {
}
// write few rows with only one column
- for (; i < TOTAL_ROWS; i++) {
- byte[] row = Bytes.toBytes("row" + i);
+ for (; i < totalRows; i++) {
+ byte[] row = Bytes.toBytesBinary(rowPrefix + i);
Put put = new Put(row);
put.add(family, col2, value);
rowsUpdate.add(put);
@@ -261,7 +370,8 @@ public class TestRowCounter {
assertTrue(data.toString().contains("Wrong number of parameters:"));
assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
"[--starttime=[start] --endtime=[end] " +
- "[--range=[startKey],[endKey]] " +
+ "[--range=[startKey],[endKey]" +
+ "[;[startKey],[endKey]...]] " +
"[<column1> <column2>...]"));
assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
assertTrue(data.toString().contains("-Dmapred.map.tasks.speculative.execution=false"));
@@ -279,7 +389,8 @@ public class TestRowCounter {
" \"--range=,b\" or \"--range=a,\""));
assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
"[--starttime=[start] --endtime=[end] " +
- "[--range=[startKey],[endKey]] " +
+ "[--range=[startKey],[endKey]" +
+ "[;[startKey],[endKey]...]] " +
"[<column1> <column2>...]"));
}