Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:37 UTC
[37/41] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
new file mode 100644
index 0000000..b64271e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -0,0 +1,793 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static java.lang.String.format;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Splitter;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+/**
+ * Tool to import data from a TSV file.
+ *
+ * This tool is rather simplistic - it doesn't do any quoting or
+ * escaping, but is useful for many data loads.
+ *
+ * @see ImportTsv#usage(String)
+ */
+@InterfaceAudience.Public
+public class ImportTsv extends Configured implements Tool {
+
+ protected static final Log LOG = LogFactory.getLog(ImportTsv.class);
+
+ final static String NAME = "importtsv";
+
+ public final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
+ public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
+ public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
+ public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+ // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
+ // Move them out of the tool and let the mapper handle its own validation.
+ public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run";
+ // If true, bad lines are logged to stderr. Default: false.
+ public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines";
+ public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
+ public final static String SKIP_EMPTY_COLUMNS = "importtsv.skip.empty.columns";
+ public final static String COLUMNS_CONF_KEY = "importtsv.columns";
+ public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
+ public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
+ //This config is used to propagate credentials from parent MR jobs which launch
+ //ImportTSV jobs. SEE IntegrationTestImportTsv.
+ public final static String CREDENTIALS_LOCATION = "credentials_location";
+ final static String DEFAULT_SEPARATOR = "\t";
+ final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>";
+ final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
+ final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
+ public final static String CREATE_TABLE_CONF_KEY = "create.table";
+ public final static String NO_STRICT_COL_FAMILY = "no.strict";
+ /**
+ * If table didn't exist and was created in dry-run mode, this flag is
+ * flipped to delete it when MR ends.
+ */
+ private static boolean DRY_RUN_TABLE_CREATED;
+
+ public static class TsvParser {
+ /**
+ * Column families and qualifiers mapped to the TSV columns
+ */
+ private final byte[][] families;
+ private final byte[][] qualifiers;
+
+ private final byte separatorByte;
+
+ private int rowKeyColumnIndex;
+
+ private int maxColumnCount;
+
+ // Default value must be negative
+ public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
+
+ private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
+
+ public static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
+
+ public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
+
+ public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY";
+
+ public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
+
+ public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
+
+ private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+
+ public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
+
+ public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
+
+ public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
+
+ private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+
+ private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
+
+ /**
+ * @param columnsSpecification the list of columns to parse out, comma separated.
+ * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
+ * @param separatorStr the field separator; must encode to a single byte
+ */
+ public TsvParser(String columnsSpecification, String separatorStr) {
+ // Configure separator
+ byte[] separator = Bytes.toBytes(separatorStr);
+ Preconditions.checkArgument(separator.length == 1,
+ "TsvParser only supports single-byte separators");
+ separatorByte = separator[0];
+
+ // Configure columns
+ ArrayList<String> columnStrings = Lists.newArrayList(
+ Splitter.on(',').trimResults().split(columnsSpecification));
+
+ maxColumnCount = columnStrings.size();
+ families = new byte[maxColumnCount][];
+ qualifiers = new byte[maxColumnCount][];
+
+ for (int i = 0; i < columnStrings.size(); i++) {
+ String str = columnStrings.get(i);
+ if (ROWKEY_COLUMN_SPEC.equals(str)) {
+ rowKeyColumnIndex = i;
+ continue;
+ }
+ if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
+ timestampKeyColumnIndex = i;
+ continue;
+ }
+ if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+ attrKeyColumnIndex = i;
+ continue;
+ }
+ if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+ cellVisibilityColumnIndex = i;
+ continue;
+ }
+ if (CELL_TTL_COLUMN_SPEC.equals(str)) {
+ cellTTLColumnIndex = i;
+ continue;
+ }
+ String[] parts = str.split(":", 2);
+ if (parts.length == 1) {
+ families[i] = str.getBytes();
+ qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
+ } else {
+ families[i] = parts[0].getBytes();
+ qualifiers[i] = parts[1].getBytes();
+ }
+ }
+ }
+
+ public boolean hasTimestamp() {
+ return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
+ }
+
+ public int getTimestampKeyColumnIndex() {
+ return timestampKeyColumnIndex;
+ }
+
+ public boolean hasAttributes() {
+ return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+ }
+
+ public boolean hasCellVisibility() {
+ return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+ }
+
+ public boolean hasCellTTL() {
+ return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+
+ public int getAttributesKeyColumnIndex() {
+ return attrKeyColumnIndex;
+ }
+
+ public int getCellVisibilityColumnIndex() {
+ return cellVisibilityColumnIndex;
+ }
+
+ public int getCellTTLColumnIndex() {
+ return cellTTLColumnIndex;
+ }
+
+ public int getRowKeyColumnIndex() {
+ return rowKeyColumnIndex;
+ }
+
+ public byte[] getFamily(int idx) {
+ return families[idx];
+ }
+ public byte[] getQualifier(int idx) {
+ return qualifiers[idx];
+ }
+
+ public ParsedLine parse(byte[] lineBytes, int length)
+ throws BadTsvLineException {
+ // Enumerate separator offsets
+ ArrayList<Integer> tabOffsets = new ArrayList<>(maxColumnCount);
+ for (int i = 0; i < length; i++) {
+ if (lineBytes[i] == separatorByte) {
+ tabOffsets.add(i);
+ }
+ }
+ if (tabOffsets.isEmpty()) {
+ throw new BadTsvLineException("No delimiter");
+ }
+
+ tabOffsets.add(length);
+
+ if (tabOffsets.size() > maxColumnCount) {
+ throw new BadTsvLineException("Excessive columns");
+ } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
+ throw new BadTsvLineException("No row key");
+ } else if (hasTimestamp()
+ && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
+ throw new BadTsvLineException("No timestamp");
+ } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
+ throw new BadTsvLineException("No attributes specified");
+ } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+ throw new BadTsvLineException("No cell visibility specified");
+ } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
+ throw new BadTsvLineException("No cell TTL specified");
+ }
+ return new ParsedLine(tabOffsets, lineBytes);
+ }
+
+ class ParsedLine {
+ private final ArrayList<Integer> tabOffsets;
+ private byte[] lineBytes;
+
+ ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
+ this.tabOffsets = tabOffsets;
+ this.lineBytes = lineBytes;
+ }
+
+ public int getRowKeyOffset() {
+ return getColumnOffset(rowKeyColumnIndex);
+ }
+ public int getRowKeyLength() {
+ return getColumnLength(rowKeyColumnIndex);
+ }
+
+ public long getTimestamp(long ts) throws BadTsvLineException {
+ // Return ts if HBASE_TS_KEY is not configured in column spec
+ if (!hasTimestamp()) {
+ return ts;
+ }
+
+ String timeStampStr = Bytes.toString(lineBytes,
+ getColumnOffset(timestampKeyColumnIndex),
+ getColumnLength(timestampKeyColumnIndex));
+ try {
+ return Long.parseLong(timeStampStr);
+ } catch (NumberFormatException nfe) {
+ // treat this record as bad record
+ throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
+ }
+ }
+
+ private String getAttributes() {
+ if (!hasAttributes()) {
+ return null;
+ } else {
+ return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex),
+ getColumnLength(attrKeyColumnIndex));
+ }
+ }
+
+ public String[] getIndividualAttributes() {
+ String attributes = getAttributes();
+ if (attributes != null) {
+ return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR);
+ } else {
+ return null;
+ }
+ }
+
+ public int getAttributeKeyOffset() {
+ if (hasAttributes()) {
+ return getColumnOffset(attrKeyColumnIndex);
+ } else {
+ return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+ }
+ }
+
+ public int getAttributeKeyLength() {
+ if (hasAttributes()) {
+ return getColumnLength(attrKeyColumnIndex);
+ } else {
+ return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+ }
+ }
+
+ public int getCellVisibilityColumnOffset() {
+ if (hasCellVisibility()) {
+ return getColumnOffset(cellVisibilityColumnIndex);
+ } else {
+ return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+ }
+ }
+
+ public int getCellVisibilityColumnLength() {
+ if (hasCellVisibility()) {
+ return getColumnLength(cellVisibilityColumnIndex);
+ } else {
+ return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+ }
+ }
+
+ public String getCellVisibility() {
+ if (!hasCellVisibility()) {
+ return null;
+ } else {
+ return Bytes.toString(lineBytes, getColumnOffset(cellVisibilityColumnIndex),
+ getColumnLength(cellVisibilityColumnIndex));
+ }
+ }
+
+ public int getCellTTLColumnOffset() {
+ if (hasCellTTL()) {
+ return getColumnOffset(cellTTLColumnIndex);
+ } else {
+ return DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+ }
+
+ public int getCellTTLColumnLength() {
+ if (hasCellTTL()) {
+ return getColumnLength(cellTTLColumnIndex);
+ } else {
+ return DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+ }
+
+ public long getCellTTL() {
+ if (!hasCellTTL()) {
+ return 0;
+ } else {
+ return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
+ getColumnLength(cellTTLColumnIndex));
+ }
+ }
+
+ public int getColumnOffset(int idx) {
+ if (idx > 0)
+ return tabOffsets.get(idx - 1) + 1;
+ else
+ return 0;
+ }
+ public int getColumnLength(int idx) {
+ return tabOffsets.get(idx) - getColumnOffset(idx);
+ }
+ public int getColumnCount() {
+ return tabOffsets.size();
+ }
+ public byte[] getLineBytes() {
+ return lineBytes;
+ }
+ }
+
+ public static class BadTsvLineException extends Exception {
+ public BadTsvLineException(String err) {
+ super(err);
+ }
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ * Return starting position and length of row key from the specified line bytes.
+ * @param lineBytes the line to parse
+ * @param length number of bytes in the line to consider
+ * @return Pair of row key offset and length.
+ * @throws BadTsvLineException when the row key is empty or missing
+ */
+ public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length)
+ throws BadTsvLineException {
+ int rkColumnIndex = 0;
+ int startPos = 0, endPos = 0;
+ for (int i = 0; i <= length; i++) {
+ if (i == length || lineBytes[i] == separatorByte) {
+ endPos = i - 1;
+ if (rkColumnIndex++ == getRowKeyColumnIndex()) {
+ if ((endPos + 1) == startPos) {
+ throw new BadTsvLineException("Empty value for ROW KEY.");
+ }
+ break;
+ } else {
+ startPos = endPos + 2;
+ }
+ }
+ if (i == length) {
+ throw new BadTsvLineException(
+ "Row key does not exist as number of columns in the line"
+ + " are less than row key position.");
+ }
+ }
+ return new Pair<>(startPos, endPos - startPos + 1);
+ }
+ }
+
+ /**
+ * Sets up the actual job.
+ *
+ * @param conf The current configuration.
+ * @param args The command line parameters.
+ * @return The newly created job.
+ * @throws IOException When setting up the job fails.
+ */
+ protected static Job createSubmittableJob(Configuration conf, String[] args)
+ throws IOException, ClassNotFoundException {
+ Job job = null;
+ boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
+ try (Connection connection = ConnectionFactory.createConnection(conf)) {
+ try (Admin admin = connection.getAdmin()) {
+ // Support non-XML supported characters
+ // by re-encoding the passed separator as a Base64 string.
+ String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
+ if (actualSeparator != null) {
+ conf.set(SEPARATOR_CONF_KEY,
+ Base64.encodeBytes(actualSeparator.getBytes()));
+ }
+
+ // See if a non-default Mapper was set
+ String mapperClassName = conf.get(MAPPER_CONF_KEY);
+ Class mapperClass = mapperClassName != null? Class.forName(mapperClassName): DEFAULT_MAPPER;
+
+ TableName tableName = TableName.valueOf(args[0]);
+ Path inputDir = new Path(args[1]);
+ String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString());
+ job = Job.getInstance(conf, jobName);
+ job.setJarByClass(mapperClass);
+ FileInputFormat.setInputPaths(job, inputDir);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapperClass(mapperClass);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+ String[] columns = conf.getStrings(COLUMNS_CONF_KEY);
+ if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
+ String fileLoc = conf.get(CREDENTIALS_LOCATION);
+ Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
+ job.getCredentials().addAll(cred);
+ }
+
+ if (hfileOutPath != null) {
+ if (!admin.tableExists(tableName)) {
+ LOG.warn(format("Table '%s' does not exist.", tableName));
+ if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
+ // TODO: this is backwards. Instead of depending on the existence of a table,
+ // create a sane splits file for HFileOutputFormat based on data sampling.
+ createTable(admin, tableName, columns);
+ if (isDryRun) {
+ LOG.warn("Dry run: Table will be deleted at end of dry run.");
+ synchronized (ImportTsv.class) {
+ DRY_RUN_TABLE_CREATED = true;
+ }
+ }
+ } else {
+ String errorMsg =
+ format("Table '%s' does not exist and '%s' is set to no.", tableName,
+ CREATE_TABLE_CONF_KEY);
+ LOG.error(errorMsg);
+ throw new TableNotFoundException(errorMsg);
+ }
+ }
+ try (Table table = connection.getTable(tableName);
+ RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
+ boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
+ // if no.strict is false then check column family
+ if(!noStrict) {
+ ArrayList<String> unmatchedFamilies = new ArrayList<>();
+ Set<String> cfSet = getColumnFamilies(columns);
+ TableDescriptor tDesc = table.getDescriptor();
+ for (String cf : cfSet) {
+ if(!tDesc.hasColumnFamily(Bytes.toBytes(cf))) {
+ unmatchedFamilies.add(cf);
+ }
+ }
+ if(unmatchedFamilies.size() > 0) {
+ ArrayList<String> familyNames = new ArrayList<>();
+ for (ColumnFamilyDescriptor family : table.getDescriptor().getColumnFamilies()) {
+ familyNames.add(family.getNameAsString());
+ }
+ String msg =
+ "Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY
+ + " does not match with any of the table " + tableName
+ + " column families " + familyNames + ".\n"
+ + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY
+ + "=true.\n";
+ usage(msg);
+ System.exit(-1);
+ }
+ }
+ if (mapperClass.equals(TsvImporterTextMapper.class)) {
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(TextSortReducer.class);
+ } else {
+ job.setMapOutputValueClass(Put.class);
+ job.setCombinerClass(PutCombiner.class);
+ job.setReducerClass(PutSortReducer.class);
+ }
+ if (!isDryRun) {
+ Path outputDir = new Path(hfileOutPath);
+ FileOutputFormat.setOutputPath(job, outputDir);
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(),
+ regionLocator);
+ }
+ }
+ } else {
+ if (!admin.tableExists(tableName)) {
+ String errorMsg = format("Table '%s' does not exist.", tableName);
+ LOG.error(errorMsg);
+ throw new TableNotFoundException(errorMsg);
+ }
+ if (mapperClass.equals(TsvImporterTextMapper.class)) {
+ usage(TsvImporterTextMapper.class.toString()
+ + " should not be used for non bulkloading case. use "
+ + TsvImporterMapper.class.toString()
+ + " or custom mapper whose value type is Put.");
+ System.exit(-1);
+ }
+ if (!isDryRun) {
+ // No reducers. Just write straight to table. Call initTableReducerJob
+ // to set up the TableOutputFormat.
+ TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
+ }
+ job.setNumReduceTasks(0);
+ }
+ if (isDryRun) {
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.getConfiguration().setStrings("io.serializations",
+ job.getConfiguration().get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+ KeyValueSerialization.class.getName());
+ }
+ TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+ org.apache.hadoop.hbase.shaded.com.google.common.base.Function.class /* Guava used by TsvParser */);
+ }
+ }
+ return job;
+ }
+
+ private static void createTable(Admin admin, TableName tableName, String[] columns)
+ throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ Set<String> cfSet = getColumnFamilies(columns);
+ for (String cf : cfSet) {
+ HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
+ htd.addFamily(hcd);
+ }
+ LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
+ tableName, cfSet));
+ admin.createTable(htd);
+ }
+
+ private static void deleteTable(Configuration conf, String[] args) {
+ TableName tableName = TableName.valueOf(args[0]);
+ try (Connection connection = ConnectionFactory.createConnection(conf);
+ Admin admin = connection.getAdmin()) {
+ try {
+ admin.disableTable(tableName);
+ } catch (TableNotEnabledException e) {
+ LOG.debug("Dry mode: Table: " + tableName + " already disabled, so just deleting it.");
+ }
+ admin.deleteTable(tableName);
+ } catch (IOException e) {
+ LOG.error(format("***Dry run: Failed to delete table '%s'.***%n%s", tableName,
+ e.toString()));
+ return;
+ }
+ LOG.info(format("Dry run: Deleted table '%s'.", tableName));
+ }
+
+ private static Set<String> getColumnFamilies(String[] columns) {
+ Set<String> cfSet = new HashSet<>();
+ for (String aColumn : columns) {
+ if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
+ || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
+ || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
+ || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
+ || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
+ continue;
+ // we are only concerned with the first one (in case this is a cf:cq)
+ cfSet.add(aColumn.split(":", 2)[0]);
+ }
+ return cfSet;
+ }
+
+ /*
+ * @param errorMsg Error message. Can be null.
+ */
+ private static void usage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ }
+ String usage =
+ "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" +
+ "\n" +
+ "Imports the given input directory of TSV data into the specified table.\n" +
+ "\n" +
+ "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" +
+ "option. This option takes the form of comma-separated column names, where each\n" +
+ "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
+ "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" +
+ "as the row key for each imported record. You must specify exactly one column\n" +
+ "to be the row key, and you must specify a column name for every column that exists in the\n" +
+ "input data. Another special column" + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
+ " designates that this column should be\n" +
+ "used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
+ TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional." + "\n" +
+ "You must specify at most one column as timestamp key for each imported record.\n" +
+ "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
+ "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
+ "\n" +
+ "Other special columns that can be specified are " + TsvParser.CELL_TTL_COLUMN_SPEC +
+ " and " + TsvParser.CELL_VISIBILITY_COLUMN_SPEC + ".\n" +
+ TsvParser.CELL_TTL_COLUMN_SPEC + " designates that this column will be used " +
+ "as a Cell's Time To Live (TTL) attribute.\n" +
+ TsvParser.CELL_VISIBILITY_COLUMN_SPEC + " designates that this column contains the " +
+ "visibility label expression.\n" +
+ "\n" +
+ TsvParser.ATTRIBUTES_COLUMN_SPEC+" can be used to specify Operation Attributes per record.\n"+
+ " Should be specified as key=>value where "+TsvParser.DEFAULT_ATTRIBUTES_COLUMN_INDEX+ " is used \n"+
+ " as the seperator. Note that more than one OperationAttributes can be specified.\n"+
+ "By default importtsv will load data directly into HBase. To instead generate\n" +
+ "HFiles of data to prepare for a bulk data load, pass the option:\n" +
+ " -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
+ " Note: if you do not use this option, then the target table must already exist in HBase\n" +
+ "\n" +
+ "Other options that may be specified with -D include:\n" +
+ " -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into" +
+ " table. If table does not exist, it is created but deleted in the end.\n" +
+ " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
+ " -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" +
+ " -D" + SKIP_EMPTY_COLUMNS + "=false - If true then skip empty columns in bulk import\n" +
+ " '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
+ " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
+ " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
+ DEFAULT_MAPPER.getName() + "\n" +
+ " -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
+ " -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
+ " Note: if you set this to 'no', then the target table must already exist in HBase\n" +
+ " -D" + NO_STRICT_COL_FAMILY + "=true - ignore column family check in hbase table. " +
+ "Default is false\n\n" +
+ "For performance consider the following options:\n" +
+ " -Dmapreduce.map.speculative=false\n" +
+ " -Dmapreduce.reduce.speculative=false";
+
+ System.err.println(usage);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ usage("Wrong number of arguments: " + args.length);
+ return -1;
+ }
+
+ // When MAPPER_CONF_KEY is null, the user wants to use the provided TsvImporterMapper, so
+ // perform validation on these additional args. When it's not null, the user has provided
+ // their own mapper, so these validations are not relevant.
+ // TODO: validation for TsvImporterMapper, not this tool. Move elsewhere.
+ if (null == getConf().get(MAPPER_CONF_KEY)) {
+ // Make sure columns are specified
+ String[] columns = getConf().getStrings(COLUMNS_CONF_KEY);
+ if (columns == null) {
+ usage("No columns specified. Please specify with -D" +
+ COLUMNS_CONF_KEY+"=...");
+ return -1;
+ }
+
+ // Make sure they specify exactly one column as the row key
+ int rowkeysFound = 0;
+ for (String col : columns) {
+ if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
+ }
+ if (rowkeysFound != 1) {
+ usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
+ return -1;
+ }
+
+ // Make sure we have at most one column as the timestamp key
+ int tskeysFound = 0;
+ for (String col : columns) {
+ if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
+ tskeysFound++;
+ }
+ if (tskeysFound > 1) {
+ usage("Must specify at most one column as "
+ + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
+ return -1;
+ }
+
+ int attrKeysFound = 0;
+ for (String col : columns) {
+ if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC))
+ attrKeysFound++;
+ }
+ if (attrKeysFound > 1) {
+ usage("Must specify at most one column as "
+ + TsvParser.ATTRIBUTES_COLUMN_SPEC);
+ return -1;
+ }
+
+ // Make sure one or more columns are specified excluding rowkey and
+ // timestamp key
+ if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) {
+ usage("One or more columns in addition to the row key and timestamp(optional) are required");
+ return -1;
+ }
+ }
+
+ // If timestamp option is not specified, use current system time.
+ long timestamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+
+ // Set it back to replace invalid timestamp (non-numeric) with current
+ // system time
+ getConf().setLong(TIMESTAMP_CONF_KEY, timestamp);
+
+ synchronized (ImportTsv.class) {
+ DRY_RUN_TABLE_CREATED = false;
+ }
+ Job job = createSubmittableJob(getConf(), args);
+ boolean success = job.waitForCompletion(true);
+ boolean delete = false;
+ synchronized (ImportTsv.class) {
+ delete = DRY_RUN_TABLE_CREATED;
+ }
+ if (delete) {
+ deleteTable(getConf(), args);
+ }
+ return success ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int status = ToolRunner.run(HBaseConfiguration.create(), new ImportTsv(), args);
+ System.exit(status);
+ }
+}
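For context, the tool above is normally launched from the `hbase` command line, but the same entry point can be driven programmatically through ToolRunner, just as its own main() does. A minimal sketch, not part of this commit; the table name "t1" and input path "/tmp/input" are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv;
    import org.apache.hadoop.util.ToolRunner;

    public class ImportTsvDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // First column is the row key; the second lands in family "d", qualifier "c1".
        conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,d:c1");
        // Uncomment to emit HFiles for bulk load instead of writing Puts directly:
        // conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/tmp/bulk-out");
        int status = ToolRunner.run(conf, new ImportTsv(), new String[] { "t1", "/tmp/input" });
        System.exit(status);
      }
    }

The public TsvParser can also be exercised on its own, e.g. to locate the row key within a line (parseRowKey throws the checked BadTsvLineException on malformed input):

    ImportTsv.TsvParser parser = new ImportTsv.TsvParser("HBASE_ROW_KEY,d:c1", "\t");
    byte[] line = Bytes.toBytes("row1\tvalue1");
    Pair<Integer, Integer> rowKey = parser.parseRowKey(line, line.length);
    // rowKey.getFirst() == 0, rowKey.getSecond() == 4: bytes 0..3 ("row1")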
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
new file mode 100644
index 0000000..953df62
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.text.MessageFormat;
+import java.util.Enumeration;
+import java.util.jar.JarFile;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * Finds the Jar for a class. If the class is in a directory in the
+ * classpath, it creates a Jar on the fly with the contents of the directory
+ * and returns the path to that Jar. If a Jar is created, it is created in
+ * the system temporary directory.
+ *
+ * This file was forked from hadoop/common/branches/branch-2@1377176.
+ */
+public class JarFinder {
+
+ private static void copyToZipStream(File file, ZipEntry entry,
+ ZipOutputStream zos) throws IOException {
+ InputStream is = new FileInputStream(file);
+ try {
+ zos.putNextEntry(entry);
+ byte[] arr = new byte[4096];
+ int read = is.read(arr);
+ while (read > -1) {
+ zos.write(arr, 0, read);
+ read = is.read(arr);
+ }
+ } finally {
+ try {
+ is.close();
+ } finally {
+ zos.closeEntry();
+ }
+ }
+ }
+
+ public static void jarDir(File dir, String relativePath, ZipOutputStream zos)
+ throws IOException {
+ Preconditions.checkNotNull(relativePath, "relativePath");
+ Preconditions.checkNotNull(zos, "zos");
+
+ // by JAR spec, if there is a manifest, it must be the first entry in the
+ // ZIP.
+ File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
+ ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME);
+ if (!manifestFile.exists()) {
+ zos.putNextEntry(manifestEntry);
+ new Manifest().write(new BufferedOutputStream(zos));
+ zos.closeEntry();
+ } else {
+ copyToZipStream(manifestFile, manifestEntry, zos);
+ }
+ zos.closeEntry();
+ zipDir(dir, relativePath, zos, true);
+ zos.close();
+ }
+
+ private static void zipDir(File dir, String relativePath, ZipOutputStream zos,
+ boolean start) throws IOException {
+ String[] dirList = dir.list();
+ if (dirList == null) {
+ return;
+ }
+ for (String aDirList : dirList) {
+ File f = new File(dir, aDirList);
+ if (!f.isHidden()) {
+ if (f.isDirectory()) {
+ if (!start) {
+ ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/");
+ zos.putNextEntry(dirEntry);
+ zos.closeEntry();
+ }
+ String filePath = f.getPath();
+ File file = new File(filePath);
+ zipDir(file, relativePath + f.getName() + "/", zos, false);
+ }
+ else {
+ String path = relativePath + f.getName();
+ if (!path.equals(JarFile.MANIFEST_NAME)) {
+ ZipEntry anEntry = new ZipEntry(path);
+ copyToZipStream(f, anEntry, zos);
+ }
+ }
+ }
+ }
+ }
+
+ private static void createJar(File dir, File jarFile) throws IOException {
+ Preconditions.checkNotNull(dir, "dir");
+ Preconditions.checkNotNull(jarFile, "jarFile");
+ File jarDir = jarFile.getParentFile();
+ if (!jarDir.exists()) {
+ if (!jarDir.mkdirs()) {
+ throw new IOException(MessageFormat.format("could not create dir [{0}]",
+ jarDir));
+ }
+ }
+ try (FileOutputStream fos = new FileOutputStream(jarFile);
+ JarOutputStream jos = new JarOutputStream(fos)) {
+ jarDir(dir, "", jos);
+ }
+ }
+
+ /**
+ * Returns the full path to the Jar containing the class. It always return a
+ * JAR.
+ *
+ * @param klass class.
+ *
+ * @return path to the Jar containing the class.
+ */
+ public static String getJar(Class klass) {
+ Preconditions.checkNotNull(klass, "klass");
+ ClassLoader loader = klass.getClassLoader();
+ if (loader != null) {
+ String class_file = klass.getName().replaceAll("\\.", "/") + ".class";
+ try {
+ for (Enumeration itr = loader.getResources(class_file);
+ itr.hasMoreElements(); ) {
+ URL url = (URL) itr.nextElement();
+ String path = url.getPath();
+ if (path.startsWith("file:")) {
+ path = path.substring("file:".length());
+ }
+ path = URLDecoder.decode(path, "UTF-8");
+ if ("jar".equals(url.getProtocol())) {
+ path = URLDecoder.decode(path, "UTF-8");
+ return path.replaceAll("!.*$", "");
+ }
+ else if ("file".equals(url.getProtocol())) {
+ String klassName = klass.getName();
+ klassName = klassName.replace(".", "/") + ".class";
+ path = path.substring(0, path.length() - klassName.length());
+ File baseDir = new File(path);
+ File testDir = new File(System.getProperty("test.build.dir", "target/test-dir"));
+ testDir = testDir.getAbsoluteFile();
+ if (!testDir.exists()) {
+ testDir.mkdirs();
+ }
+ File tempJar = File.createTempFile("hadoop-", "", testDir);
+ tempJar = new File(tempJar.getAbsolutePath() + ".jar");
+ tempJar.deleteOnExit();
+ createJar(baseDir, tempJar);
+ return tempJar.getAbsolutePath();
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return null;
+ }
+}
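JarFinder's one public entry point, getJar(Class), either returns the path of the jar already containing the class or packs the class's classpath directory into a temporary jar. A hedged sketch of typical use; "tmpjars" is the Hadoop property that ships extra jars with a job, and conf is assumed to be an org.apache.hadoop.conf.Configuration:

    String jar = JarFinder.getJar(org.apache.hadoop.hbase.mapreduce.ImportTsv.class);
    if (jar != null) {
      conf.set("tmpjars", jar); // make the jar available on the job's classpath
    }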
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
new file mode 100644
index 0000000..241608b
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+@InterfaceAudience.Public
+public class KeyValueSerialization implements Serialization<KeyValue> {
+ @Override
+ public boolean accept(Class<?> c) {
+ return KeyValue.class.isAssignableFrom(c);
+ }
+
+ @Override
+ public KeyValueDeserializer getDeserializer(Class<KeyValue> t) {
+ return new KeyValueDeserializer();
+ }
+
+ @Override
+ public KeyValueSerializer getSerializer(Class<KeyValue> c) {
+ return new KeyValueSerializer();
+ }
+
+ public static class KeyValueDeserializer implements Deserializer<KeyValue> {
+ private DataInputStream dis;
+
+ @Override
+ public void close() throws IOException {
+ this.dis.close();
+ }
+
+ @Override
+ public KeyValue deserialize(KeyValue ignore) throws IOException {
+ // I can't overwrite the passed in KV, not from a proto kv, not just yet. TODO
+ return KeyValueUtil.create(this.dis);
+ }
+
+ @Override
+ public void open(InputStream is) throws IOException {
+ this.dis = new DataInputStream(is);
+ }
+ }
+
+ public static class KeyValueSerializer implements Serializer<KeyValue> {
+ private DataOutputStream dos;
+
+ @Override
+ public void close() throws IOException {
+ this.dos.close();
+ }
+
+ @Override
+ public void open(OutputStream os) throws IOException {
+ this.dos = new DataOutputStream(os);
+ }
+
+ @Override
+ public void serialize(KeyValue kv) throws IOException {
+ KeyValueUtil.write(kv, this.dos);
+ }
+ }
+}
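The serialization is activated through Hadoop's pluggable "io.serializations" mechanism; the dry-run branch of ImportTsv above uses exactly this pattern. A minimal sketch that appends it to a job's existing serializations rather than replacing them (job is assumed to be an org.apache.hadoop.mapreduce.Job):

    Configuration conf = job.getConfiguration();
    conf.setStrings("io.serializations",
        conf.get("io.serializations"),
        KeyValueSerialization.class.getName());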
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
new file mode 100644
index 0000000..997e5a8
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Emits sorted KeyValues.
+ * Reads in all KeyValues from the passed Iterator, sorts them, then emits
+ * KeyValues in sorted order. If there are many columns per row, sorting
+ * will use lots of memory.
+ * @see HFileOutputFormat2
+ */
+@InterfaceAudience.Public
+public class KeyValueSortReducer
+ extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+ protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs,
+ Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+ throws java.io.IOException, InterruptedException {
+ TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
+ for (KeyValue kv: kvs) {
+ try {
+ map.add(kv.clone());
+ } catch (CloneNotSupportedException e) {
+ throw new java.io.IOException(e);
+ }
+ }
+ context.setStatus("Read " + map.getClass());
+ int index = 0;
+ for (KeyValue kv: map) {
+ context.write(row, kv);
+ if (++index % 100 == 0) context.setStatus("Wrote " + index);
+ }
+ }
+}
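Because reduce() receives Iterable<KeyValue>, a job using this reducer must declare KeyValue as its map output value type. A sketch of the wiring, assuming job is an org.apache.hadoop.mapreduce.Job whose mapper emits (ImmutableBytesWritable, KeyValue) pairs:

    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setReducerClass(KeyValueSortReducer.class); // emits one sorted run of KVs per row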
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
new file mode 100644
index 0000000..9f783f1
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Creates a three-level directory tree: the first level is the table name,
+ * the second level is the column family name, and all HFiles for a family
+ * are written under the family's directory:
+ * -tableName1
+ * -columnFamilyName1
+ * -columnFamilyName2
+ * -HFiles
+ * -tableName2
+ * -columnFamilyName1
+ * -HFiles
+ * -columnFamilyName2
+ */
+@InterfaceAudience.Public
+@VisibleForTesting
+public class MultiTableHFileOutputFormat extends HFileOutputFormat2 {
+ private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class);
+
+ /**
+ * Creates a composite key to use as a mapper output key when using
+ * MultiTableHFileOutputFormat.configureIncrementalLoad to set up a bulk ingest job
+ *
+ * @param tableName Name of the Table - Eg: TableName.getNameAsString()
+ * @param suffix Usually represents a rowkey when creating a mapper key or column family
+ * @return byte[] representation of composite key
+ */
+ public static byte[] createCompositeKey(byte[] tableName,
+ byte[] suffix) {
+ return combineTableNameSuffix(tableName, suffix);
+ }
+
+ /**
+ * Alternate api which accepts an ImmutableBytesWritable for the suffix
+ * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
+ */
+ public static byte[] createCompositeKey(byte[] tableName,
+ ImmutableBytesWritable suffix) {
+ return combineTableNameSuffix(tableName, suffix.get());
+ }
+
+ /**
+ * Alternate api which accepts a String for the tableName and ImmutableBytesWritable for the
+ * suffix
+ * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
+ */
+ public static byte[] createCompositeKey(String tableName,
+ ImmutableBytesWritable suffix) {
+ return combineTableNameSuffix(tableName.getBytes(Charset.forName("UTF-8")), suffix.get());
+ }
+
+ /**
+ * Analogous to
+ * {@link HFileOutputFormat2#configureIncrementalLoad(Job, TableDescriptor, RegionLocator)},
+ * this function will configure the requisite number of reducers to write HFiles for multiple
+ * tables simultaneously
+ *
+ * @param job See {@link org.apache.hadoop.mapreduce.Job}
+ * @param multiTableDescriptors Table descriptor and region locator pairs
+ * @throws IOException
+ */
+ public static void configureIncrementalLoad(Job job, List<TableInfo>
+ multiTableDescriptors)
+ throws IOException {
+ MultiTableHFileOutputFormat.configureIncrementalLoad(job, multiTableDescriptors,
+ MultiTableHFileOutputFormat.class);
+ }
+
+ private static int validateCompositeKey(byte[] keyBytes) {
+
+ int separatorIdx = Bytes.indexOf(keyBytes, tableSeparator);
+
+ // Either the separator was not found or a tablename wasn't present or a key wasn't present
+ if (separatorIdx == -1) {
+ throw new IllegalArgumentException("Invalid format for composite key [" + Bytes
+ .toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key");
+ }
+ return separatorIdx;
+ }
+
+ protected static byte[] getTableName(byte[] keyBytes) {
+ int separatorIdx = validateCompositeKey(keyBytes);
+ return Bytes.copy(keyBytes, 0, separatorIdx);
+ }
+
+ protected static byte[] getSuffix(byte[] keyBytes) {
+ int separatorIdx = validateCompositeKey(keyBytes);
+ return Bytes.copy(keyBytes, separatorIdx+1, keyBytes.length - separatorIdx - 1);
+ }
+}
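A mapper feeding this format prefixes each output key with the destination table through createCompositeKey; getTableName and getSuffix later split the key back apart. A sketch of the mapper side, where the table name, row key, and the Put being written are all placeholders:

    byte[] table = Bytes.toBytes("table1");
    ImmutableBytesWritable row = new ImmutableBytesWritable(Bytes.toBytes("row-0001"));
    byte[] composite = MultiTableHFileOutputFormat.createCompositeKey(table, row);
    // context is the mapper's Context; put is a Put built for "row-0001" in "table1"
    context.write(new ImmutableBytesWritable(composite), put);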
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
new file mode 100644
index 0000000..f8fb6dc
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * Convert HBase tabular data from multiple scanners into a format that
+ * is consumable by Map/Reduce.
+ *
+ * <p>
+ * Usage example
+ * </p>
+ *
+ * <pre>
+ * List<Scan> scans = new ArrayList<Scan>();
+ *
+ * Scan scan1 = new Scan();
+ * scan1.setStartRow(firstRow1);
+ * scan1.setStopRow(lastRow1);
+ * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
+ * scans.add(scan1);
+ *
+ * Scan scan2 = new Scan();
+ * scan2.setStartRow(firstRow2);
+ * scan2.setStopRow(lastRow2);
+ * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
+ * scans.add(scan2);
+ *
+ * TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class,
+ * IntWritable.class, job);
+ * </pre>
+ */
+@InterfaceAudience.Public
+public class MultiTableInputFormat extends MultiTableInputFormatBase implements
+ Configurable {
+
+ /** Job parameter that specifies the scan list. */
+ public static final String SCANS = "hbase.mapreduce.scans";
+
+ /** The configuration. */
+ private Configuration conf = null;
+
+ /**
+ * Returns the current configuration.
+ *
+ * @return The current configuration.
+ * @see org.apache.hadoop.conf.Configurable#getConf()
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Sets the configuration. This is used to set the details for the tables to
+ * be scanned.
+ *
+ * @param configuration The configuration to set.
+ * @see org.apache.hadoop.conf.Configurable#setConf(
+ * org.apache.hadoop.conf.Configuration)
+ */
+ @Override
+ public void setConf(Configuration configuration) {
+ this.conf = configuration;
+ String[] rawScans = conf.getStrings(SCANS);
+ if (rawScans == null || rawScans.length <= 0) {
+ throw new IllegalArgumentException("There must be at least one scan configuration set in "
+ + SCANS);
+ }
+ List<Scan> scans = new ArrayList<>();
+
+ for (int i = 0; i < rawScans.length; i++) {
+ try {
+ scans.add(TableMapReduceUtil.convertStringToScan(rawScans[i]));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to convert Scan : " + rawScans[i] + " to string", e);
+ }
+ }
+ this.setScans(scans);
+ }
+}
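The SCANS property read back in setConf() above is normally written by TableMapReduceUtil.initTableMapperJob(List<Scan>, ...), which serializes each Scan into the configuration. A sketch, with the table name and mapper class as placeholders; each Scan must carry its table name as an attribute:

    List<Scan> scans = new ArrayList<>();
    Scan scan = new Scan();
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("table1"));
    scans.add(scan);
    TableMapReduceUtil.initTableMapperJob(scans, MyTableMapper.class,
        ImmutableBytesWritable.class, Result.class, job);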
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
new file mode 100644
index 0000000..5d541a6
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.util.Map;
+import java.util.HashMap;
+/**
+ * A base for {@link MultiTableInputFormat}s. Receives a list of
+ * {@link Scan} instances that define the input tables and
+ * filters etc. Subclasses may use other TableRecordReader implementations.
+ */
+@InterfaceAudience.Public
+public abstract class MultiTableInputFormatBase extends
+ InputFormat<ImmutableBytesWritable, Result> {
+
+ private static final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);
+
+ /** Holds the set of scans used to define the input. */
+ private List<Scan> scans;
+
+ /** The reader scanning the table, can be a custom one. */
+ private TableRecordReader tableRecordReader = null;
+
+ /**
+ * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+ * default.
+ *
+ * @param split The split to work with.
+ * @param context The current context.
+ * @return The newly created record reader.
+ * @throws IOException When creating the reader fails.
+ * @throws InterruptedException when record reader initialization fails
+ * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
+ * org.apache.hadoop.mapreduce.InputSplit,
+ * org.apache.hadoop.mapreduce.TaskAttemptContext)
+ */
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+ InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ TableSplit tSplit = (TableSplit) split;
+ LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));
+
+ if (tSplit.getTable() == null) {
+ throw new IOException("Cannot create a record reader because of a"
+ + " previous error. Please look at the previous logs lines from"
+ + " the task's full log for more details.");
+ }
+ final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
+ Table table = connection.getTable(tSplit.getTable());
+
+ if (this.tableRecordReader == null) {
+ this.tableRecordReader = new TableRecordReader();
+ }
+ final TableRecordReader trr = this.tableRecordReader;
+
+ try {
+ Scan sc = tSplit.getScan();
+ sc.setStartRow(tSplit.getStartRow());
+ sc.setStopRow(tSplit.getEndRow());
+ trr.setScan(sc);
+ trr.setTable(table);
+ return new RecordReader<ImmutableBytesWritable, Result>() {
+
+ @Override
+ public void close() throws IOException {
+ trr.close();
+ connection.close();
+ }
+
+ @Override
+ public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
+ return trr.getCurrentKey();
+ }
+
+ @Override
+ public Result getCurrentValue() throws IOException, InterruptedException {
+ return trr.getCurrentValue();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return trr.getProgress();
+ }
+
+ @Override
+ public void initialize(InputSplit inputsplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ trr.initialize(inputsplit, context);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return trr.nextKeyValue();
+ }
+ };
+ } catch (IOException ioe) {
+ // If there is an exception make sure that all
+ // resources are closed and released.
+ trr.close();
+ connection.close();
+ throw ioe;
+ }
+ }
+
+ /**
+ * Calculates the splits that will serve as input for the map tasks. The
+ * number of splits matches the number of regions in a table.
+ *
+ * @param context The current job context.
+ * @return The list of input splits.
+ * @throws IOException When creating the list of splits fails.
+ * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException {
+ if (scans.isEmpty()) {
+ throw new IOException("No scans were provided.");
+ }
+
+ Map<TableName, List<Scan>> tableMaps = new HashMap<>();
+ for (Scan scan : scans) {
+ byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
+ if (tableNameBytes == null) {
+ throw new IOException("A scan object did not have a table name");
+ }
+
+ TableName tableName = TableName.valueOf(tableNameBytes);
+
+ tableMaps.computeIfAbsent(tableName, k -> new ArrayList<>()).add(scan);
+ }
+
+ List<InputSplit> splits = new ArrayList<>();
+ for (Map.Entry<TableName, List<Scan>> entry : tableMaps.entrySet()) {
+ TableName tableName = entry.getKey();
+ List<Scan> scanList = entry.getValue();
+
+ try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+ RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
+ regionLocator, conn.getAdmin());
+ Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
+ for (Scan scan : scanList) {
+ if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
+ throw new IOException("Expecting at least one region for table : "
+ + tableName.getNameAsString());
+ }
+ int count = 0;
+
+ byte[] startRow = scan.getStartRow();
+ byte[] stopRow = scan.getStopRow();
+
+ for (int i = 0; i < keys.getFirst().length; i++) {
+ if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+ continue;
+ }
+
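+ // include this region only if the scan range [startRow, stopRow) overlaps
+ // it; an empty key is open-ended on its side, and the resulting split
+ // covers the intersection of the scan range and the region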
+ if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+ Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+ (stopRow.length == 0 || Bytes.compareTo(stopRow,
+ keys.getFirst()[i]) > 0)) {
+ byte[] splitStart = startRow.length == 0 ||
+ Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+ keys.getFirst()[i] : startRow;
+ byte[] splitStop = (stopRow.length == 0 ||
+ Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+ keys.getSecond()[i].length > 0 ?
+ keys.getSecond()[i] : stopRow;
+
+ HRegionLocation hregionLocation = regionLocator.getRegionLocation(
+ keys.getFirst()[i], false);
+ String regionHostname = hregionLocation.getHostname();
+ HRegionInfo regionInfo = hregionLocation.getRegionInfo();
+ String encodedRegionName = regionInfo.getEncodedName();
+ long regionSize = sizeCalculator.getRegionSize(
+ regionInfo.getRegionName());
+
+ TableSplit split = new TableSplit(table.getName(),
+ scan, splitStart, splitStop, regionHostname,
+ encodedRegionName, regionSize);
+
+ splits.add(split);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return splits;
+ }
+
+ /**
+ * Test if the given region is to be included in the InputSplit while
+ * splitting the regions of a table.
+ * <p>
+ * This optimization is effective when there is a specific reason to exclude
+ * an entire region from the M-R job (and hence it does not contribute an
+ * InputSplit), given the start and end keys of that region. It is useful
+ * when we need to remember the last-processed top record and continuously
+ * revisit the [last, current) interval for M-R processing. In addition to
+ * reducing the number of InputSplits, it also reduces the load on the region
+ * server, due to the ordering of the keys. <br>
+ * <br>
+ * Note: it is possible that <code>endKey.length == 0</code> for the last
+ * (most recent) region. <br>
+ * Override this method if you want to exclude regions from the M-R job in
+ * bulk. By default, no region is excluded (i.e. all regions are included).
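+ * <p>
+ * A minimal sketch of an override (the {@code checkpoint} key is an assumed,
+ * application-supplied field, not part of this API): skip every region that
+ * ends strictly before the checkpoint, keeping the open-ended last region.
+ * <pre>{@code
+ * protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
+ *   // an empty endKey marks the last region, which is always included;
+ *   // checkpoint is the assumed last-processed row key
+ *   return endKey.length == 0 || Bytes.compareTo(endKey, checkpoint) > 0;
+ * }
+ * }</pre>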
+ *
+ * @param startKey Start key of the region
+ * @param endKey End key of the region
+ * @return true, if this region needs to be included as part of the input
+ * (default).
+ */
+ protected boolean includeRegionInSplit(final byte[] startKey,
+ final byte[] endKey) {
+ return true;
+ }
+
+ /**
+ * Allows subclasses to get the list of {@link Scan} objects.
+ */
+ protected List<Scan> getScans() {
+ return this.scans;
+ }
+
+ /**
+ * Allows subclasses to set the list of {@link Scan} objects.
+ *
+ * @param scans The list of {@link Scan} used to define the input
+ */
+ protected void setScans(List<Scan> scans) {
+ this.scans = scans;
+ }
+
+ /**
+ * Allows subclasses to set the {@link TableRecordReader}.
+ *
+ * @param tableRecordReader A different {@link TableRecordReader}
+ * implementation.
+ */
+ protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+ this.tableRecordReader = tableRecordReader;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
new file mode 100644
index 0000000..4cc784f
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
@@ -0,0 +1,176 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * <p>
+ * Hadoop output format that writes to one or more HBase tables. The key is
+ * taken to be the table name while the output value <em>must</em> be either a
+ * {@link Put} or a {@link Delete} instance. All tables must already exist, and
+ * all Puts and Deletes must reference only valid column families.
+ * </p>
+ *
+ * <p>
+ * Write-ahead logging (WAL) for Puts can be disabled by setting
+ * {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}.
+ * Note that disabling write-ahead logging is only appropriate for jobs where
+ * loss of data due to region server failure can be tolerated (for example,
+ * because it is easy to rerun a bulk import).
+ * </p>
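+ *
+ * <p>
+ * A minimal usage sketch (table and column names are illustrative only):
+ * <pre>{@code
+ * job.setOutputFormatClass(MultiTableOutputFormat.class);
+ * // optionally disable the WAL for rerunnable bulk loads:
+ * // job.getConfiguration().setBoolean(WAL_PROPERTY, WAL_OFF);
+ *
+ * // in the mapper or reducer, key each mutation by its destination table:
+ * ImmutableBytesWritable table = new ImmutableBytesWritable(Bytes.toBytes("table1"));
+ * Put put = new Put(row);
+ * put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), value);
+ * context.write(table, put);
+ * }</pre>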
+ */
+@InterfaceAudience.Public
+public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
+ /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
+ public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
+ /** Property value to use write-ahead logging */
+ public static final boolean WAL_ON = true;
+ /** Property value to disable write-ahead logging */
+ public static final boolean WAL_OFF = false;
+ /**
+ * Record writer for outputting to multiple HTables.
+ */
+ protected static class MultiTableRecordWriter extends
+ RecordWriter<ImmutableBytesWritable, Mutation> {
+ private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
+ Connection connection;
+ Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
+ Configuration conf;
+ boolean useWriteAheadLogging;
+
+ /**
+ * @param conf
+ * the HBase configuration to use
+ * @param useWriteAheadLogging
+ * whether to use write-ahead logging. This can be turned off
+ * (<tt>false</tt>) to improve performance when bulk loading data.
+ */
+ public MultiTableRecordWriter(Configuration conf,
+ boolean useWriteAheadLogging) throws IOException {
+ LOG.debug("Created new MultiTableRecordReader with WAL "
+ + (useWriteAheadLogging ? "on" : "off"));
+ this.conf = conf;
+ this.useWriteAheadLogging = useWriteAheadLogging;
+ }
+
+ /**
+ * @param tableName
+ * the name of the table, as a string
+ * @return the named mutator
+ * @throws IOException
+ * if there is a problem opening a table
+ */
+ BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
+ if (this.connection == null) {
+ this.connection = ConnectionFactory.createConnection(conf);
+ }
+ if (!mutatorMap.containsKey(tableName)) {
+ LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");
+
+ BufferedMutator mutator =
+ connection.getBufferedMutator(TableName.valueOf(tableName.get()));
+ mutatorMap.put(tableName, mutator);
+ }
+ return mutatorMap.get(tableName);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException {
+ for (BufferedMutator mutator : mutatorMap.values()) {
+ mutator.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ /**
+ * Writes an action (Put or Delete) to the specified table.
+ *
+ * @param tableName
+ * the table being updated.
+ * @param action
+ * the update, either a put or a delete.
+ * @throws IllegalArgumentException
+ * if the action is not a put or a delete.
+ */
+ @Override
+ public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
+ BufferedMutator mutator = getBufferedMutator(tableName);
+ // The actions are not immutable, so we defensively copy them
+ if (action instanceof Put) {
+ Put put = new Put((Put) action);
+ put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL
+ : Durability.SKIP_WAL);
+ mutator.mutate(put);
+ } else if (action instanceof Delete) {
+ Delete delete = new Delete((Delete) action);
+ mutator.mutate(delete);
+ } else {
+ throw new IllegalArgumentException("action must be either Delete or Put");
+ }
+ }
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException,
+ InterruptedException {
+ // Target tables are only known at write time (one per record key), so
+ // table existence cannot be verified here; there is nothing useful to check.
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new TableOutputCommitter();
+ }
+
+ @Override
+ public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
+ conf.getBoolean(WAL_PROPERTY, WAL_ON));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..e7538a8
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MultiTableSnapshotInputFormat generalizes {@link TableSnapshotInputFormat},
+ * allowing a MapReduce job to run over one or more table snapshots, with one
+ * or more scans configured for each. Internally, the input format delegates
+ * to {@link TableSnapshotInputFormat} and thus has the same performance
+ * advantages; see {@link TableSnapshotInputFormat} for more details.
+ * <p>
+ * Usage is similar to TableSnapshotInputFormat, with the following exception:
+ * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a
+ * collection of scans. For each snapshot in the map, each corresponding scan
+ * will be applied; the overall dataset for the job is defined by the
+ * concatenation of the regions and tables included in each snapshot/scan pair.
+ * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(java.util.Map,
+ * Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean,
+ * org.apache.hadoop.fs.Path)} can be used to configure the job.
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *     "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *     "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }</pre>
+ * Internally, this input format restores each snapshot into a subdirectory of
+ * the given tmp directory. Input splits and record readers are created as
+ * described in {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * (one per region). See {@link TableSnapshotInputFormat} for more notes on
+ * permissioning; the same caveats apply here.
+ *
+ * @see TableSnapshotInputFormat
+ * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {
+
+ private final MultiTableSnapshotInputFormatImpl delegate;
+
+ public MultiTableSnapshotInputFormat() {
+ this.delegate = new MultiTableSnapshotInputFormatImpl();
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext)
+ throws IOException, InterruptedException {
+ List<TableSnapshotInputFormatImpl.InputSplit> splits =
+ delegate.getSplits(jobContext.getConfiguration());
+ List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());
+
+ for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
+ rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
+ }
+
+ return rtn;
+ }
+
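+ /**
+ * Configures the job's input. Each named snapshot is restored into a
+ * subdirectory of {@code tmpRestoreDir} and the snapshot-to-scans mapping is
+ * recorded in the given configuration; the work is delegated to
+ * {@link MultiTableSnapshotInputFormatImpl#setInput}.
+ *
+ * @param configuration the job configuration to populate
+ * @param snapshotScans map from snapshot name to the scans to apply to it
+ * @param tmpRestoreDir temporary directory under which snapshots are restored
+ * @throws IOException if restoring a snapshot fails
+ */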
+ public static void setInput(Configuration configuration,
+ Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
+ new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
+ }
+}