Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:38 UTC
[38/41] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
new file mode 100644
index 0000000..3c3060b
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
@@ -0,0 +1,140 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * This is used to partition the output keys into groups of keys.
+ * Keys are grouped according to the regions that currently exist
+ * so that each reducer fills a single region, distributing the load.
+ *
+ * <p>This class is not suitable as a partitioner for creating hfiles
+ * for incremental bulk loads, as the region spread will likely change between the time of
+ * hfile creation and load time. See {@link LoadIncrementalHFiles}
+ * and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.</p>
+ *
+ * @param <KEY> The type of the key.
+ * @param <VALUE> The type of the value.
+ */
+@InterfaceAudience.Public
+public class HRegionPartitioner<KEY, VALUE>
+extends Partitioner<ImmutableBytesWritable, VALUE>
+implements Configurable {
+
+ private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
+ private Configuration conf = null;
+ // Connection and locator are not cleaned up; they just die when partitioner is done.
+ private Connection connection;
+ private RegionLocator locator;
+ private byte[][] startKeys;
+
+ /**
+   * Gets the partition number for a given key (hence record) given the total
+   * number of partitions, i.e. the number of reduce tasks for the job.
+   *
+   * <p>Typically a hash function on all or a subset of the key.</p>
+ *
+ * @param key The key to be partitioned.
+ * @param value The entry value.
+ * @param numPartitions The total number of partitions.
+ * @return The partition number for the <code>key</code>.
+ * @see org.apache.hadoop.mapreduce.Partitioner#getPartition(
+ * java.lang.Object, java.lang.Object, int)
+ */
+ @Override
+ public int getPartition(ImmutableBytesWritable key,
+ VALUE value, int numPartitions) {
+ byte[] region = null;
+    // Only one region, return 0
+ if (this.startKeys.length == 1){
+ return 0;
+ }
+ try {
+ // Not sure if this is cached after a split so we could have problems
+ // here if a region splits while mapping
+ region = this.locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ for (int i = 0; i < this.startKeys.length; i++){
+ if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
+ if (i >= numPartitions-1){
+          // cover the case where we have fewer reducers than regions.
+ return (Integer.toString(i).hashCode()
+ & Integer.MAX_VALUE) % numPartitions;
+ }
+ return i;
+ }
+ }
+    // if the above fails to find a matching start key, we need to return something
+ return 0;
+ }
+
+ /**
+ * Returns the current configuration.
+ *
+ * @return The current configuration.
+ * @see org.apache.hadoop.conf.Configurable#getConf()
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Sets the configuration. This is used to determine the start keys for the
+ * given table.
+ *
+ * @param configuration The configuration to set.
+ * @see org.apache.hadoop.conf.Configurable#setConf(
+ * org.apache.hadoop.conf.Configuration)
+ */
+ @Override
+ public void setConf(Configuration configuration) {
+ this.conf = HBaseConfiguration.create(configuration);
+ try {
+ this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(conf));
+ TableName tableName = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE));
+ this.locator = this.connection.getRegionLocator(tableName);
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ try {
+ this.startKeys = this.locator.getStartKeys();
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ }
+}
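For readers wiring this partitioner into their own jobs: the usual entry point is TableMapReduceUtil, whose four-argument initTableReducerJob overload also records the output table name that setConf() above reads. A minimal driver sketch, assuming a pre-existing table named "exampleTable" (the table name, job name, and reducer choice are placeholders, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class RegionPartitionedWriteDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "region-partitioned-write");
    job.setJarByClass(RegionPartitionedWriteDriver.class);
    // Passing HRegionPartitioner here sets it as the job's partitioner and stores
    // the output table name that HRegionPartitioner.setConf() later looks up, so
    // each reducer receives the keys of (roughly) one region of "exampleTable".
    TableMapReduceUtil.initTableReducerJob("exampleTable", IdentityTableReducer.class,
        job, HRegionPartitioner.class);
    // ... a mapper emitting (ImmutableBytesWritable, Put/Delete) pairs still has to
    // be configured before submitting the job.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}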
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
new file mode 100644
index 0000000..2c8caf5
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
@@ -0,0 +1,747 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Charsets;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Ordering;
+
+public class HashTable extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(HashTable.class);
+
+ private static final int DEFAULT_BATCH_SIZE = 8000;
+
+ private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
+ final static String PARTITIONS_FILE_NAME = "partitions";
+ final static String MANIFEST_FILE_NAME = "manifest";
+ final static String HASH_DATA_DIR = "hashes";
+ final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
+ private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
+
+ TableHash tableHash = new TableHash();
+ Path destPath;
+
+ public HashTable(Configuration conf) {
+ super(conf);
+ }
+
+ public static class TableHash {
+
+ Path hashDir;
+
+ String tableName;
+ String families = null;
+ long batchSize = DEFAULT_BATCH_SIZE;
+ int numHashFiles = 0;
+ byte[] startRow = HConstants.EMPTY_START_ROW;
+ byte[] stopRow = HConstants.EMPTY_END_ROW;
+ int scanBatch = 0;
+ int versions = -1;
+ long startTime = 0;
+ long endTime = 0;
+
+ List<ImmutableBytesWritable> partitions;
+
+ public static TableHash read(Configuration conf, Path hashDir) throws IOException {
+ TableHash tableHash = new TableHash();
+ FileSystem fs = hashDir.getFileSystem(conf);
+ tableHash.hashDir = hashDir;
+ tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
+ tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
+ return tableHash;
+ }
+
+ void writePropertiesFile(FileSystem fs, Path path) throws IOException {
+ Properties p = new Properties();
+ p.setProperty("table", tableName);
+ if (families != null) {
+ p.setProperty("columnFamilies", families);
+ }
+ p.setProperty("targetBatchSize", Long.toString(batchSize));
+ p.setProperty("numHashFiles", Integer.toString(numHashFiles));
+ if (!isTableStartRow(startRow)) {
+ p.setProperty("startRowHex", Bytes.toHex(startRow));
+ }
+ if (!isTableEndRow(stopRow)) {
+ p.setProperty("stopRowHex", Bytes.toHex(stopRow));
+ }
+ if (scanBatch > 0) {
+ p.setProperty("scanBatch", Integer.toString(scanBatch));
+ }
+ if (versions >= 0) {
+ p.setProperty("versions", Integer.toString(versions));
+ }
+ if (startTime != 0) {
+ p.setProperty("startTimestamp", Long.toString(startTime));
+ }
+ if (endTime != 0) {
+ p.setProperty("endTimestamp", Long.toString(endTime));
+ }
+
+ try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
+ p.store(osw, null);
+ }
+ }
+
+ void readPropertiesFile(FileSystem fs, Path path) throws IOException {
+ Properties p = new Properties();
+ try (FSDataInputStream in = fs.open(path)) {
+ try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
+ p.load(isr);
+ }
+ }
+ tableName = p.getProperty("table");
+ families = p.getProperty("columnFamilies");
+ batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
+ numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
+
+ String startRowHex = p.getProperty("startRowHex");
+ if (startRowHex != null) {
+ startRow = Bytes.fromHex(startRowHex);
+ }
+ String stopRowHex = p.getProperty("stopRowHex");
+ if (stopRowHex != null) {
+ stopRow = Bytes.fromHex(stopRowHex);
+ }
+
+ String scanBatchString = p.getProperty("scanBatch");
+ if (scanBatchString != null) {
+ scanBatch = Integer.parseInt(scanBatchString);
+ }
+
+ String versionString = p.getProperty("versions");
+ if (versionString != null) {
+ versions = Integer.parseInt(versionString);
+ }
+
+ String startTimeString = p.getProperty("startTimestamp");
+ if (startTimeString != null) {
+ startTime = Long.parseLong(startTimeString);
+ }
+
+ String endTimeString = p.getProperty("endTimestamp");
+ if (endTimeString != null) {
+ endTime = Long.parseLong(endTimeString);
+ }
+ }
+
+ Scan initScan() throws IOException {
+ Scan scan = new Scan();
+ scan.setCacheBlocks(false);
+ if (startTime != 0 || endTime != 0) {
+ scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+ }
+ if (scanBatch > 0) {
+ scan.setBatch(scanBatch);
+ }
+ if (versions >= 0) {
+ scan.setMaxVersions(versions);
+ }
+ if (!isTableStartRow(startRow)) {
+ scan.setStartRow(startRow);
+ }
+ if (!isTableEndRow(stopRow)) {
+ scan.setStopRow(stopRow);
+ }
+ if(families != null) {
+ for(String fam : families.split(",")) {
+ scan.addFamily(Bytes.toBytes(fam));
+ }
+ }
+ return scan;
+ }
+
+ /**
+     * Choose partitions between row ranges to hash to a single output file.
+     * Selects region boundaries that fall within the scan range, and groups them
+     * into the desired number of partitions.
+ */
+ void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
+ List<byte[]> startKeys = new ArrayList<>();
+ for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
+ byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
+ byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
+
+        // drop this region if the scan begins after it ends, or ends before it begins;
+        // in other words:
+        //   IF (scan begins before the end of this region
+        //      AND scan ends after the start of this region)
+        //   THEN include this region
+ if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
+ || Bytes.compareTo(startRow, regionEndKey) < 0)
+ && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
+ || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+ startKeys.add(regionStartKey);
+ }
+ }
+
+ int numRegions = startKeys.size();
+ if (numHashFiles == 0) {
+ numHashFiles = numRegions / 100;
+ }
+ if (numHashFiles == 0) {
+ numHashFiles = 1;
+ }
+ if (numHashFiles > numRegions) {
+ // can't partition within regions
+ numHashFiles = numRegions;
+ }
+
+ // choose a subset of start keys to group regions into ranges
+ partitions = new ArrayList<>(numHashFiles - 1);
+ // skip the first start key as it is not a partition between ranges.
+ for (long i = 1; i < numHashFiles; i++) {
+ int splitIndex = (int) (numRegions * i / numHashFiles);
+ partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
+ }
+ }
+
+ void writePartitionFile(Configuration conf, Path path) throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ @SuppressWarnings("deprecation")
+ SequenceFile.Writer writer = SequenceFile.createWriter(
+ fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
+
+ for (int i = 0; i < partitions.size(); i++) {
+ writer.append(partitions.get(i), NullWritable.get());
+ }
+ writer.close();
+ }
+
+ private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
+ throws IOException {
+ @SuppressWarnings("deprecation")
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+ ImmutableBytesWritable key = new ImmutableBytesWritable();
+ partitions = new ArrayList<>();
+ while (reader.next(key)) {
+ partitions.add(new ImmutableBytesWritable(key.copyBytes()));
+ }
+ reader.close();
+
+ if (!Ordering.natural().isOrdered(partitions)) {
+ throw new IOException("Partitions are not ordered!");
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("tableName=").append(tableName);
+ if (families != null) {
+ sb.append(", families=").append(families);
+ }
+ sb.append(", batchSize=").append(batchSize);
+ sb.append(", numHashFiles=").append(numHashFiles);
+ if (!isTableStartRow(startRow)) {
+ sb.append(", startRowHex=").append(Bytes.toHex(startRow));
+ }
+ if (!isTableEndRow(stopRow)) {
+ sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
+ }
+ if (scanBatch >= 0) {
+ sb.append(", scanBatch=").append(scanBatch);
+ }
+ if (versions >= 0) {
+ sb.append(", versions=").append(versions);
+ }
+      if (startTime != 0) {
+        sb.append(", startTime=").append(startTime);
+      }
+      if (endTime != 0) {
+        sb.append(", endTime=").append(endTime);
+ }
+ return sb.toString();
+ }
+
+ static String getDataFileName(int hashFileIndex) {
+ return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
+ }
+
+ /**
+ * Open a TableHash.Reader starting at the first hash at or after the given key.
+ * @throws IOException
+ */
+ public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
+ throws IOException {
+ return new Reader(conf, startKey);
+ }
+
+ public class Reader implements java.io.Closeable {
+ private final Configuration conf;
+
+ private int hashFileIndex;
+ private MapFile.Reader mapFileReader;
+
+ private boolean cachedNext;
+ private ImmutableBytesWritable key;
+ private ImmutableBytesWritable hash;
+
+ Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
+ this.conf = conf;
+ int partitionIndex = Collections.binarySearch(partitions, startKey);
+ if (partitionIndex >= 0) {
+          // if the key is equal to a partition, then go to the file after that partition
+ hashFileIndex = partitionIndex+1;
+ } else {
+ // if the key is between partitions, then go to the file between those partitions
+ hashFileIndex = -1-partitionIndex;
+ }
+ openHashFile();
+
+        // MapFiles don't make it easy to seek() so that the subsequent next() returns
+ // the desired key/value pair. So we cache it for the first call of next().
+ hash = new ImmutableBytesWritable();
+ key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
+ if (key == null) {
+ cachedNext = false;
+ hash = null;
+ } else {
+ cachedNext = true;
+ }
+ }
+
+ /**
+ * Read the next key/hash pair.
+ * Returns true if such a pair exists and false when at the end of the data.
+ */
+ public boolean next() throws IOException {
+ if (cachedNext) {
+ cachedNext = false;
+ return true;
+ }
+ key = new ImmutableBytesWritable();
+ hash = new ImmutableBytesWritable();
+ while (true) {
+ boolean hasNext = mapFileReader.next(key, hash);
+ if (hasNext) {
+ return true;
+ }
+ hashFileIndex++;
+ if (hashFileIndex < TableHash.this.numHashFiles) {
+ mapFileReader.close();
+ openHashFile();
+ } else {
+ key = null;
+ hash = null;
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Get the current key
+ * @return the current key or null if there is no current key
+ */
+ public ImmutableBytesWritable getCurrentKey() {
+ return key;
+ }
+
+ /**
+ * Get the current hash
+ * @return the current hash or null if there is no current hash
+ */
+ public ImmutableBytesWritable getCurrentHash() {
+ return hash;
+ }
+
+ private void openHashFile() throws IOException {
+ if (mapFileReader != null) {
+ mapFileReader.close();
+ }
+ Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
+ Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
+ mapFileReader = new MapFile.Reader(dataFile, conf);
+ }
+
+ @Override
+ public void close() throws IOException {
+ mapFileReader.close();
+ }
+ }
+ }
+
+ static boolean isTableStartRow(byte[] row) {
+ return Bytes.equals(HConstants.EMPTY_START_ROW, row);
+ }
+
+ static boolean isTableEndRow(byte[] row) {
+ return Bytes.equals(HConstants.EMPTY_END_ROW, row);
+ }
+
+ public Job createSubmittableJob(String[] args) throws IOException {
+ Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
+ generatePartitions(partitionsPath);
+
+ Job job = Job.getInstance(getConf(),
+ getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
+ Configuration jobConf = job.getConfiguration();
+ jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
+ job.setJarByClass(HashTable.class);
+
+ TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
+ HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+
+ // use a TotalOrderPartitioner and reducers to group region output into hash files
+ job.setPartitionerClass(TotalOrderPartitioner.class);
+ TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
+ job.setReducerClass(Reducer.class); // identity reducer
+ job.setNumReduceTasks(tableHash.numHashFiles);
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(ImmutableBytesWritable.class);
+ job.setOutputFormatClass(MapFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
+
+ return job;
+ }
+
+ private void generatePartitions(Path partitionsPath) throws IOException {
+ Connection connection = ConnectionFactory.createConnection(getConf());
+ Pair<byte[][], byte[][]> regionKeys
+ = connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
+ connection.close();
+
+ tableHash.selectPartitions(regionKeys);
+ LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
+
+ tableHash.writePartitionFile(getConf(), partitionsPath);
+ }
+
+ static class ResultHasher {
+ private MessageDigest digest;
+
+ private boolean batchStarted = false;
+ private ImmutableBytesWritable batchStartKey;
+ private ImmutableBytesWritable batchHash;
+ private long batchSize = 0;
+
+
+ public ResultHasher() {
+ try {
+ digest = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ public void startBatch(ImmutableBytesWritable row) {
+ if (batchStarted) {
+ throw new RuntimeException("Cannot start new batch without finishing existing one.");
+ }
+ batchStarted = true;
+ batchSize = 0;
+ batchStartKey = row;
+ batchHash = null;
+ }
+
+ public void hashResult(Result result) {
+ if (!batchStarted) {
+ throw new RuntimeException("Cannot add to batch that has not been started.");
+ }
+ for (Cell cell : result.rawCells()) {
+ int rowLength = cell.getRowLength();
+ int familyLength = cell.getFamilyLength();
+ int qualifierLength = cell.getQualifierLength();
+ int valueLength = cell.getValueLength();
+ digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
+ digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
+ digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
+ long ts = cell.getTimestamp();
+ for (int i = 8; i > 0; i--) {
+ digest.update((byte) ts);
+ ts >>>= 8;
+ }
+ digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
+
+ batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
+ }
+ }
+
+ public void finishBatch() {
+ if (!batchStarted) {
+ throw new RuntimeException("Cannot finish batch that has not started.");
+ }
+ batchStarted = false;
+ batchHash = new ImmutableBytesWritable(digest.digest());
+ }
+
+ public boolean isBatchStarted() {
+ return batchStarted;
+ }
+
+ public ImmutableBytesWritable getBatchStartKey() {
+ return batchStartKey;
+ }
+
+ public ImmutableBytesWritable getBatchHash() {
+ return batchHash;
+ }
+
+ public long getBatchSize() {
+ return batchSize;
+ }
+ }
+
+ public static class HashMapper
+ extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+ private ResultHasher hasher;
+ private long targetBatchSize;
+
+ private ImmutableBytesWritable currentRow;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ targetBatchSize = context.getConfiguration()
+ .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
+ hasher = new ResultHasher();
+
+ TableSplit split = (TableSplit) context.getInputSplit();
+ hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
+ }
+
+ @Override
+ protected void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+
+ if (currentRow == null || !currentRow.equals(key)) {
+ currentRow = new ImmutableBytesWritable(key); // not immutable
+
+ if (hasher.getBatchSize() >= targetBatchSize) {
+ hasher.finishBatch();
+ context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
+ hasher.startBatch(currentRow);
+ }
+ }
+
+ hasher.hashResult(value);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ hasher.finishBatch();
+ context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
+ }
+ }
+
+ private void writeTempManifestFile() throws IOException {
+ Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
+ FileSystem fs = tempManifestPath.getFileSystem(getConf());
+ tableHash.writePropertiesFile(fs, tempManifestPath);
+ }
+
+ private void completeManifest() throws IOException {
+ Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
+ Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
+ FileSystem fs = tempManifestPath.getFileSystem(getConf());
+ fs.rename(tempManifestPath, manifestPath);
+ }
+
+ private static final int NUM_ARGS = 2;
+ private static void printUsage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ System.err.println();
+ }
+ System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
+ System.err.println();
+ System.err.println("Options:");
+ System.err.println(" batchsize the target amount of bytes to hash in each batch");
+ System.err.println(" rows are added to the batch until this size is reached");
+ System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
+ System.err.println(" numhashfiles the number of hash files to create");
+ System.err.println(" if set to fewer than number of regions then");
+ System.err.println(" the job will create this number of reducers");
+ System.err.println(" (defaults to 1/100 of regions -- at least 1)");
+ System.err.println(" startrow the start row");
+ System.err.println(" stoprow the stop row");
+ System.err.println(" starttime beginning of the time range (unixtime in millis)");
+ System.err.println(" without endtime means from starttime to forever");
+ System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
+ System.err.println(" scanbatch scanner batch size to support intra row scans");
+ System.err.println(" versions number of cell versions to include");
+ System.err.println(" families comma-separated list of families to include");
+ System.err.println();
+ System.err.println("Args:");
+ System.err.println(" tablename Name of the table to hash");
+ System.err.println(" outputpath Filesystem path to put the output data");
+ System.err.println();
+ System.err.println("Examples:");
+ System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
+ System.err.println(" $ hbase " +
+ "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
+ + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
+ + " TestTable /hashes/testTable");
+ }
+
+ private boolean doCommandLine(final String[] args) {
+ if (args.length < NUM_ARGS) {
+ printUsage(null);
+ return false;
+ }
+ try {
+
+ tableHash.tableName = args[args.length-2];
+ destPath = new Path(args[args.length-1]);
+
+ for (int i = 0; i < args.length - NUM_ARGS; i++) {
+ String cmd = args[i];
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ printUsage(null);
+ return false;
+ }
+
+ final String batchSizeArgKey = "--batchsize=";
+ if (cmd.startsWith(batchSizeArgKey)) {
+ tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
+ continue;
+ }
+
+ final String numHashFilesArgKey = "--numhashfiles=";
+ if (cmd.startsWith(numHashFilesArgKey)) {
+ tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
+ continue;
+ }
+
+ final String startRowArgKey = "--startrow=";
+ if (cmd.startsWith(startRowArgKey)) {
+ tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
+ continue;
+ }
+
+ final String stopRowArgKey = "--stoprow=";
+ if (cmd.startsWith(stopRowArgKey)) {
+ tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
+ continue;
+ }
+
+ final String startTimeArgKey = "--starttime=";
+ if (cmd.startsWith(startTimeArgKey)) {
+ tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+ continue;
+ }
+
+ final String endTimeArgKey = "--endtime=";
+ if (cmd.startsWith(endTimeArgKey)) {
+ tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+ continue;
+ }
+
+ final String scanBatchArgKey = "--scanbatch=";
+ if (cmd.startsWith(scanBatchArgKey)) {
+ tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
+ continue;
+ }
+
+ final String versionsArgKey = "--versions=";
+ if (cmd.startsWith(versionsArgKey)) {
+ tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
+ continue;
+ }
+
+ final String familiesArgKey = "--families=";
+ if (cmd.startsWith(familiesArgKey)) {
+ tableHash.families = cmd.substring(familiesArgKey.length());
+ continue;
+ }
+
+ printUsage("Invalid argument '" + cmd + "'");
+ return false;
+ }
+ if ((tableHash.startTime != 0 || tableHash.endTime != 0)
+ && (tableHash.startTime >= tableHash.endTime)) {
+ printUsage("Invalid time range filter: starttime="
+ + tableHash.startTime + " >= endtime=" + tableHash.endTime);
+ return false;
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ printUsage("Can't start because " + e.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Main entry point.
+ */
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+ if (!doCommandLine(otherArgs)) {
+ return 1;
+ }
+
+ Job job = createSubmittableJob(otherArgs);
+ writeTempManifestFile();
+ if (!job.waitForCompletion(true)) {
+ LOG.info("Map-reduce job failed!");
+ return 1;
+ }
+ completeManifest();
+ return 0;
+ }
+
+}
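Beyond the CLI usage shown in printUsage() above, HashTable is a Tool, so the same run can be driven from code, and the manifest it writes can be read back with TableHash.read. A rough sketch, reusing the table name and output path from the usage example (both are placeholders for a real cluster):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HashTable;
import org.apache.hadoop.util.ToolRunner;

public class HashTableDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same effect as the CLI example in printUsage(): hash TestTable into 50 MapFiles.
    int exitCode = ToolRunner.run(new HashTable(conf), new String[] {
        "--batchsize=32000", "--numhashfiles=50", "TestTable", "/hashes/testTable" });
    if (exitCode == 0) {
      // The completed run leaves a manifest plus partition file that later tools
      // (e.g. SyncTable) load through TableHash.read().
      HashTable.TableHash hash = HashTable.TableHash.read(conf, new Path("/hashes/testTable"));
      System.out.println(hash);  // tableName, batchSize, numHashFiles, ...
    }
    System.exit(exitCode);
  }
}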
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
new file mode 100644
index 0000000..7103ef8
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Pass the given key and record as-is to the reduce phase.
+ */
+@InterfaceAudience.Public
+public class IdentityTableMapper
+extends TableMapper<ImmutableBytesWritable, Result> {
+
+ /**
+ * Use this before submitting a TableMap job. It will appropriately set up
+ * the job.
+ *
+ * @param table The table name.
+ * @param scan The scan with the columns to scan.
+ * @param mapper The mapper class.
+ * @param job The job configuration.
+ * @throws IOException When setting up the job fails.
+ */
+ @SuppressWarnings("rawtypes")
+ public static void initJob(String table, Scan scan,
+ Class<? extends TableMapper> mapper, Job job) throws IOException {
+ TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
+ ImmutableBytesWritable.class, Result.class, job);
+ }
+
+ /**
+ * Pass the key, value to reduce.
+ *
+ * @param key The current key.
+ * @param value The current value.
+ * @param context The current context.
+ * @throws IOException When writing the record fails.
+ * @throws InterruptedException When the job is aborted.
+ */
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ context.write(key, value);
+ }
+
+}
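Since initJob above just forwards to TableMapReduceUtil.initTableMapperJob, a scan-only job using this mapper needs little more than a Scan and a table name. A minimal sketch (table name, job name, and the downstream reducer/output choices are placeholders):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.mapreduce.Job;

public class IdentityScanDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(HBaseConfiguration.create(), "identity-scan");
    job.setJarByClass(IdentityScanDriver.class);
    Scan scan = new Scan();
    scan.setCacheBlocks(false);  // block caching is usually disabled for MR scans
    // Every (row key, Result) pair from the scan is passed through unchanged.
    IdentityTableMapper.initJob("exampleTable", scan, IdentityTableMapper.class, job);
    // ... a reducer (or zero reduce tasks) and an output format still need to be set.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}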
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
new file mode 100644
index 0000000..73475db
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Convenience class that simply writes all values (which must be
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} instances)
+ * passed to it out to the configured HBase table. This works in combination
+ * with {@link TableOutputFormat} which actually does the writing to HBase.<p>
+ *
+ * Keys are passed along but ignored in TableOutputFormat. However, they can
+ * be used to control how your values will be divided up amongst the specified
+ * number of reducers. <p>
+ *
+ * You can also use the {@link TableMapReduceUtil} class to set up the two
+ * classes in one step:
+ * <blockquote><code>
+ * TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job);
+ * </code></blockquote>
+ * This will also set the proper {@link TableOutputFormat} which is given the
+ * <code>table</code> parameter. The
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} define the
+ * row and columns implicitly.
+ */
+@InterfaceAudience.Public
+public class IdentityTableReducer
+extends TableReducer<Writable, Mutation, Writable> {
+
+ @SuppressWarnings("unused")
+ private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
+
+ /**
+ * Writes each given record, consisting of the row key and the given values,
+ * to the configured {@link org.apache.hadoop.mapreduce.OutputFormat}.
+ * It emits the row key and each {@link org.apache.hadoop.hbase.client.Put Put}
+ * or {@link org.apache.hadoop.hbase.client.Delete Delete} as separate pairs.
+ *
+ * @param key The current row key.
+ * @param values The {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given
+ * row.
+ * @param context The context of the reduce.
+ * @throws IOException When writing the record fails.
+ * @throws InterruptedException When the job gets interrupted.
+ */
+ @Override
+ public void reduce(Writable key, Iterable<Mutation> values, Context context)
+ throws IOException, InterruptedException {
+ for(Mutation putOrDelete : values) {
+ context.write(key, putOrDelete);
+ }
+ }
+}
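Note that this reducer's input values must already be Mutations, so it cannot be fed directly by IdentityTableMapper (whose values are Results). A rough sketch of a feeding mapper, using a hypothetical PutConvertingMapper and placeholder table names:

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

public class PutConvertingMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
  @Override
  protected void map(ImmutableBytesWritable row, Result value, Context context)
      throws IOException, InterruptedException {
    Put put = new Put(row.copyBytes());
    for (Cell cell : value.rawCells()) {
      put.add(cell);  // copy every cell of the scanned row into the Put
    }
    context.write(row, put);
  }
}

// Driver wiring (sketch): scan "sourceTable", write the Puts to "destTable".
//   TableMapReduceUtil.initTableMapperJob("sourceTable", new Scan(),
//       PutConvertingMapper.class, ImmutableBytesWritable.class, Mutation.class, job);
//   TableMapReduceUtil.initTableReducerJob("destTable", IdentityTableReducer.class, job);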
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
new file mode 100644
index 0000000..18dcf35
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -0,0 +1,780 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+
+
+/**
+ * Import data written by {@link Export}.
+ */
+@InterfaceAudience.Public
+public class Import extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(Import.class);
+ final static String NAME = "import";
+ public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
+ public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+ public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+ public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+ public final static String TABLE_NAME = "import.table.name";
+ public final static String WAL_DURABILITY = "import.wal.durability";
+  public final static String HAS_LARGE_RESULT = "import.bulk.hasLargeResult";
+
+ private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+ public static class KeyValueWritableComparablePartitioner
+ extends Partitioner<KeyValueWritableComparable, KeyValue> {
+ private static KeyValueWritableComparable[] START_KEYS = null;
+ @Override
+ public int getPartition(KeyValueWritableComparable key, KeyValue value,
+ int numPartitions) {
+ for (int i = 0; i < START_KEYS.length; ++i) {
+ if (key.compareTo(START_KEYS[i]) <= 0) {
+ return i;
+ }
+ }
+ return START_KEYS.length;
+ }
+
+ }
+
+ public static class KeyValueWritableComparable
+ implements WritableComparable<KeyValueWritableComparable> {
+
+ private KeyValue kv = null;
+
+ static {
+ // register this comparator
+ WritableComparator.define(KeyValueWritableComparable.class,
+ new KeyValueWritableComparator());
+ }
+
+ public KeyValueWritableComparable() {
+ }
+
+ public KeyValueWritableComparable(KeyValue kv) {
+ this.kv = kv;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ KeyValue.write(kv, out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ kv = KeyValue.create(in);
+ }
+
+ @Override
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
+ justification="This is wrong, yes, but we should be purging Writables, not fixing them")
+ public int compareTo(KeyValueWritableComparable o) {
+ return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
+ }
+
+ public static class KeyValueWritableComparator extends WritableComparator {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ try {
+ KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
+ kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
+ KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
+ kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
+ return compare(kv1, kv2);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ }
+
+ public static class KeyValueReducer
+ extends
+ Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
+ protected void reduce(
+ KeyValueWritableComparable row,
+ Iterable<KeyValue> kvs,
+ Reducer<KeyValueWritableComparable,
+ KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+ throws java.io.IOException, InterruptedException {
+ int index = 0;
+ for (KeyValue kv : kvs) {
+ context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
+ if (++index % 100 == 0)
+          context.setStatus("Wrote " + index + " KeyValues, "
+              + "and the current row key is " + Bytes.toString(kv.getRowArray()));
+ }
+ }
+ }
+
+ public static class KeyValueSortImporter
+ extends TableMapper<KeyValueWritableComparable, KeyValue> {
+ private Map<byte[], byte[]> cfRenameMap;
+ private Filter filter;
+ private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
+
+ /**
+ * @param row The current table row key.
+ * @param value The columns.
+ * @param context The current context.
+ * @throws IOException When something is broken with the data.
+ */
+ @Override
+ public void map(ImmutableBytesWritable row, Result value,
+ Context context)
+ throws IOException {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Considering the row."
+ + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
+ }
+ if (filter == null
+ || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
+ (short) row.getLength()))) {
+ for (Cell kv : value.rawCells()) {
+ kv = filterKv(filter, kv);
+ // skip if we filtered it out
+ if (kv == null) continue;
+ // TODO get rid of ensureKeyValue
+ KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
+ context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void setup(Context context) throws IOException {
+ cfRenameMap = createCfRenameMap(context.getConfiguration());
+ filter = instantiateFilter(context.getConfiguration());
+ int reduceNum = context.getNumReduceTasks();
+ Configuration conf = context.getConfiguration();
+ TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ byte[][] startKeys = regionLocator.getStartKeys();
+ if (startKeys.length != reduceNum) {
+ throw new IOException("Region split after job initialization");
+ }
+ KeyValueWritableComparable[] startKeyWraps =
+ new KeyValueWritableComparable[startKeys.length - 1];
+ for (int i = 1; i < startKeys.length; ++i) {
+ startKeyWraps[i - 1] =
+ new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
+ }
+ KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
+ }
+ }
+ }
+
+ /**
+ * A mapper that just writes out KeyValues.
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
+ justification="Writables are going away and this has been this way forever")
+ public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
+ private Map<byte[], byte[]> cfRenameMap;
+ private Filter filter;
+ private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
+
+ /**
+ * @param row The current table row key.
+ * @param value The columns.
+ * @param context The current context.
+ * @throws IOException When something is broken with the data.
+ */
+ @Override
+ public void map(ImmutableBytesWritable row, Result value,
+ Context context)
+ throws IOException {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Considering the row."
+ + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
+ }
+ if (filter == null
+ || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
+ (short) row.getLength()))) {
+ for (Cell kv : value.rawCells()) {
+ kv = filterKv(filter, kv);
+ // skip if we filtered it out
+ if (kv == null) continue;
+ // TODO get rid of ensureKeyValue
+ context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void setup(Context context) {
+ cfRenameMap = createCfRenameMap(context.getConfiguration());
+ filter = instantiateFilter(context.getConfiguration());
+ }
+ }
+
+ /**
+ * Write table content out to files in hdfs.
+ */
+ public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
+ private Map<byte[], byte[]> cfRenameMap;
+ private List<UUID> clusterIds;
+ private Filter filter;
+ private Durability durability;
+
+ /**
+ * @param row The current table row key.
+ * @param value The columns.
+ * @param context The current context.
+ * @throws IOException When something is broken with the data.
+ */
+ @Override
+ public void map(ImmutableBytesWritable row, Result value,
+ Context context)
+ throws IOException {
+ try {
+ writeResult(row, value, context);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void writeResult(ImmutableBytesWritable key, Result result, Context context)
+ throws IOException, InterruptedException {
+ Put put = null;
+ Delete delete = null;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Considering the row."
+ + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
+ }
+ if (filter == null
+ || !filter.filterRowKey(CellUtil.createFirstOnRow(key.get(), key.getOffset(),
+ (short) key.getLength()))) {
+ processKV(key, result, context, put, delete);
+ }
+ }
+
+ protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
+ Delete delete) throws IOException, InterruptedException {
+ for (Cell kv : result.rawCells()) {
+ kv = filterKv(filter, kv);
+ // skip if we filter it out
+ if (kv == null) continue;
+
+ kv = convertKv(kv, cfRenameMap);
+ // Deletes and Puts are gathered and written when finished
+ /*
+         * If an Export contains a sequence of mutations and tombstones, Import should restore the
+         * same sequence as it was. If we combined all Delete tombstones into a single request,
+         * some DeleteFamily tombstones could be ignored: when multiple DeleteFamily tombstones
+         * are submitted in a single Delete request, only the newest one is kept in the hbase
+         * table and the others are dropped. See HBASE-12065.
+ */
+ if (CellUtil.isDeleteFamily(kv)) {
+ Delete deleteFamily = new Delete(key.get());
+ deleteFamily.add(kv);
+ if (durability != null) {
+ deleteFamily.setDurability(durability);
+ }
+ deleteFamily.setClusterIds(clusterIds);
+ context.write(key, deleteFamily);
+ } else if (CellUtil.isDelete(kv)) {
+ if (delete == null) {
+ delete = new Delete(key.get());
+ }
+ delete.add(kv);
+ } else {
+ if (put == null) {
+ put = new Put(key.get());
+ }
+ addPutToKv(put, kv);
+ }
+ }
+ if (put != null) {
+ if (durability != null) {
+ put.setDurability(durability);
+ }
+ put.setClusterIds(clusterIds);
+ context.write(key, put);
+ }
+ if (delete != null) {
+ if (durability != null) {
+ delete.setDurability(durability);
+ }
+ delete.setClusterIds(clusterIds);
+ context.write(key, delete);
+ }
+ }
+
+ protected void addPutToKv(Put put, Cell kv) throws IOException {
+ put.add(kv);
+ }
+
+ @Override
+ public void setup(Context context) {
+ LOG.info("Setting up " + getClass() + " mapper.");
+ Configuration conf = context.getConfiguration();
+ cfRenameMap = createCfRenameMap(conf);
+ filter = instantiateFilter(conf);
+ String durabilityStr = conf.get(WAL_DURABILITY);
+ if(durabilityStr != null){
+ durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
+ LOG.info("setting WAL durability to " + durability);
+ } else {
+ LOG.info("setting WAL durability to default.");
+ }
+ // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
+ ZooKeeperWatcher zkw = null;
+ Exception ex = null;
+ try {
+ zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
+ clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
+ } catch (ZooKeeperConnectionException e) {
+ ex = e;
+        LOG.error("Problem connecting to ZooKeeper during task setup", e);
+ } catch (KeeperException e) {
+ ex = e;
+ LOG.error("Problem reading ZooKeeper data during task setup", e);
+ } catch (IOException e) {
+ ex = e;
+ LOG.error("Problem setting up task", e);
+ } finally {
+ if (zkw != null) zkw.close();
+ }
+ if (clusterIds == null) {
+ // exit early if setup fails
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ /**
+   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) so that
+   * some of them can optionally be excluded from the job output.
+   * @param conf {@link Configuration} from which to load the filter
+   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
+ * @throws IllegalArgumentException if the filter is misconfigured
+ */
+ public static Filter instantiateFilter(Configuration conf) {
+ // get the filter, if it was configured
+ Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+ if (filterClass == null) {
+ LOG.debug("No configured filter class, accepting all keyvalues.");
+ return null;
+ }
+ LOG.debug("Attempting to create filter:" + filterClass);
+ String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
+ ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
+ try {
+ Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
+ return (Filter) m.invoke(null, quotedArgs);
+ } catch (IllegalAccessException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (SecurityException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (NoSuchMethodException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (IllegalArgumentException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ LOG.error("Couldn't instantiate filter!", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
+ ArrayList<byte[]> quotedArgs = new ArrayList<>();
+ for (String stringArg : stringArgs) {
+      // all the filters' instantiation methods expect quoted args since they are coming from
+ // the shell, so add them here, though it shouldn't really be needed :-/
+ quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
+ }
+ return quotedArgs;
+ }
+
+ /**
+   * Attempt to filter out the keyvalue.
+   * @param filter the {@link Filter} to apply, or <tt>null</tt> to accept every cell
+   * @param kv {@link KeyValue} on which to apply the filter
+   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
+ * {@link KeyValue}
+ */
+ public static Cell filterKv(Filter filter, Cell kv) throws IOException {
+ // apply the filter and skip this kv if the filter doesn't apply
+ if (filter != null) {
+ Filter.ReturnCode code = filter.filterKeyValue(kv);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Filter returned:" + code + " for the key value:" + kv);
+ }
+      // if it's not an accept type, then skip this kv
+ if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
+ .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
+ return null;
+ }
+ }
+ return kv;
+ }
+
+ // helper: create a new KeyValue based on CF rename map
+ private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
+ if(cfRenameMap != null) {
+ // If there's a rename mapping for this CF, create a new KeyValue
+ byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
+ if(newCfName != null) {
+ kv = new KeyValue(kv.getRowArray(), // row buffer
+ kv.getRowOffset(), // row offset
+ kv.getRowLength(), // row length
+ newCfName, // CF buffer
+ 0, // CF offset
+ newCfName.length, // CF length
+ kv.getQualifierArray(), // qualifier buffer
+ kv.getQualifierOffset(), // qualifier offset
+ kv.getQualifierLength(), // qualifier length
+ kv.getTimestamp(), // timestamp
+ KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
+ kv.getValueArray(), // value buffer
+ kv.getValueOffset(), // value offset
+ kv.getValueLength()); // value length
+ }
+ }
+ return kv;
+ }
+
+ // helper: make a map from sourceCfName to destCfName by parsing a config key
+ private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
+ Map<byte[], byte[]> cfRenameMap = null;
+ String allMappingsPropVal = conf.get(CF_RENAME_PROP);
+ if(allMappingsPropVal != null) {
+ // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
+ String[] allMappings = allMappingsPropVal.split(",");
+ for (String mapping: allMappings) {
+ if(cfRenameMap == null) {
+ cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ }
+ String [] srcAndDest = mapping.split(":");
+ if(srcAndDest.length != 2) {
+ continue;
+ }
+ cfRenameMap.put(Bytes.toBytes(srcAndDest[0]), Bytes.toBytes(srcAndDest[1]));
+ }
+ }
+ return cfRenameMap;
+ }
+
+ /**
+ * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
+ * the mapper how to rename column families.
+ *
+ * <p>Alternatively, instead of calling this function, you could set the configuration key
+ * {@link #CF_RENAME_PROP} yourself. The value should look like
+ * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
+ * the mapper behavior.
+ *
+ * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
+ * set
+ * @param renameMap a mapping from source CF names to destination CF names
+ */
+ public static void configureCfRenaming(Configuration conf,
+ Map<String, String> renameMap) {
+ StringBuilder sb = new StringBuilder();
+ for(Map.Entry<String,String> entry: renameMap.entrySet()) {
+ String sourceCf = entry.getKey();
+ String destCf = entry.getValue();
+
+ if(sourceCf.contains(":") || sourceCf.contains(",") ||
+ destCf.contains(":") || destCf.contains(",")) {
+ throw new IllegalArgumentException("Illegal character in CF names: "
+ + sourceCf + ", " + destCf);
+ }
+
+ if(sb.length() != 0) {
+ sb.append(",");
+ }
+ sb.append(sourceCf + ":" + destCf);
+ }
+ conf.set(CF_RENAME_PROP, sb.toString());
+ }
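+
+ // A minimal usage sketch (family names "cf-old" and "cf-new" are made-up examples):
+ //
+ // Map<String, String> renames = new HashMap<>();
+ // renames.put("cf-old", "cf-new");
+ // Import.configureCfRenaming(job.getConfiguration(), renames);
+ // // equivalent to: conf.set(CF_RENAME_PROP, "cf-old:cf-new");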
+
+ /**
+ * Add a Filter to be instantiated on import
+ * @param conf Configuration to update (will be passed to the job)
+ * @param clazz {@link Filter} subclass to instantiate on the server.
+ * @param filterArgs List of arguments to pass to the filter on instantiation
+ */
+ public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
+ List<String> filterArgs) throws IOException {
+ conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
+ conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
+ }
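+
+ // Usage sketch, assuming org.apache.hadoop.hbase.filter.PrefixFilter (which supports
+ // createFilterFromArguments); the prefix value is illustrative:
+ //
+ // Import.addFilterAndArguments(conf, PrefixFilter.class,
+ // Collections.singletonList("row-prefix"));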
+
+ /**
+ * Sets up the actual job.
+ * @param conf The current configuration.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws IOException When setting up the job fails.
+ */
+ public static Job createSubmittableJob(Configuration conf, String[] args)
+ throws IOException {
+ TableName tableName = TableName.valueOf(args[0]);
+ conf.set(TABLE_NAME, tableName.getNameAsString());
+ Path inputDir = new Path(args[1]);
+ Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+ job.setJarByClass(Importer.class);
+ FileInputFormat.setInputPaths(job, inputDir);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+ // make sure we get the filter in the jars
+ try {
+ Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+ if (filter != null) {
+ TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
+ LOG.info("Use Large Result!!");
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+ job.setMapperClass(KeyValueSortImporter.class);
+ job.setReducerClass(KeyValueReducer.class);
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(KeyValueWritableComparable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
+ KeyValueWritableComparable.KeyValueWritableComparator.class,
+ RawComparator.class);
+ Path partitionsPath =
+ new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+ fs.deleteOnExit(partitionsPath);
+ job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
+ job.setNumReduceTasks(regionLocator.getStartKeys().length);
+ TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+ org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+ }
+ } else if (hfileOutPath != null) {
+ LOG.info("writing to hfiles for bulk load.");
+ job.setMapperClass(KeyValueImporter.class);
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)){
+ job.setReducerClass(KeyValueSortReducer.class);
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+ TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+ org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+ }
+ } else {
+ LOG.info("writing directly to table from Mapper.");
+ // No reducers. Just write straight to table. Call initTableReducerJob
+ // because it sets up the TableOutputFormat.
+ job.setMapperClass(Importer.class);
+ TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
+ job.setNumReduceTasks(0);
+ }
+ return job;
+ }
+
+ /*
+ * @param errorMsg Error message. Can be null.
+ */
+ private static void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ System.err.println("Usage: Import [options] <tablename> <inputdir>");
+ System.err.println("By default Import will load data directly into HBase. To instead generate");
+ System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
+ System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+ System.err.println("If there is a large result that includes too much KeyValue "
+ + "whitch can occur OOME caused by the memery sort in reducer, pass the option:");
+ System.err.println(" -D" + HAS_LARGE_RESULT + "=true");
+ System.err
+ .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+ System.err.println(" -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+ System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
+ System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+ + CF_RENAME_PROP + " property. Further, filters will only use the"
+ + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
+ + " whether the current row needs to be ignored completely for processing and "
+ + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+ + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
+ + " the KeyValue.");
+ System.err.println("To import data exported from HBase 0.94, use");
+ System.err.println(" -Dhbase.import.version=0.94");
+ System.err.println(" -D " + JOB_NAME_CONF_KEY
+ + "=jobName - use the specified mapreduce job name for the import");
+ System.err.println("For performance consider the following options:\n"
+ + " -Dmapreduce.map.speculative=false\n"
+ + " -Dmapreduce.reduce.speculative=false\n"
+ + " -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
+ +" Allowed values are the supported durability values"
+ +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
+ }
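+
+ // Example invocation (table name and paths are placeholders; options are as printed by
+ // usage() above):
+ //
+ // hbase org.apache.hadoop.hbase.mapreduce.Import \
+ // -D<value of BULK_OUTPUT_CONF_KEY>=/path/for/output myTable /path/to/exported/data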
+
+ /**
+ * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported directly to
+ * HBase, the regions of the table need to be flushed because the data is held only in memory
+ * and is not present in the Write Ahead Log for replay after a crash. This method flushes all
+ * regions of the table when data is imported to HBase with {@link Durability#SKIP_WAL}.
+ */
+ public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
+ InterruptedException {
+ String tableName = conf.get(TABLE_NAME);
+ String durability = conf.get(WAL_DURABILITY);
+ // Need to flush if the data is written to hbase and skip wal is enabled.
+ if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
+ && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
+ LOG.info("Flushing all data that skipped the WAL.");
+ // try-with-resources closes the admin and connection in all cases
+ try (Connection connection = ConnectionFactory.createConnection(conf);
+ Admin hAdmin = connection.getAdmin()) {
+ hAdmin.flush(TableName.valueOf(tableName));
+ }
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ usage("Wrong number of arguments: " + args.length);
+ return -1;
+ }
+ String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
+ if (inputVersionString != null) {
+ getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
+ }
+ Job job = createSubmittableJob(getConf(), args);
+ boolean isJobSuccessful = job.waitForCompletion(true);
+ if(isJobSuccessful){
+ // Flush all the regions of the table
+ flushRegionsIfNecessary(getConf());
+ }
+ long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
+ long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
+ if (outputRecords < inputRecords) {
+ System.err.println("Warning, not all records were imported (maybe filtered out).");
+ if (outputRecords == 0) {
+ System.err.println("If the data was exported from HBase 0.94 "+
+ "consider using -Dhbase.import.version=0.94.");
+ }
+ }
+
+ return (isJobSuccessful ? 0 : 1);
+ }
+
+ /**
+ * Main entry point.
+ * @param args The command line parameters.
+ * @throws Exception When running the job fails.
+ */
+ public static void main(String[] args) throws Exception {
+ int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
+ System.exit(errCode);
+ }
+
+}