Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:15 UTC
[15/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
deleted file mode 100644
index c72a0c3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ /dev/null
@@ -1,786 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Collections;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterators;
-
-public class SyncTable extends Configured implements Tool {
-
- private static final Log LOG = LogFactory.getLog(SyncTable.class);
-
- static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
- static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
- static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
- static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
- static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
- static final String DRY_RUN_CONF_KEY="sync.table.dry.run";
-
- Path sourceHashDir;
- String sourceTableName;
- String targetTableName;
-
- String sourceZkCluster;
- String targetZkCluster;
- boolean dryRun;
-
- Counters counters;
-
- public SyncTable(Configuration conf) {
- super(conf);
- }
-
- public Job createSubmittableJob(String[] args) throws IOException {
- FileSystem fs = sourceHashDir.getFileSystem(getConf());
- if (!fs.exists(sourceHashDir)) {
- throw new IOException("Source hash dir not found: " + sourceHashDir);
- }
-
- HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
- LOG.info("Read source hash manifest: " + tableHash);
- LOG.info("Read " + tableHash.partitions.size() + " partition keys");
- if (!tableHash.tableName.equals(sourceTableName)) {
- LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
- + tableHash.tableName + " but job is reading from: " + sourceTableName);
- }
- if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
- throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
- + " should be 1 more than the number of partition keys. However, the manifest file "
- + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
- + " found in the partitions file is " + tableHash.partitions.size());
- }
-
- Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
- int dataSubdirCount = 0;
- for (FileStatus file : fs.listStatus(dataDir)) {
- if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
- dataSubdirCount++;
- }
- }
-
- if (dataSubdirCount != tableHash.numHashFiles) {
- throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
- + " should be 1 more than the number of partition keys. However, the number of data dirs"
- + " found is " + dataSubdirCount + " but the number of partition keys"
- + " found in the partitions file is " + tableHash.partitions.size());
- }
-
- Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name",
- "syncTable_" + sourceTableName + "-" + targetTableName));
- Configuration jobConf = job.getConfiguration();
- job.setJarByClass(HashTable.class);
- jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
- jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
- jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
- if (sourceZkCluster != null) {
- jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
- }
- if (targetZkCluster != null) {
- jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
- }
- jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
-
- TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
- SyncMapper.class, null, null, job);
-
- job.setNumReduceTasks(0);
-
- if (dryRun) {
- job.setOutputFormatClass(NullOutputFormat.class);
- } else {
- // No reducers. Just write straight to table. Call initTableReducerJob
- // because it sets up the TableOutputFormat.
- TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
- targetZkCluster, null, null);
-
- // would be nice to add an option for bulk load instead
- }
-
- // Obtain an authentication token, for the specified cluster, on behalf of the current user
- if (sourceZkCluster != null) {
- Configuration peerConf =
- HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
- TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
- }
- return job;
- }
-
- public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
- Path sourceHashDir;
-
- Connection sourceConnection;
- Connection targetConnection;
- Table sourceTable;
- Table targetTable;
- boolean dryRun;
-
- HashTable.TableHash sourceTableHash;
- HashTable.TableHash.Reader sourceHashReader;
- ImmutableBytesWritable currentSourceHash;
- ImmutableBytesWritable nextSourceKey;
- HashTable.ResultHasher targetHasher;
-
- Throwable mapperException;
-
- public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
- SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
- MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED};
-
- @Override
- protected void setup(Context context) throws IOException {
-
- Configuration conf = context.getConfiguration();
- sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
- sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
- targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
- TableOutputFormat.OUTPUT_CONF_PREFIX);
- sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
- targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
- dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
-
- sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
- LOG.info("Read source hash manifest: " + sourceTableHash);
- LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");
-
- TableSplit split = (TableSplit) context.getInputSplit();
- ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
-
- sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
- findNextKeyHashPair();
-
- // create a hasher, but don't start it right away
- // instead, find the first hash batch at or after the start row
- // and skip any rows that come before. they will be caught by the previous task
- targetHasher = new HashTable.ResultHasher();
- }
-
- private static Connection openConnection(Configuration conf, String zkClusterConfKey,
- String configPrefix)
- throws IOException {
- String zkCluster = conf.get(zkClusterConfKey);
- Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
- zkCluster, configPrefix);
- return ConnectionFactory.createConnection(clusterConf);
- }
-
- private static Table openTable(Connection connection, Configuration conf,
- String tableNameConfKey) throws IOException {
- return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
- }
-
- /**
- * Attempt to read the next source key/hash pair.
- * If there are no more, set nextSourceKey to null
- */
- private void findNextKeyHashPair() throws IOException {
- boolean hasNext = sourceHashReader.next();
- if (hasNext) {
- nextSourceKey = sourceHashReader.getCurrentKey();
- } else {
- // no more keys - last hash goes to the end
- nextSourceKey = null;
- }
- }
-
- @Override
- protected void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
- try {
- // first, finish any hash batches that end before the scanned row
- while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
- moveToNextBatch(context);
- }
-
- // next, add the scanned row (as long as we've reached the first batch)
- if (targetHasher.isBatchStarted()) {
- targetHasher.hashResult(value);
- }
- } catch (Throwable t) {
- mapperException = t;
- Throwables.propagateIfInstanceOf(t, IOException.class);
- Throwables.propagateIfInstanceOf(t, InterruptedException.class);
- Throwables.propagate(t);
- }
- }
-
- /**
- * If there is an open hash batch, complete it and sync if there are diffs.
- * Start a new batch, and seek to read the next key/hash pair from the source.
- */
- private void moveToNextBatch(Context context) throws IOException, InterruptedException {
- if (targetHasher.isBatchStarted()) {
- finishBatchAndCompareHashes(context);
- }
- targetHasher.startBatch(nextSourceKey);
- currentSourceHash = sourceHashReader.getCurrentHash();
-
- findNextKeyHashPair();
- }
-
- /**
- * Finish the currently open hash batch.
- * Compare the target hash to the given source hash.
- * If they do not match, then sync the covered key range.
- */
- private void finishBatchAndCompareHashes(Context context)
- throws IOException, InterruptedException {
- targetHasher.finishBatch();
- context.getCounter(Counter.BATCHES).increment(1);
- if (targetHasher.getBatchSize() == 0) {
- context.getCounter(Counter.EMPTY_BATCHES).increment(1);
- }
- ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
- if (targetHash.equals(currentSourceHash)) {
- context.getCounter(Counter.HASHES_MATCHED).increment(1);
- } else {
- context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
-
- ImmutableBytesWritable stopRow = nextSourceKey == null
- ? new ImmutableBytesWritable(sourceTableHash.stopRow)
- : nextSourceKey;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey())
- + " to " + toHex(stopRow)
- + " sourceHash: " + toHex(currentSourceHash)
- + " targetHash: " + toHex(targetHash));
- }
-
- syncRange(context, targetHasher.getBatchStartKey(), stopRow);
- }
- }
- private static String toHex(ImmutableBytesWritable bytes) {
- return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
- }
-
- private static final CellScanner EMPTY_CELL_SCANNER
- = new CellScanner(Collections.<Result>emptyIterator());
-
- /**
- * Rescan the given range directly from the source and target tables.
- * Count and log differences, and if this is not a dry run, output Puts and Deletes
- * to make the target table match the source table for this range
- */
- private void syncRange(Context context, ImmutableBytesWritable startRow,
- ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
- Scan scan = sourceTableHash.initScan();
- scan.setStartRow(startRow.copyBytes());
- scan.setStopRow(stopRow.copyBytes());
-
- ResultScanner sourceScanner = sourceTable.getScanner(scan);
- CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
-
- ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
- CellScanner targetCells = new CellScanner(targetScanner.iterator());
-
- boolean rangeMatched = true;
- byte[] nextSourceRow = sourceCells.nextRow();
- byte[] nextTargetRow = targetCells.nextRow();
- while(nextSourceRow != null || nextTargetRow != null) {
- boolean rowMatched;
- int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
- if (rowComparison < 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
- }
- context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
-
- rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
- nextSourceRow = sourceCells.nextRow(); // advance only source to next row
- } else if (rowComparison > 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
- }
- context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
-
- rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
- nextTargetRow = targetCells.nextRow(); // advance only target to next row
- } else {
- // current row is the same on both sides, compare cell by cell
- rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
- nextSourceRow = sourceCells.nextRow();
- nextTargetRow = targetCells.nextRow();
- }
-
- if (!rowMatched) {
- rangeMatched = false;
- }
- }
-
- sourceScanner.close();
- targetScanner.close();
-
- context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
- .increment(1);
- }
-
- private static class CellScanner {
- private final Iterator<Result> results;
-
- private byte[] currentRow;
- private Result currentRowResult;
- private int nextCellInRow;
-
- private Result nextRowResult;
-
- public CellScanner(Iterator<Result> results) {
- this.results = results;
- }
-
- /**
- * Advance to the next row and return its row key.
- * Returns null iff there are no more rows.
- */
- public byte[] nextRow() {
- if (nextRowResult == null) {
- // no cached row - check scanner for more
- while (results.hasNext()) {
- nextRowResult = results.next();
- Cell nextCell = nextRowResult.rawCells()[0];
- if (currentRow == null
- || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
- nextCell.getRowOffset(), nextCell.getRowLength())) {
- // found next row
- break;
- } else {
- // found another result from current row, keep scanning
- nextRowResult = null;
- }
- }
-
- if (nextRowResult == null) {
- // end of data, no more rows
- currentRowResult = null;
- currentRow = null;
- return null;
- }
- }
-
- // advance to cached result for next row
- currentRowResult = nextRowResult;
- nextCellInRow = 0;
- currentRow = currentRowResult.getRow();
- nextRowResult = null;
- return currentRow;
- }
-
- /**
- * Returns the next Cell in the current row or null iff none remain.
- */
- public Cell nextCellInRow() {
- if (currentRowResult == null) {
- // nothing left in current row
- return null;
- }
-
- Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
- nextCellInRow++;
- if (nextCellInRow == currentRowResult.size()) {
- if (results.hasNext()) {
- Result result = results.next();
- Cell cell = result.rawCells()[0];
- if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
- cell.getRowOffset(), cell.getRowLength())) {
- // result is part of current row
- currentRowResult = result;
- nextCellInRow = 0;
- } else {
- // result is part of next row, cache it
- nextRowResult = result;
- // current row is complete
- currentRowResult = null;
- }
- } else {
- // end of data
- currentRowResult = null;
- }
- }
- return nextCell;
- }
- }
-
- /**
- * Compare the cells for the given row from the source and target tables.
- * Count and log any differences.
- * If not a dry run, output a Put and/or Delete needed to sync the target table
- * to match the source table.
- */
- private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
- CellScanner targetCells) throws IOException, InterruptedException {
- Put put = null;
- Delete delete = null;
- long matchingCells = 0;
- boolean matchingRow = true;
- Cell sourceCell = sourceCells.nextCellInRow();
- Cell targetCell = targetCells.nextCellInRow();
- while (sourceCell != null || targetCell != null) {
-
- int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
- if (cellKeyComparison < 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Target missing cell: " + sourceCell);
- }
- context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
- matchingRow = false;
-
- if (!dryRun) {
- if (put == null) {
- put = new Put(rowKey);
- }
- put.add(sourceCell);
- }
-
- sourceCell = sourceCells.nextCellInRow();
- } else if (cellKeyComparison > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Source missing cell: " + targetCell);
- }
- context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
- matchingRow = false;
-
- if (!dryRun) {
- if (delete == null) {
- delete = new Delete(rowKey);
- }
- // add a tombstone to exactly match the target cell that is missing on the source
- delete.addColumn(CellUtil.cloneFamily(targetCell),
- CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
- }
-
- targetCell = targetCells.nextCellInRow();
- } else {
- // the cell keys are equal, now check values
- if (CellUtil.matchingValue(sourceCell, targetCell)) {
- matchingCells++;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Different values: ");
- LOG.debug(" source cell: " + sourceCell
- + " value: " + Bytes.toHex(sourceCell.getValueArray(),
- sourceCell.getValueOffset(), sourceCell.getValueLength()));
- LOG.debug(" target cell: " + targetCell
- + " value: " + Bytes.toHex(targetCell.getValueArray(),
- targetCell.getValueOffset(), targetCell.getValueLength()));
- }
- context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
- matchingRow = false;
-
- if (!dryRun) {
- // overwrite target cell
- if (put == null) {
- put = new Put(rowKey);
- }
- put.add(sourceCell);
- }
- }
- sourceCell = sourceCells.nextCellInRow();
- targetCell = targetCells.nextCellInRow();
- }
-
- if (!dryRun && sourceTableHash.scanBatch > 0) {
- if (put != null && put.size() >= sourceTableHash.scanBatch) {
- context.write(new ImmutableBytesWritable(rowKey), put);
- put = null;
- }
- if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
- context.write(new ImmutableBytesWritable(rowKey), delete);
- delete = null;
- }
- }
- }
-
- if (!dryRun) {
- if (put != null) {
- context.write(new ImmutableBytesWritable(rowKey), put);
- }
- if (delete != null) {
- context.write(new ImmutableBytesWritable(rowKey), delete);
- }
- }
-
- if (matchingCells > 0) {
- context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
- }
- if (matchingRow) {
- context.getCounter(Counter.MATCHINGROWS).increment(1);
- return true;
- } else {
- context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
- return false;
- }
- }
-
- /**
- * Compare row keys of the given Result objects.
- * Nulls are after non-nulls
- */
- private static int compareRowKeys(byte[] r1, byte[] r2) {
- if (r1 == null) {
- return 1; // source missing row
- } else if (r2 == null) {
- return -1; // target missing row
- } else {
- // SyncTable is not run against META tables, so we can directly do what CellComparator does internally.
- // The call never goes to MetaCellComparator.
- return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
- }
- }
-
- /**
- * Compare families, qualifiers, and timestamps of the given Cells.
- * They are assumed to be of the same row.
- * Nulls are after non-nulls.
- */
- private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
- if (c1 == null) {
- return 1; // source missing cell
- }
- if (c2 == null) {
- return -1; // target missing cell
- }
-
- int result = CellComparator.compareFamilies(c1, c2);
- if (result != 0) {
- return result;
- }
-
- result = CellComparator.compareQualifiers(c1, c2);
- if (result != 0) {
- return result;
- }
-
- // note timestamp comparison is inverted - more recent cells first
- return CellComparator.compareTimestamps(c1, c2);
- }
-
- @Override
- protected void cleanup(Context context)
- throws IOException, InterruptedException {
- if (mapperException == null) {
- try {
- finishRemainingHashRanges(context);
- } catch (Throwable t) {
- mapperException = t;
- }
- }
-
- try {
- sourceTable.close();
- targetTable.close();
- sourceConnection.close();
- targetConnection.close();
- } catch (Throwable t) {
- if (mapperException == null) {
- mapperException = t;
- } else {
- LOG.error("Suppressing exception from closing tables", t);
- }
- }
-
- // propagate first exception
- if (mapperException != null) {
- Throwables.propagateIfInstanceOf(mapperException, IOException.class);
- Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
- Throwables.propagate(mapperException);
- }
- }
-
- private void finishRemainingHashRanges(Context context) throws IOException,
- InterruptedException {
- TableSplit split = (TableSplit) context.getInputSplit();
- byte[] splitEndRow = split.getEndRow();
- boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);
-
- // if there are more hash batches that begin before the end of this split move to them
- while (nextSourceKey != null
- && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
- moveToNextBatch(context);
- }
-
- if (targetHasher.isBatchStarted()) {
- // need to complete the final open hash batch
-
- if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
- || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
- // the open hash range continues past the end of this region
- // add a scan to complete the current hash range
- Scan scan = sourceTableHash.initScan();
- scan.setStartRow(splitEndRow);
- if (nextSourceKey == null) {
- scan.setStopRow(sourceTableHash.stopRow);
- } else {
- scan.setStopRow(nextSourceKey.copyBytes());
- }
-
- ResultScanner targetScanner = null;
- try {
- targetScanner = targetTable.getScanner(scan);
- for (Result row : targetScanner) {
- targetHasher.hashResult(row);
- }
- } finally {
- if (targetScanner != null) {
- targetScanner.close();
- }
- }
- } // else current batch ends exactly at split end row
-
- finishBatchAndCompareHashes(context);
- }
- }
- }
-
- private static final int NUM_ARGS = 3;
- private static void printUsage(final String errorMsg) {
- if (errorMsg != null && errorMsg.length() > 0) {
- System.err.println("ERROR: " + errorMsg);
- System.err.println();
- }
- System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
- System.err.println();
- System.err.println("Options:");
-
- System.err.println(" sourcezkcluster ZK cluster key of the source table");
- System.err.println(" (defaults to cluster in classpath's config)");
- System.err.println(" targetzkcluster ZK cluster key of the target table");
- System.err.println(" (defaults to cluster in classpath's config)");
- System.err.println(" dryrun if true, output counters but no writes");
- System.err.println(" (defaults to false)");
- System.err.println();
- System.err.println("Args:");
- System.err.println(" sourcehashdir path to HashTable output dir for source table");
- System.err.println(" (see org.apache.hadoop.hbase.mapreduce.HashTable)");
- System.err.println(" sourcetable Name of the source table to sync from");
- System.err.println(" targettable Name of the target table to sync to");
- System.err.println();
- System.err.println("Examples:");
- System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
- System.err.println(" to a local target cluster:");
- System.err.println(" $ hbase " +
- "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
- + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
- + " hdfs://nn:9000/hashes/tableA tableA tableA");
- }
-
- private boolean doCommandLine(final String[] args) {
- if (args.length < NUM_ARGS) {
- printUsage(null);
- return false;
- }
- try {
- sourceHashDir = new Path(args[args.length - 3]);
- sourceTableName = args[args.length - 2];
- targetTableName = args[args.length - 1];
-
- for (int i = 0; i < args.length - NUM_ARGS; i++) {
- String cmd = args[i];
- if (cmd.equals("-h") || cmd.startsWith("--h")) {
- printUsage(null);
- return false;
- }
-
- final String sourceZkClusterKey = "--sourcezkcluster=";
- if (cmd.startsWith(sourceZkClusterKey)) {
- sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
- continue;
- }
-
- final String targetZkClusterKey = "--targetzkcluster=";
- if (cmd.startsWith(targetZkClusterKey)) {
- targetZkCluster = cmd.substring(targetZkClusterKey.length());
- continue;
- }
-
- final String dryRunKey = "--dryrun=";
- if (cmd.startsWith(dryRunKey)) {
- dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
- continue;
- }
-
- printUsage("Invalid argument '" + cmd + "'");
- return false;
- }
-
-
- } catch (Exception e) {
- e.printStackTrace();
- printUsage("Can't start because " + e.getMessage());
- return false;
- }
- return true;
- }
-
- /**
- * Main entry point.
- */
- public static void main(String[] args) throws Exception {
- int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
- System.exit(ret);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
- if (!doCommandLine(otherArgs)) {
- return 1;
- }
-
- Job job = createSubmittableJob(otherArgs);
- if (!job.waitForCompletion(true)) {
- LOG.info("Map-reduce job failed!");
- return 1;
- }
- counters = job.getCounters();
- return 0;
- }
-
-}
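
For reference, the tool deleted above is normally launched through ToolRunner, mirroring its own main() method. The following is a minimal driver sketch, not part of this commit; the hash directory, table names, and the --dryrun flag are the same placeholders used in the usage text printed by printUsage(), and it assumes the class keeps its org.apache.hadoop.hbase.mapreduce.SyncTable name after the move.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.SyncTable;
    import org.apache.hadoop.util.ToolRunner;

    public class SyncTableDriver {
      public static void main(String[] args) throws Exception {
        // Same entry point SyncTable.main() uses; ToolRunner also parses -D generic options.
        int exitCode = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), new String[] {
            "--dryrun=true",                 // count differences only, write nothing to the target
            "hdfs://nn:9000/hashes/tableA",  // sourcehashdir: HashTable output for the source table
            "tableA",                        // sourcetable
            "tableA"                         // targettable
        });
        System.exit(exitCode);
      }
    }
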
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
deleted file mode 100644
index 63868da..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Convert HBase tabular data into a format that is consumable by Map/Reduce.
- */
-@InterfaceAudience.Public
-public class TableInputFormat extends TableInputFormatBase
-implements Configurable {
-
- @SuppressWarnings("hiding")
- private static final Log LOG = LogFactory.getLog(TableInputFormat.class);
-
- /** Job parameter that specifies the input table. */
- public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
- /**
- * If specified, use start keys of this table to split.
- * This is useful when you are preparing data for bulkload.
- */
- private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
- /** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
- * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
- */
- public static final String SCAN = "hbase.mapreduce.scan";
- /** Scan start row */
- public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
- /** Scan stop row */
- public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
- /** Column Family to Scan */
- public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
- /** Space delimited list of columns and column families to scan. */
- public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
- /** The timestamp used to filter columns with a specific timestamp. */
- public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
- /** The starting timestamp used to filter columns with a specific range of versions. */
- public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
- /** The ending timestamp used to filter columns with a specific range of versions. */
- public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
- /** The maximum number of versions to return. */
- public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
- /** Set to false to disable server-side caching of blocks for this scan. */
- public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
- /** The number of rows for caching that will be passed to scanners. */
- public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
- /** Set the maximum number of values to return for each call to next(). */
- public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
- /** Specify if we have to shuffle the map tasks. */
- public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
-
- /** The configuration. */
- private Configuration conf = null;
-
- /**
- * Returns the current configuration.
- *
- * @return The current configuration.
- * @see org.apache.hadoop.conf.Configurable#getConf()
- */
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- /**
- * Sets the configuration. This is used to set the details for the table to
- * be scanned.
- *
- * @param configuration The configuration to set.
- * @see org.apache.hadoop.conf.Configurable#setConf(
- * org.apache.hadoop.conf.Configuration)
- */
- @Override
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
- justification="Intentional")
- public void setConf(Configuration configuration) {
- this.conf = configuration;
-
- Scan scan = null;
-
- if (conf.get(SCAN) != null) {
- try {
- scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
- } catch (IOException e) {
- LOG.error("An error occurred.", e);
- }
- } else {
- try {
- scan = createScanFromConfiguration(conf);
- } catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e));
- }
- }
-
- setScan(scan);
- }
-
- /**
- * Sets up a {@link Scan} instance, applying settings from the configuration property
- * constants defined in {@code TableInputFormat}. This allows specifying things such as:
- * <ul>
- * <li>start and stop rows</li>
- * <li>column qualifiers or families</li>
- * <li>timestamps or timerange</li>
- * <li>scanner caching and batch size</li>
- * </ul>
- */
- public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
- Scan scan = new Scan();
-
- if (conf.get(SCAN_ROW_START) != null) {
- scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
- }
-
- if (conf.get(SCAN_ROW_STOP) != null) {
- scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
- }
-
- if (conf.get(SCAN_COLUMNS) != null) {
- addColumns(scan, conf.get(SCAN_COLUMNS));
- }
-
- for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) {
- scan.addFamily(Bytes.toBytes(columnFamily));
- }
-
- if (conf.get(SCAN_TIMESTAMP) != null) {
- scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
- }
-
- if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
- scan.setTimeRange(
- Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
- Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
- }
-
- if (conf.get(SCAN_MAXVERSIONS) != null) {
- scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
- }
-
- if (conf.get(SCAN_CACHEDROWS) != null) {
- scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
- }
-
- if (conf.get(SCAN_BATCHSIZE) != null) {
- scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
- }
-
- // false by default, full table scans generate too much block cache churn
- scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
-
- return scan;
- }
-
- @Override
- protected void initialize(JobContext context) throws IOException {
- // Do we have to worry about mis-matches between the Configuration from setConf and the one
- // in this context?
- TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
- try {
- initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
- } catch (Exception e) {
- LOG.error(StringUtils.stringifyException(e));
- }
- }
-
- /**
- * Parses a combined family and qualifier and adds either both or just the
- * family in case there is no qualifier. This assumes the older colon
- * divided notation, e.g. "family:qualifier".
- *
- * @param scan The Scan to update.
- * @param familyAndQualifier family and qualifier
- * @throws IllegalArgumentException When familyAndQualifier is invalid.
- */
- private static void addColumn(Scan scan, byte[] familyAndQualifier) {
- byte [][] fq = KeyValue.parseColumn(familyAndQualifier);
- if (fq.length == 1) {
- scan.addFamily(fq[0]);
- } else if (fq.length == 2) {
- scan.addColumn(fq[0], fq[1]);
- } else {
- throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
- }
- }
-
- /**
- * Adds an array of columns specified using old format, family:qualifier.
- * <p>
- * Overrides previous calls to {@link Scan#addColumn(byte[], byte[])} for any families in the
- * input.
- *
- * @param scan The Scan to update.
- * @param columns array of columns, formatted as <code>family:qualifier</code>
- * @see Scan#addColumn(byte[], byte[])
- */
- public static void addColumns(Scan scan, byte [][] columns) {
- for (byte[] column : columns) {
- addColumn(scan, column);
- }
- }
-
- /**
- * Calculates the splits that will serve as input for the map tasks. The
- * number of splits matches the number of regions in a table. Splits are shuffled if
- * required.
- * @param context The current job context.
- * @return The list of input splits.
- * @throws IOException When creating the list of splits fails.
- * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
- * org.apache.hadoop.mapreduce.JobContext)
- */
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException {
- List<InputSplit> splits = super.getSplits(context);
- if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))) {
- Collections.shuffle(splits);
- }
- return splits;
- }
-
- /**
- * Convenience method to parse a string representation of an array of column specifiers.
- *
- * @param scan The Scan to update.
- * @param columns The columns to parse.
- */
- private static void addColumns(Scan scan, String columns) {
- String[] cols = columns.split(" ");
- for (String col : cols) {
- addColumn(scan, Bytes.toBytes(col));
- }
- }
-
- @Override
- protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
- if (conf.get(SPLIT_TABLE) != null) {
- TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
- try (Connection conn = ConnectionFactory.createConnection(getConf())) {
- try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
- return rl.getStartEndKeys();
- }
- }
- }
-
- return super.getStartEndKeys();
- }
-
- /**
- * Sets split table in map-reduce job.
- */
- public static void configureSplitTable(Job job, TableName tableName) {
- job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
- }
-}
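
The configuration keys defined above are what createScanFromConfiguration() reads when no serialized Scan is supplied via the SCAN key. Below is a small sketch of wiring them up on a Job configuration; the table name, column family, row range, and caching value are illustrative placeholders, not values from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class TableInputFormatConfigSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Keys are the public constants defined in TableInputFormat above.
        conf.set(TableInputFormat.INPUT_TABLE, "exampleTable");
        conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "cf");
        conf.set(TableInputFormat.SCAN_ROW_START, "row-000");
        conf.set(TableInputFormat.SCAN_ROW_STOP, "row-999");
        conf.set(TableInputFormat.SCAN_CACHEDROWS, "500");
        conf.setBoolean(TableInputFormat.SCAN_CACHEBLOCKS, false);

        Job job = Job.getInstance(conf, "scan-exampleTable");
        job.setInputFormatClass(TableInputFormat.class);
        // Mapper, reducer, and output format would be configured here as usual,
        // e.g. via TableMapReduceUtil.initTableMapperJob(...).
      }
    }
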
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
deleted file mode 100644
index ce1928e6..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ /dev/null
@@ -1,653 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Addressing;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RegionSizeCalculator;
-import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * A base for {@link TableInputFormat}s. Receives a {@link Connection}, a {@link TableName},
- * a {@link Scan} instance that defines the input columns etc. Subclasses may use
- * other TableRecordReader implementations.
- *
- * Subclasses MUST ensure initializeTable(Connection, TableName) is called for an instance to
- * function properly. Each of the entry points to this class used by the MapReduce framework,
- * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
- * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
- * retrieving the necessary configuration information. If your subclass overrides either of these
- * methods, either call the parent version or call initialize yourself.
- *
- * <p>
- * An example of a subclass:
- * <pre>
- * class ExampleTIF extends TableInputFormatBase {
- *
- * {@literal @}Override
- * protected void initialize(JobContext context) throws IOException {
- * // We are responsible for the lifecycle of this connection until we hand it over in
- * // initializeTable.
- * Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
- * context.getConfiguration()));
- * TableName tableName = TableName.valueOf("exampleTable");
- * // mandatory. once passed here, TableInputFormatBase will handle closing the connection.
- * initializeTable(connection, tableName);
- * byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
- * Bytes.toBytes("columnB") };
- * // optional, by default we'll get everything for the table.
- * Scan scan = new Scan();
- * for (byte[] family : inputColumns) {
- * scan.addFamily(family);
- * }
- * Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
- * scan.setFilter(exampleFilter);
- * setScan(scan);
- * }
- * }
- * </pre>
- */
-@InterfaceAudience.Public
-public abstract class TableInputFormatBase
-extends InputFormat<ImmutableBytesWritable, Result> {
-
- /** Specify whether to enable auto-balance for input in M/R jobs.*/
- public static final String MAPREDUCE_INPUT_AUTOBALANCE = "hbase.mapreduce.input.autobalance";
- /** Specify the ratio allowed for data skew in M/R jobs; it is used together with the enabling of the
- * hbase.mapreduce.input.autobalance property.*/
- public static final String INPUT_AUTOBALANCE_MAXSKEWRATIO = "hbase.mapreduce.input.autobalance" +
- ".maxskewratio";
- /** Specify whether the row keys in the table are text (ASCII between 32~126);
- * default is true. False means the table uses binary row keys.*/
- public static final String TABLE_ROW_TEXTKEY = "hbase.table.row.textkey";
-
- private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
-
- private static final String NOT_INITIALIZED = "The input format instance has not been properly " +
- "initialized. Ensure you call initializeTable either in your constructor or initialize " +
- "method";
- private static final String INITIALIZATION_ERROR = "Cannot create a record reader because of a" +
- " previous error. Please look at the previous logs lines from" +
- " the task's full log for more details.";
-
- /** Holds the details for the internal scanner.
- *
- * @see Scan */
- private Scan scan = null;
- /** The {@link Admin}. */
- private Admin admin;
- /** The {@link Table} to scan. */
- private Table table;
- /** The {@link RegionLocator} of the table. */
- private RegionLocator regionLocator;
- /** The reader scanning the table, can be a custom one. */
- private TableRecordReader tableRecordReader = null;
- /** The underlying {@link Connection} of the table. */
- private Connection connection;
-
-
- /** The reverse DNS lookup cache mapping: IPAddress => HostName */
- private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();
-
- /**
- * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
- * the default.
- *
- * @param split The split to work with.
- * @param context The current context.
- * @return The newly created record reader.
- * @throws IOException When creating the reader fails.
- * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
- * org.apache.hadoop.mapreduce.InputSplit,
- * org.apache.hadoop.mapreduce.TaskAttemptContext)
- */
- @Override
- public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
- InputSplit split, TaskAttemptContext context)
- throws IOException {
- // Just in case a subclass is relying on JobConfigurable magic.
- if (table == null) {
- initialize(context);
- }
- // null check in case our child overrides getTable to not throw.
- try {
- if (getTable() == null) {
- // initialize() must not have been implemented in the subclass.
- throw new IOException(INITIALIZATION_ERROR);
- }
- } catch (IllegalStateException exception) {
- throw new IOException(INITIALIZATION_ERROR, exception);
- }
- TableSplit tSplit = (TableSplit) split;
- LOG.info("Input split length: " + StringUtils.humanReadableInt(tSplit.getLength()) + " bytes.");
- final TableRecordReader trr =
- this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
- Scan sc = new Scan(this.scan);
- sc.setStartRow(tSplit.getStartRow());
- sc.setStopRow(tSplit.getEndRow());
- trr.setScan(sc);
- trr.setTable(getTable());
- return new RecordReader<ImmutableBytesWritable, Result>() {
-
- @Override
- public void close() throws IOException {
- trr.close();
- closeTable();
- }
-
- @Override
- public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
- return trr.getCurrentKey();
- }
-
- @Override
- public Result getCurrentValue() throws IOException, InterruptedException {
- return trr.getCurrentValue();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return trr.getProgress();
- }
-
- @Override
- public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException,
- InterruptedException {
- trr.initialize(inputsplit, context);
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return trr.nextKeyValue();
- }
- };
- }
-
- protected Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
- return getRegionLocator().getStartEndKeys();
- }
-
- /**
- * Calculates the splits that will serve as input for the map tasks. The
- * number of splits matches the number of regions in a table.
- *
- * @param context The current job context.
- * @return The list of input splits.
- * @throws IOException When creating the list of splits fails.
- * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
- * org.apache.hadoop.mapreduce.JobContext)
- */
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException {
- boolean closeOnFinish = false;
-
- // Just in case a subclass is relying on JobConfigurable magic.
- if (table == null) {
- initialize(context);
- closeOnFinish = true;
- }
-
- // null check in case our child overrides getTable to not throw.
- try {
- if (getTable() == null) {
- // initialize() must not have been implemented in the subclass.
- throw new IOException(INITIALIZATION_ERROR);
- }
- } catch (IllegalStateException exception) {
- throw new IOException(INITIALIZATION_ERROR, exception);
- }
-
- try {
- RegionSizeCalculator sizeCalculator =
- new RegionSizeCalculator(getRegionLocator(), getAdmin());
-
- TableName tableName = getTable().getName();
-
- Pair<byte[][], byte[][]> keys = getStartEndKeys();
- if (keys == null || keys.getFirst() == null ||
- keys.getFirst().length == 0) {
- HRegionLocation regLoc =
- getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
- if (null == regLoc) {
- throw new IOException("Expecting at least one region.");
- }
- List<InputSplit> splits = new ArrayList<>(1);
- long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
- TableSplit split = new TableSplit(tableName, scan,
- HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
- .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
- splits.add(split);
- return splits;
- }
- List<InputSplit> splits = new ArrayList<>(keys.getFirst().length);
- for (int i = 0; i < keys.getFirst().length; i++) {
- if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
- continue;
- }
-
- byte[] startRow = scan.getStartRow();
- byte[] stopRow = scan.getStopRow();
- // determine if the given start and stop keys fall into the region
- if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
- Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
- (stopRow.length == 0 ||
- Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
- byte[] splitStart = startRow.length == 0 ||
- Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
- keys.getFirst()[i] : startRow;
- byte[] splitStop = (stopRow.length == 0 ||
- Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
- keys.getSecond()[i].length > 0 ?
- keys.getSecond()[i] : stopRow;
-
- HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
- // The below InetSocketAddress creation does a name resolution.
- InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
- if (isa.isUnresolved()) {
- LOG.warn("Failed resolve " + isa);
- }
- InetAddress regionAddress = isa.getAddress();
- String regionLocation;
- regionLocation = reverseDNS(regionAddress);
-
- byte[] regionName = location.getRegionInfo().getRegionName();
- String encodedRegionName = location.getRegionInfo().getEncodedName();
- long regionSize = sizeCalculator.getRegionSize(regionName);
- TableSplit split = new TableSplit(tableName, scan,
- splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
- splits.add(split);
- if (LOG.isDebugEnabled()) {
- LOG.debug("getSplits: split -> " + i + " -> " + split);
- }
- }
- }
- //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
- boolean enableAutoBalance = context.getConfiguration()
- .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
- if (enableAutoBalance) {
- long totalRegionSize=0;
- for (int i = 0; i < splits.size(); i++){
- TableSplit ts = (TableSplit)splits.get(i);
- totalRegionSize += ts.getLength();
- }
- long averageRegionSize = totalRegionSize / splits.size();
- // the averageRegionSize must be positive.
- if (averageRegionSize <= 0) {
- LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
- "set it to 1.");
- averageRegionSize = 1;
- }
- return calculateRebalancedSplits(splits, context, averageRegionSize);
- } else {
- return splits;
- }
- } finally {
- if (closeOnFinish) {
- closeTable();
- }
- }
- }
-
- String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
- String hostName = this.reverseDNSCacheMap.get(ipAddress);
- if (hostName == null) {
- String ipAddressString = null;
- try {
- ipAddressString = DNS.reverseDns(ipAddress, null);
- } catch (Exception e) {
- // We can use InetAddress in case the jndi failed to pull up the reverse DNS entry from the
- // name service. Also, in case of ipv6, we need to use the InetAddress since resolving
- // reverse DNS using jndi doesn't work well with ipv6 addresses.
- ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
- }
- if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);
- hostName = Strings.domainNamePointerToHostName(ipAddressString);
- this.reverseDNSCacheMap.put(ipAddress, hostName);
- }
- return hostName;
- }
-
- /**
- * Calculates the number of MapReduce input splits for the map tasks. The number of
- * MapReduce input splits depends on the average region size and the "data skew ratio" the user
- * set in the configuration.
- *
- * @param list The list of input splits before balance.
- * @param context The current job context.
- * @param average The average size of all regions.
- * @return The list of input splits.
- * @throws IOException When creating the list of splits fails.
- * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
- * org.apache.hadoop.mapreduce.JobContext)
- */
- private List<InputSplit> calculateRebalancedSplits(List<InputSplit> list, JobContext context,
- long average) throws IOException {
- List<InputSplit> resultList = new ArrayList<>();
- Configuration conf = context.getConfiguration();
- //The default data skew ratio is 3
- long dataSkewRatio = conf.getLong(INPUT_AUTOBALANCE_MAXSKEWRATIO, 3);
- //It determines which mode to use: text key mode or binary key mode. The default is text mode.
- boolean isTextKey = context.getConfiguration().getBoolean(TABLE_ROW_TEXTKEY, true);
- long dataSkewThreshold = dataSkewRatio * average;
- int count = 0;
- while (count < list.size()) {
- TableSplit ts = (TableSplit)list.get(count);
- TableName tableName = ts.getTable();
- String regionLocation = ts.getRegionLocation();
- String encodedRegionName = ts.getEncodedRegionName();
- long regionSize = ts.getLength();
- if (regionSize >= dataSkewThreshold) {
- // if the current region size is larger than the data skew threshold,
- // split the region into two MapReduce input splits.
- byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
- if (Arrays.equals(ts.getEndRow(), splitKey)) {
- // Not splitting since the end key is the same as the split key
- resultList.add(ts);
- } else {
- //Set the size of child TableSplit as 1/2 of the region size. The exact size of the
- // MapReduce input splits is not far off.
- TableSplit t1 = new TableSplit(tableName, scan, ts.getStartRow(), splitKey,
- regionLocation, regionSize / 2);
- TableSplit t2 = new TableSplit(tableName, scan, splitKey, ts.getEndRow(), regionLocation,
- regionSize - regionSize / 2);
- resultList.add(t1);
- resultList.add(t2);
- }
- count++;
- } else if (regionSize >= average) {
- // if the region size is between the average size and the data skew threshold size,
- // make this region as one MapReduce input split.
- resultList.add(ts);
- count++;
- } else {
- // if the total size of several small continuous regions is less than the average region size,
- // combine them into one MapReduce input split.
- long totalSize = regionSize;
- byte[] splitStartKey = ts.getStartRow();
- byte[] splitEndKey = ts.getEndRow();
- count++;
- for (; count < list.size(); count++) {
- TableSplit nextRegion = (TableSplit)list.get(count);
- long nextRegionSize = nextRegion.getLength();
- if (totalSize + nextRegionSize <= dataSkewThreshold) {
- totalSize = totalSize + nextRegionSize;
- splitEndKey = nextRegion.getEndRow();
- } else {
- break;
- }
- }
- TableSplit t = new TableSplit(tableName, scan, splitStartKey, splitEndKey,
- regionLocation, encodedRegionName, totalSize);
- resultList.add(t);
- }
- }
- return resultList;
- }
-
- /**
- * Select a split point in the region. The selection of the split point is based on a uniform
- * distribution assumption for the keys in a region.
- * Here are some examples:
- *
- * <table>
- * <tr>
- * <th>start key</th>
- * <th>end key</th>
- * <th>is text</th>
- * <th>split point</th>
- * </tr>
- * <tr>
- * <td>'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f', 'g'</td>
- * <td>'a', 'a', 'a', 'f', 'f', 'f'</td>
- * <td>true</td>
- * <td>'a', 'a', 'a', 'd', 'd', -78, 50, -77, 51</td>
- * </tr>
- * <tr>
- * <td>'1', '1', '1', '0', '0', '0'</td>
- * <td>'1', '1', '2', '5', '7', '9', '0'</td>
- * <td>true</td>
- * <td>'1', '1', '1', -78, -77, -76, -104</td>
- * </tr>
- * <tr>
- * <td>'1', '1', '1', '0'</td>
- * <td>'1', '1', '2', '0'</td>
- * <td>true</td>
- * <td>'1', '1', '1', -80</td>
- * </tr>
- * <tr>
- * <td>13, -19, 126, 127</td>
- * <td>13, -19, 127, 0</td>
- * <td>false</td>
- * <td>13, -19, 126, -65</td>
- * </tr>
- * </table>
- *
- * This function is "public static" to make it easier to test.
- *
- * @param start Start key of the region
- * @param end End key of the region
- * @param isText It determines to use text key mode or binary key mode
- * @return The split point in the region.
- */
- @InterfaceAudience.Private
- public static byte[] getSplitKey(byte[] start, byte[] end, boolean isText) {
- byte upperLimitByte;
- byte lowerLimitByte;
- //Use text mode or binary mode.
- if (isText) {
- //The range of text char set in ASCII is [32,126], the lower limit is space and the upper
- // limit is '~'.
- upperLimitByte = '~';
- lowerLimitByte = ' ';
- } else {
- upperLimitByte = -1;
- lowerLimitByte = 0;
- }
- // For special case
- // Example 1 : startkey=null, endkey="hhhqqqwww", splitKey="h"
- // Example 2 (text key mode): startKey="ffffaaa", endKey=null, splitkey="f~~~~~~"
- if (start.length == 0 && end.length == 0){
- return new byte[]{(byte) ((lowerLimitByte + upperLimitByte) / 2)};
- }
- if (start.length == 0 && end.length != 0){
- return new byte[]{ end[0] };
- }
- if (start.length != 0 && end.length == 0){
- byte[] result =new byte[start.length];
- result[0]=start[0];
- for (int k = 1; k < start.length; k++){
- result[k] = upperLimitByte;
- }
- return result;
- }
- return Bytes.split(start, end, false, 1)[1];
- }
-
- /**
- * Test if the given region is to be included in the InputSplit while splitting
- * the regions of a table.
- * <p>
- * This optimization is effective when there is a specific reason to exclude an entire region from the M-R job
- * (and hence not contribute an InputSplit), given the start and end keys of that region. <br>
- * It is useful when we need to remember the last-processed top record and continuously revisit the [last, current)
- * interval for M-R processing. In addition to reducing the number of InputSplits, it reduces the load on the region server as well, due to the ordering of the keys.
- * <br>
- * <br>
- * Note: it is possible that <code>endKey.length() == 0</code> for the last (most recent) region.
- * <br>
- * Override this method if you want to bulk exclude regions altogether from M-R. By default, no region is excluded (i.e. all regions are included).
- *
- *
- * @param startKey Start key of the region
- * @param endKey End key of the region
- * @return true, if this region needs to be included as part of the input (default).
- *
- */
- protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
- return true;
- }
-
- /**
- * Allows subclasses to get the {@link RegionLocator}.
- */
- protected RegionLocator getRegionLocator() {
- if (regionLocator == null) {
- throw new IllegalStateException(NOT_INITIALIZED);
- }
- return regionLocator;
- }
-
- /**
- * Allows subclasses to get the {@link Table}.
- */
- protected Table getTable() {
- if (table == null) {
- throw new IllegalStateException(NOT_INITIALIZED);
- }
- return table;
- }
-
- /**
- * Allows subclasses to get the {@link Admin}.
- */
- protected Admin getAdmin() {
- if (admin == null) {
- throw new IllegalStateException(NOT_INITIALIZED);
- }
- return admin;
- }
-
- /**
- * Allows subclasses to initialize the table information.
- *
- * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close it.
- * @param tableName The {@link TableName} of the table to process.
- * @throws IOException
- */
- protected void initializeTable(Connection connection, TableName tableName) throws IOException {
- if (this.table != null || this.connection != null) {
- LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
- "reference; TableInputFormatBase will not close these old references when done.");
- }
- this.table = connection.getTable(tableName);
- this.regionLocator = connection.getRegionLocator(tableName);
- this.admin = connection.getAdmin();
- this.connection = connection;
- }
-
- /**
- * Gets the scan defining the actual details like columns etc.
- *
- * @return The internal scan instance.
- */
- public Scan getScan() {
- if (this.scan == null) this.scan = new Scan();
- return scan;
- }
-
- /**
- * Sets the scan defining the actual details like columns etc.
- *
- * @param scan The scan to set.
- */
- public void setScan(Scan scan) {
- this.scan = scan;
- }
-
- /**
- * Allows subclasses to set the {@link TableRecordReader}.
- *
- * @param tableRecordReader A different {@link TableRecordReader}
- * implementation.
- */
- protected void setTableRecordReader(TableRecordReader tableRecordReader) {
- this.tableRecordReader = tableRecordReader;
- }
-
- /**
- * Handle subclass specific set up.
- * Each of the entry points used by the MapReduce framework,
- * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
- * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
- * retrieving the necessary configuration information and calling
- * {@link #initializeTable(Connection, TableName)}.
- *
- * Subclasses should implement their initialize call such that it is safe to call multiple times.
- * The current TableInputFormatBase implementation relies on a non-null table reference to decide
- * if an initialize call is needed, but this behavior may change in the future. In particular,
- * it is critical that initializeTable not be called multiple times since this will leak
- * Connection instances.
- *
- */
- protected void initialize(JobContext context) throws IOException {
- }
-
- /**
- * Close the Table and related objects that were initialized via
- * {@link #initializeTable(Connection, TableName)}.
- *
- * @throws IOException
- */
- protected void closeTable() throws IOException {
- close(admin, table, regionLocator, connection);
- admin = null;
- table = null;
- regionLocator = null;
- connection = null;
- }
-
- private void close(Closeable... closables) throws IOException {
- for (Closeable c : closables) {
- if(c != null) { c.close(); }
- }
- }
-
-}
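
The getSplitKey() examples in the javadoc table above can be reproduced directly, since the method is public static. A small sketch follows; the keys are the first row of that table, and the program simply prints whatever split point the implementation returns rather than asserting an exact value.

    import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SplitKeySketch {
      public static void main(String[] args) {
        byte[] start = Bytes.toBytes("aaabcdefg");
        byte[] end = Bytes.toBytes("aaafff");
        // Text-key mode: the split point assumes keys are uniformly distributed
        // over printable ASCII (32..126), as described in the javadoc above.
        byte[] splitPoint = TableInputFormatBase.getSplitKey(start, end, true);
        System.out.println(Bytes.toStringBinary(splitPoint));
      }
    }
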