Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:21 UTC
[21/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
new file mode 100644
index 0000000..6b5cbe2
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
@@ -0,0 +1,915 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.Constructor;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Cipher;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessControlClient;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A command-line utility that reads, writes, and verifies data. Unlike
+ * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written,
+ * and supports simultaneously writing and reading the same set of keys.
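+ * <p>
+ * For illustration only (a sketch; the option values are examples, not defaults), a combined
+ * write-and-verify run could be launched roughly as follows, assuming the class is invoked via
+ * the hbase script or is otherwise on the classpath:
+ * <pre>
+ * hbase org.apache.hadoop.hbase.util.LoadTestTool -tn cluster_test \
+ *     -write 5:1024:20 -read 100:20 -num_keys 1000000
+ * </pre>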
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class LoadTestTool extends AbstractHBaseTool {
+
+ private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
+ private static final String COLON = ":";
+
+ /** Table name for the test */
+ private TableName tableName;
+
+ /** Column families for the test */
+ private byte[][] families;
+
+ /** Table name to use if not overridden on the command line */
+ protected static final String DEFAULT_TABLE_NAME = "cluster_test";
+
+ /** The default data size if not specified */
+ protected static final int DEFAULT_DATA_SIZE = 64;
+
+ /** The number of reader/writer threads if not specified */
+ protected static final int DEFAULT_NUM_THREADS = 20;
+
+ /** Usage string for the load option */
+ protected static final String OPT_USAGE_LOAD =
+ "<avg_cols_per_key>:<avg_data_size>" +
+ "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
+
+ /** Usage string for the read option */
+ protected static final String OPT_USAGE_READ =
+ "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
+
+ /** Usage string for the update option */
+ protected static final String OPT_USAGE_UPDATE =
+ "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
+ + ">][:<#whether to ignore nonce collisions=0>]";
+
+ protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
+ Arrays.toString(BloomType.values());
+
+ protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
+ "one of " + Arrays.toString(Compression.Algorithm.values());
+
+ public static final String OPT_BLOOM = "bloom";
+ public static final String OPT_COMPRESSION = "compression";
+ public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
+ public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
+
+ public static final String OPT_INMEMORY = "in_memory";
+ public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
+ "inmemory as far as possible. Not guaranteed that reads are always served from inmemory";
+
+ public static final String OPT_GENERATOR = "generator";
+ public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
+ + " Any args for this class can be passed as colon separated after class name";
+
+ public static final String OPT_WRITER = "writer";
+ public static final String OPT_WRITER_USAGE = "The class for executing the write requests";
+
+ public static final String OPT_UPDATER = "updater";
+ public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";
+
+ public static final String OPT_READER = "reader";
+ public static final String OPT_READER_USAGE = "The class for executing the read requests";
+
+ protected static final String OPT_KEY_WINDOW = "key_window";
+ protected static final String OPT_WRITE = "write";
+ protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
+ public static final String OPT_MULTIPUT = "multiput";
+ public static final String OPT_MULTIGET = "multiget_batchsize";
+ protected static final String OPT_NUM_KEYS = "num_keys";
+ protected static final String OPT_READ = "read";
+ protected static final String OPT_START_KEY = "start_key";
+ public static final String OPT_TABLE_NAME = "tn";
+ public static final String OPT_COLUMN_FAMILIES = "families";
+ protected static final String OPT_ZK_QUORUM = "zk";
+ protected static final String OPT_ZK_PARENT_NODE = "zk_root";
+ protected static final String OPT_SKIP_INIT = "skip_init";
+ protected static final String OPT_INIT_ONLY = "init_only";
+ protected static final String NUM_TABLES = "num_tables";
+ protected static final String OPT_REGIONS_PER_SERVER = "regions_per_server";
+ protected static final String OPT_BATCHUPDATE = "batchupdate";
+ protected static final String OPT_UPDATE = "update";
+
+ public static final String OPT_ENCRYPTION = "encryption";
+ protected static final String OPT_ENCRYPTION_USAGE =
+ "Enables transparent encryption on the test table, one of " +
+ Arrays.toString(Encryption.getSupportedCiphers());
+
+ public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
+ protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE
+ = "Desired number of regions per region server. Defaults to 5.";
+ public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;
+
+ public static final String OPT_REGION_REPLICATION = "region_replication";
+ protected static final String OPT_REGION_REPLICATION_USAGE =
+ "Desired number of replicas per region";
+
+ public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
+ protected static final String OPT_REGION_REPLICA_ID_USAGE =
+ "Region replica id to do the reads from";
+
+ public static final String OPT_MOB_THRESHOLD = "mob_threshold";
+ protected static final String OPT_MOB_THRESHOLD_USAGE =
+ "Desired cell size to exceed in bytes that will use the MOB write path";
+
+ protected static final long DEFAULT_START_KEY = 0;
+
+ /** This will be removed as we factor out the dependency on command line */
+ protected CommandLine cmd;
+
+ protected MultiThreadedWriter writerThreads = null;
+ protected MultiThreadedReader readerThreads = null;
+ protected MultiThreadedUpdater updaterThreads = null;
+
+ protected long startKey, endKey;
+
+ protected boolean isWrite, isRead, isUpdate;
+ protected boolean deferredLogFlush;
+
+ // Column family options
+ protected DataBlockEncoding dataBlockEncodingAlgo;
+ protected Compression.Algorithm compressAlgo;
+ protected BloomType bloomType;
+ private boolean inMemoryCF;
+
+ private User userOwner;
+ // Writer options
+ protected int numWriterThreads = DEFAULT_NUM_THREADS;
+ protected int minColsPerKey, maxColsPerKey;
+ protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
+ protected boolean isMultiPut;
+
+ // Updater options
+ protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
+ protected int updatePercent;
+ protected boolean ignoreConflicts = false;
+ protected boolean isBatchUpdate;
+
+ // Reader options
+ private int numReaderThreads = DEFAULT_NUM_THREADS;
+ private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
+ private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
+ private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
+ private int verifyPercent;
+
+ private int numTables = 1;
+
+ private String superUser;
+
+ private String userNames;
+ // This file is used to read authentication information in secure clusters.
+ private String authnFileName;
+
+ private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
+ private int regionReplication = -1; // not set
+ private int regionReplicaId = -1; // not set
+
+ private int mobThreshold = -1; // not set
+
+ // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
+ // console tool itself should only be used from console.
+ protected boolean isSkipInit = false;
+ protected boolean isInitOnly = false;
+
+ protected Cipher cipher = null;
+
+ protected String[] splitColonSeparated(String option,
+ int minNumCols, int maxNumCols) {
+ String optVal = cmd.getOptionValue(option);
+ String[] cols = optVal.split(COLON);
+ if (cols.length < minNumCols || cols.length > maxNumCols) {
+ throw new IllegalArgumentException("Expected at least "
+ + minNumCols + " columns but no more than " + maxNumCols +
+ " in the colon-separated value '" + optVal + "' of the " +
+ "-" + option + " option");
+ }
+ return cols;
+ }
+
+ protected int getNumThreads(String numThreadsStr) {
+ return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
+ }
+
+ public byte[][] getColumnFamilies() {
+ return families;
+ }
+
+ /**
+ * Apply column family options such as Bloom filters, compression, and data
+ * block encoding.
+ */
+ protected void applyColumnFamilyOptions(TableName tableName,
+ byte[][] columnFamilies) throws IOException {
+ try (Connection conn = ConnectionFactory.createConnection(conf);
+ Admin admin = conn.getAdmin()) {
+ TableDescriptor tableDesc = admin.getTableDescriptor(tableName);
+ LOG.info("Disabling table " + tableName);
+ admin.disableTable(tableName);
+ for (byte[] cf : columnFamilies) {
+ HColumnDescriptor columnDesc = (HColumnDescriptor) tableDesc.getColumnFamily(cf);
+ boolean isNewCf = columnDesc == null;
+ if (isNewCf) {
+ columnDesc = new HColumnDescriptor(cf);
+ }
+ if (bloomType != null) {
+ columnDesc.setBloomFilterType(bloomType);
+ }
+ if (compressAlgo != null) {
+ columnDesc.setCompressionType(compressAlgo);
+ }
+ if (dataBlockEncodingAlgo != null) {
+ columnDesc.setDataBlockEncoding(dataBlockEncodingAlgo);
+ }
+ if (inMemoryCF) {
+ columnDesc.setInMemory(inMemoryCF);
+ }
+ if (cipher != null) {
+ byte[] keyBytes = new byte[cipher.getKeyLength()];
+ new SecureRandom().nextBytes(keyBytes);
+ columnDesc.setEncryptionType(cipher.getName());
+ columnDesc.setEncryptionKey(
+ EncryptionUtil.wrapKey(conf,
+ User.getCurrent().getShortName(),
+ new SecretKeySpec(keyBytes,
+ cipher.getName())));
+ }
+ if (mobThreshold >= 0) {
+ columnDesc.setMobEnabled(true);
+ columnDesc.setMobThreshold(mobThreshold);
+ }
+
+ if (isNewCf) {
+ admin.addColumnFamily(tableName, columnDesc);
+ } else {
+ admin.modifyColumnFamily(tableName, columnDesc);
+ }
+ }
+ LOG.info("Enabling table " + tableName);
+ admin.enableTable(tableName);
+ }
+ }
+
+ @Override
+ protected void addOptions() {
+ addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
+ "without port numbers");
+ addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
+ addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
+ addOptWithArg(OPT_COLUMN_FAMILIES, "The names of the column families to use, separated by commas");
+ addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
+ addOptWithArg(OPT_READ, OPT_USAGE_READ);
+ addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
+ addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
+ addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
+ addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
+ addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING, HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
+ addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
+ "to tolerate before terminating all reader threads. The default is " +
+ MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
+ addOptWithArg(OPT_MULTIGET, "Batch size for multi-gets; a value of 1 means " +
+ "separate gets for every column in a row");
+ addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
+ "reads and writes for concurrent write/read workload. The default " +
+ "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
+
+ addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
+ "separate puts for every column in a row");
+ addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
+ "separate updates for every column in a row");
+ addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
+ addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
+ addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
+ addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
+ addOptWithArg(OPT_READER, OPT_READER_USAGE);
+
+ addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
+ addOptWithArg(OPT_START_KEY, "The first key to read/write " +
+ "(a 0-based index). The default value is " +
+ DEFAULT_START_KEY + ".");
+ addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
+ + "already exists");
+
+ addOptWithArg(NUM_TABLES,
+ "A positive integer number. When a number n is specified, the load test "
+ + "tool will load n tables in parallel. The -tn parameter value becomes the "
+ + "table name prefix. Each table name is in the format <tn>_1...<tn>_n");
+
+ addOptWithArg(OPT_REGIONS_PER_SERVER,
+ "A positive integer number. When a number n is specified, the load test "
+ + "tool will create the test table with n regions per server");
+
+ addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
+ addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
+ addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
+ addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
+ addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
+ addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
+ }
+
+ @Override
+ protected void processOptions(CommandLine cmd) {
+ this.cmd = cmd;
+
+ tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
+ DEFAULT_TABLE_NAME));
+
+ if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
+ String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
+ families = new byte[list.length][];
+ for (int i = 0; i < list.length; i++) {
+ families[i] = Bytes.toBytes(list[i]);
+ }
+ } else {
+ families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES;
+ }
+
+ isWrite = cmd.hasOption(OPT_WRITE);
+ isRead = cmd.hasOption(OPT_READ);
+ isUpdate = cmd.hasOption(OPT_UPDATE);
+ isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
+ deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);
+
+ if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
+ throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
+ "-" + OPT_UPDATE + " or -" + OPT_READ + " has to be specified");
+ }
+
+ if (isInitOnly && (isRead || isWrite || isUpdate)) {
+ throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
+ + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ);
+ }
+
+ if (!isInitOnly) {
+ if (!cmd.hasOption(OPT_NUM_KEYS)) {
+ throw new IllegalArgumentException(OPT_NUM_KEYS + " must be specified in "
+ + "read or write mode");
+ }
+ startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
+ String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
+ long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
+ Long.MAX_VALUE - startKey);
+ endKey = startKey + numKeys;
+ isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
+ System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
+ }
+
+ parseColumnFamilyOptions(cmd);
+
+ if (isWrite) {
+ String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
+
+ int colIndex = 0;
+ minColsPerKey = 1;
+ maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
+ int avgColDataSize =
+ parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
+ minColDataSize = avgColDataSize / 2;
+ maxColDataSize = avgColDataSize * 3 / 2;
+
+ if (colIndex < writeOpts.length) {
+ numWriterThreads = getNumThreads(writeOpts[colIndex++]);
+ }
+
+ isMultiPut = cmd.hasOption(OPT_MULTIPUT);
+
+ mobThreshold = -1;
+ if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
+ mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
+ }
+
+ System.out.println("Multi-puts: " + isMultiPut);
+ System.out.println("Columns per key: " + minColsPerKey + ".."
+ + maxColsPerKey);
+ System.out.println("Data size per column: " + minColDataSize + ".."
+ + maxColDataSize);
+ }
+
+ if (isUpdate) {
+ String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
+ int colIndex = 0;
+ updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
+ if (colIndex < mutateOpts.length) {
+ numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
+ }
+ if (colIndex < mutateOpts.length) {
+ ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
+ }
+
+ isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
+
+ System.out.println("Batch updates: " + isBatchUpdate);
+ System.out.println("Percent of keys to update: " + updatePercent);
+ System.out.println("Updater threads: " + numUpdaterThreads);
+ System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
+ }
+
+ if (isRead) {
+ String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
+ int colIndex = 0;
+ verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
+ if (colIndex < readOpts.length) {
+ numReaderThreads = getNumThreads(readOpts[colIndex++]);
+ }
+
+ if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
+ maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
+ 0, Integer.MAX_VALUE);
+ }
+
+ if (cmd.hasOption(OPT_KEY_WINDOW)) {
+ keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
+ 0, Integer.MAX_VALUE);
+ }
+
+ if (cmd.hasOption(OPT_MULTIGET)) {
+ multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
+ 0, Integer.MAX_VALUE);
+ }
+
+ System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
+ System.out.println("Percent of keys to verify: " + verifyPercent);
+ System.out.println("Reader threads: " + numReaderThreads);
+ }
+
+ numTables = 1;
+ if (cmd.hasOption(NUM_TABLES)) {
+ numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
+ }
+
+ numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
+ if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
+ numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
+ }
+
+ regionReplication = 1;
+ if (cmd.hasOption(OPT_REGION_REPLICATION)) {
+ regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
+ }
+
+ regionReplicaId = -1;
+ if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
+ regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
+ }
+ }
+
+ private void parseColumnFamilyOptions(CommandLine cmd) {
+ String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING);
+ dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
+ DataBlockEncoding.valueOf(dataBlockEncodingStr);
+
+ String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
+ compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
+ Compression.Algorithm.valueOf(compressStr);
+
+ String bloomStr = cmd.getOptionValue(OPT_BLOOM);
+ bloomType = bloomStr == null ? BloomType.ROW :
+ BloomType.valueOf(bloomStr);
+
+ inMemoryCF = cmd.hasOption(OPT_INMEMORY);
+ if (cmd.hasOption(OPT_ENCRYPTION)) {
+ cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
+ }
+
+ }
+
+ public void initTestTable() throws IOException {
+ Durability durability = Durability.USE_DEFAULT;
+ if (deferredLogFlush) {
+ durability = Durability.ASYNC_WAL;
+ }
+
+ HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
+ getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
+ regionReplication, durability);
+ applyColumnFamilyOptions(tableName, getColumnFamilies());
+ }
+
+ @Override
+ protected int doWork() throws IOException {
+ if (numTables > 1) {
+ return parallelLoadTables();
+ } else {
+ return loadTable();
+ }
+ }
+
+ protected int loadTable() throws IOException {
+ if (cmd.hasOption(OPT_ZK_QUORUM)) {
+ conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
+ }
+ if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
+ }
+
+ if (isInitOnly) {
+ LOG.info("Initializing only; no reads or writes");
+ initTestTable();
+ return 0;
+ }
+
+ if (!isSkipInit) {
+ initTestTable();
+ }
+ LoadTestDataGenerator dataGen = null;
+ if (cmd.hasOption(OPT_GENERATOR)) {
+ String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
+ dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
+ String[] args;
+ if (dataGen instanceof LoadTestDataGeneratorWithACL) {
+ LOG.info("Using LoadTestDataGeneratorWithACL");
+ if (User.isHBaseSecurityEnabled(conf)) {
+ LOG.info("Security is enabled");
+ authnFileName = clazzAndArgs[1];
+ superUser = clazzAndArgs[2];
+ userNames = clazzAndArgs[3];
+ args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
+ Properties authConfig = new Properties();
+ authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
+ try {
+ addAuthInfoToConf(authConfig, conf, superUser, userNames);
+ } catch (IOException exp) {
+ LOG.error(exp);
+ return EXIT_FAILURE;
+ }
+ userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser));
+ } else {
+ superUser = clazzAndArgs[1];
+ userNames = clazzAndArgs[2];
+ args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
+ userOwner = User.createUserForTesting(conf, superUser, new String[0]);
+ }
+ } else {
+ args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1,
+ clazzAndArgs.length);
+ }
+ dataGen.initialize(args);
+ } else {
+ // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
+ dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
+ minColsPerKey, maxColsPerKey, families);
+ }
+
+ if (userOwner != null) {
+ LOG.info("Granting permissions for user " + userOwner.getShortName());
+ Permission.Action[] actions = {
+ Permission.Action.ADMIN, Permission.Action.CREATE,
+ Permission.Action.READ, Permission.Action.WRITE };
+ try {
+ AccessControlClient.grant(ConnectionFactory.createConnection(conf),
+ tableName, userOwner.getShortName(), null, null, actions);
+ } catch (Throwable e) {
+ LOG.fatal("Error in granting permission for the user " + userOwner.getShortName(), e);
+ return EXIT_FAILURE;
+ }
+ }
+
+ if (userNames != null) {
+ // This will be comma separated list of expressions.
+ String users[] = userNames.split(",");
+ User user = null;
+ for (String userStr : users) {
+ if (User.isHBaseSecurityEnabled(conf)) {
+ user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr));
+ } else {
+ user = User.createUserForTesting(conf, userStr, new String[0]);
+ }
+ }
+ }
+
+ if (isWrite) {
+ if (userOwner != null) {
+ writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
+ } else {
+ String writerClass = null;
+ if (cmd.hasOption(OPT_WRITER)) {
+ writerClass = cmd.getOptionValue(OPT_WRITER);
+ } else {
+ writerClass = MultiThreadedWriter.class.getCanonicalName();
+ }
+
+ writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
+ }
+ writerThreads.setMultiPut(isMultiPut);
+ }
+
+ if (isUpdate) {
+ if (userOwner != null) {
+ updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
+ userOwner, userNames);
+ } else {
+ String updaterClass = null;
+ if (cmd.hasOption(OPT_UPDATER)) {
+ updaterClass = cmd.getOptionValue(OPT_UPDATER);
+ } else {
+ updaterClass = MultiThreadedUpdater.class.getCanonicalName();
+ }
+ updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
+ }
+ updaterThreads.setBatchUpdate(isBatchUpdate);
+ updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
+ }
+
+ if (isRead) {
+ if (userOwner != null) {
+ readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
+ userNames);
+ } else {
+ String readerClass = null;
+ if (cmd.hasOption(OPT_READER)) {
+ readerClass = cmd.getOptionValue(OPT_READER);
+ } else {
+ readerClass = MultiThreadedReader.class.getCanonicalName();
+ }
+ readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
+ }
+ readerThreads.setMaxErrors(maxReadErrors);
+ readerThreads.setKeyWindow(keyWindow);
+ readerThreads.setMultiGetBatchSize(multiGetBatchSize);
+ readerThreads.setRegionReplicaId(regionReplicaId);
+ }
+
+ if (isUpdate && isWrite) {
+ LOG.info("Concurrent write/update workload: making updaters aware of the " +
+ "write point");
+ updaterThreads.linkToWriter(writerThreads);
+ }
+
+ if (isRead && (isUpdate || isWrite)) {
+ LOG.info("Concurrent write/read workload: making readers aware of the " +
+ "write point");
+ readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
+ }
+
+ if (isWrite) {
+ System.out.println("Starting to write data...");
+ writerThreads.start(startKey, endKey, numWriterThreads);
+ }
+
+ if (isUpdate) {
+ LOG.info("Starting to mutate data...");
+ System.out.println("Starting to mutate data...");
+ // TODO: currently append and increment operations are not tested with tags;
+ // will update this after it is done
+ updaterThreads.start(startKey, endKey, numUpdaterThreads);
+ }
+
+ if (isRead) {
+ System.out.println("Starting to read data...");
+ readerThreads.start(startKey, endKey, numReaderThreads);
+ }
+
+ if (isWrite) {
+ writerThreads.waitForFinish();
+ }
+
+ if (isUpdate) {
+ updaterThreads.waitForFinish();
+ }
+
+ if (isRead) {
+ readerThreads.waitForFinish();
+ }
+
+ boolean success = true;
+ if (isWrite) {
+ success = success && writerThreads.getNumWriteFailures() == 0;
+ }
+ if (isUpdate) {
+ success = success && updaterThreads.getNumWriteFailures() == 0;
+ }
+ if (isRead) {
+ success = success && readerThreads.getNumReadErrors() == 0
+ && readerThreads.getNumReadFailures() == 0;
+ }
+ return success ? EXIT_SUCCESS : EXIT_FAILURE;
+ }
+
+ private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
+ try {
+ Class<?> clazz = Class.forName(clazzName);
+ Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
+ byte[][].class);
+ return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
+ minColsPerKey, maxColsPerKey, families);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName
+ , LoadTestDataGenerator dataGen) throws IOException {
+ try {
+ Class<?> clazz = Class.forName(clazzName);
+ Constructor<?> constructor = clazz.getConstructor(
+ LoadTestDataGenerator.class, Configuration.class, TableName.class);
+ return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName
+ , LoadTestDataGenerator dataGen) throws IOException {
+ try {
+ Class<?> clazz = Class.forName(clazzName);
+ Constructor<?> constructor = clazz.getConstructor(
+ LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
+ return (MultiThreadedUpdater) constructor.newInstance(
+ dataGen, conf, tableName, updatePercent);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName
+ , LoadTestDataGenerator dataGen) throws IOException {
+ try {
+ Class<?> clazz = Class.forName(clazzName);
+ Constructor<?> constructor = clazz.getConstructor(
+ LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
+ return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static void main(String[] args) {
+ new LoadTestTool().doStaticMain(args);
+ }
+
+ /**
+ * When NUM_TABLES is specified, this method starts multiple worker threads,
+ * each of which runs a LoadTestTool instance to load one table. Table names
+ * are in the format <tn>_<index>; for example, with "-tn test -num_tables 2"
+ * the table names will be "test_1" and "test_2".
+ *
+ * @throws IOException
+ */
+ private int parallelLoadTables()
+ throws IOException {
+ // create new command args
+ String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
+ String[] newArgs = null;
+ if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
+ newArgs = new String[cmdLineArgs.length + 2];
+ newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
+ newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
+ System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
+ } else {
+ newArgs = cmdLineArgs;
+ }
+
+ int tableNameValueIndex = -1;
+ for (int j = 0; j < newArgs.length; j++) {
+ if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
+ tableNameValueIndex = j + 1;
+ } else if (newArgs[j].endsWith(NUM_TABLES)) {
+ // change NUM_TABLES to 1 so that each worker loads one table
+ newArgs[j + 1] = "1";
+ }
+ }
+
+ // starting to load multiple tables
+ List<WorkerThread> workers = new ArrayList<>();
+ for (int i = 0; i < numTables; i++) {
+ String[] workerArgs = newArgs.clone();
+ workerArgs[tableNameValueIndex] = tableName + "_" + (i+1);
+ WorkerThread worker = new WorkerThread(i, workerArgs);
+ workers.add(worker);
+ LOG.info(worker + " starting");
+ worker.start();
+ }
+
+ // wait for all workers to finish
+ LOG.info("Waiting for worker threads to finish");
+ for (WorkerThread t : workers) {
+ try {
+ t.join();
+ } catch (InterruptedException ie) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
+ }
+ checkForErrors();
+ }
+
+ return EXIT_SUCCESS;
+ }
+
+ // If an exception is thrown by one of the worker threads, it will be
+ // stored here.
+ protected AtomicReference<Throwable> thrown = new AtomicReference<>();
+
+ private void workerThreadError(Throwable t) {
+ thrown.compareAndSet(null, t);
+ }
+
+ /**
+ * Check for errors in the worker threads. If any is found, rethrow it.
+ */
+ private void checkForErrors() throws IOException {
+ Throwable thrown = this.thrown.get();
+ if (thrown == null) return;
+ if (thrown instanceof IOException) {
+ throw (IOException) thrown;
+ } else {
+ throw new RuntimeException(thrown);
+ }
+ }
+
+ class WorkerThread extends Thread {
+ private String[] workerArgs;
+
+ WorkerThread(int i, String[] args) {
+ super("WorkerThread-" + i);
+ workerArgs = args;
+ }
+
+ @Override
+ public void run() {
+ try {
+ int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
+ if (ret != 0) {
+ throw new RuntimeException("LoadTestTool exit with non-zero return code.");
+ }
+ } catch (Exception ex) {
+ LOG.error("Error in worker thread", ex);
+ workerThreadError(ex);
+ }
+ }
+ }
+
+ private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
+ String userList) throws IOException {
+ List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
+ users.add(owner);
+ for (String user : users) {
+ String keyTabFileConfKey = "hbase." + user + ".keytab.file";
+ String principalConfKey = "hbase." + user + ".kerberos.principal";
+ if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
+ throw new IOException("Authentication configs missing for user : " + user);
+ }
+ }
+ for (String key : authConfig.stringPropertyNames()) {
+ conf.set(key, authConfig.getProperty(key));
+ }
+ LOG.debug("Added authentication properties to config successfully.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/resources/hbase-site.xml b/hbase-mapreduce/src/test/resources/hbase-site.xml
new file mode 100644
index 0000000..64a1964
--- /dev/null
+++ b/hbase-mapreduce/src/test/resources/hbase-site.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>hbase.regionserver.msginterval</name>
+ <value>1000</value>
+ <description>Interval between messages from the RegionServer to HMaster
+ in milliseconds. Default is 15. Set this value low if you want unit
+ tests to be responsive.
+ </description>
+ </property>
+ <property>
+ <name>hbase.defaults.for.version.skip</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hbase.server.thread.wakefrequency</name>
+ <value>1000</value>
+ <description>Time to sleep in between searches for work (in milliseconds).
+ Used as sleep interval by service threads such as hbase:meta scanner and log roller.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.event.waiting.time</name>
+ <value>50</value>
+ <description>Time to sleep between checks to see if a table event took place.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.handler.count</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>hbase.regionserver.metahandler.count</name>
+ <value>6</value>
+ </property>
+ <property>
+ <name>hbase.ipc.server.read.threadpool.size</name>
+ <value>3</value>
+ </property>
+ <property>
+ <name>hbase.master.info.port</name>
+ <value>-1</value>
+ <description>The port for the hbase master web UI
+ Set to -1 if you do not want the info server to run.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.port</name>
+ <value>0</value>
+ <description>Always have masters and regionservers come up on port '0' so we don't clash over
+ default ports.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.port</name>
+ <value>0</value>
+ <description>Always have masters and regionservers come up on port '0' so we don't clash over
+ default ports.
+ </description>
+ </property>
+ <property>
+ <name>hbase.ipc.client.fallback-to-simple-auth-allowed</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.info.port</name>
+ <value>-1</value>
+ <description>The port for the hbase regionserver web UI
+ Set to -1 if you do not want the info server to run.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.info.port.auto</name>
+ <value>true</value>
+ <description>Info server auto port bind. Enables automatic port
+ search if hbase.regionserver.info.port is already in use.
+ Enabled for testing to run multiple tests on one machine.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.safemode</name>
+ <value>false</value>
+ <description>
+ Turn on/off safe mode in region server. Always on for production, always off
+ for tests.
+ </description>
+ </property>
+ <property>
+ <name>hbase.hregion.max.filesize</name>
+ <value>67108864</value>
+ <description>
+ Maximum desired file size for an HRegion. If filesize exceeds
+ value + (value / 2), the HRegion is split in two. Default: 256M.
+
+ Keep the maximum filesize small so we split more often in tests.
+ </description>
+ </property>
+ <property>
+ <name>hadoop.log.dir</name>
+ <value>${user.dir}/../logs</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.property.clientPort</name>
+ <value>21818</value>
+ <description>Property from ZooKeeper's config zoo.cfg.
+ The port at which the clients will connect.
+ </description>
+ </property>
+ <property>
+ <name>hbase.defaults.for.version.skip</name>
+ <value>true</value>
+ <description>
+ Set to true to skip the 'hbase.defaults.for.version'.
+ Setting this to true can be useful in contexts other than
+ the other side of a maven generation; i.e. running in an
+ ide. You'll want to set this boolean to true to avoid
+ seeing the RuntimeException complaint: "hbase-default.xml file
+ seems to be for an old version of HBase (@@@VERSION@@@), this
+ version is X.X.X-SNAPSHOT"
+ </description>
+ </property>
+ <property>
+ <name>hbase.table.sanity.checks</name>
+ <value>false</value>
+ <description>Skip sanity checks in tests
+ </description>
+ </property>
+ <property>
+ <name>hbase.procedure.fail.on.corruption</name>
+ <value>true</value>
+ <description>
+ Enable replay sanity checks on procedure tests.
+ </description>
+ </property>
+ <property>
+ <name>hbase.hconnection.threads.keepalivetime</name>
+ <value>3</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/hbase-site2.xml
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/resources/hbase-site2.xml b/hbase-mapreduce/src/test/resources/hbase-site2.xml
new file mode 100644
index 0000000..8bef31a
--- /dev/null
+++ b/hbase-mapreduce/src/test/resources/hbase-site2.xml
@@ -0,0 +1,146 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>hbase.custom.config</name>
+ <value>1000</value>
+ </property>
+ <property>
+ <name>hbase.regionserver.msginterval</name>
+ <value>1000</value>
+ <description>Interval between messages from the RegionServer to HMaster
+ in milliseconds. Default is 15. Set this value low if you want unit
+ tests to be responsive.
+ </description>
+ </property>
+ <property>
+ <name>hbase.defaults.for.version.skip</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hbase.server.thread.wakefrequency</name>
+ <value>1000</value>
+ <description>Time to sleep in between searches for work (in milliseconds).
+ Used as sleep interval by service threads such as hbase:meta scanner and log roller.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.event.waiting.time</name>
+ <value>50</value>
+ <description>Time to sleep between checks to see if a table event took place.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.handler.count</name>
+ <value>5</value>
+ </property>
+ <property>
+ <name>hbase.master.info.port</name>
+ <value>-1</value>
+ <description>The port for the hbase master web UI
+ Set to -1 if you do not want the info server to run.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.port</name>
+ <value>0</value>
+ <description>Always have masters and regionservers come up on port '0' so we don't clash over
+ default ports.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.port</name>
+ <value>0</value>
+ <description>Always have masters and regionservers come up on port '0' so we don't clash over
+ default ports.
+ </description>
+ </property>
+ <property>
+ <name>hbase.ipc.client.fallback-to-simple-auth-allowed</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.info.port</name>
+ <value>-1</value>
+ <description>The port for the hbase regionserver web UI
+ Set to -1 if you do not want the info server to run.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.info.port.auto</name>
+ <value>true</value>
+ <description>Info server auto port bind. Enables automatic port
+ search if hbase.regionserver.info.port is already in use.
+ Enabled for testing to run multiple tests on one machine.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.safemode</name>
+ <value>false</value>
+ <description>
+ Turn on/off safe mode in region server. Always on for production, always off
+ for tests.
+ </description>
+ </property>
+ <property>
+ <name>hbase.hregion.max.filesize</name>
+ <value>67108864</value>
+ <description>
+ Maximum desired file size for an HRegion. If filesize exceeds
+ value + (value / 2), the HRegion is split in two. Default: 256M.
+
+ Keep the maximum filesize small so we split more often in tests.
+ </description>
+ </property>
+ <property>
+ <name>hadoop.log.dir</name>
+ <value>${user.dir}/../logs</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.property.clientPort</name>
+ <value>21818</value>
+ <description>Property from ZooKeeper's config zoo.cfg.
+ The port at which the clients will connect.
+ </description>
+ </property>
+ <property>
+ <name>hbase.defaults.for.version.skip</name>
+ <value>true</value>
+ <description>
+ Set to true to skip the 'hbase.defaults.for.version'.
+ Setting this to true can be useful in contexts other than
+ the other side of a maven generation; i.e. running in an
+ ide. You'll want to set this boolean to true to avoid
+ seeing the RuntimeException complaint: "hbase-default.xml file
+ seems to be for an old version of HBase (@@@VERSION@@@), this
+ version is X.X.X-SNAPSHOT"
+ </description>
+ </property>
+ <property>
+ <name>hbase.table.sanity.checks</name>
+ <value>false</value>
+ <description>Skip sanity checks in tests
+ </description>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/hdfs-site.xml
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/resources/hdfs-site.xml b/hbase-mapreduce/src/test/resources/hdfs-site.xml
new file mode 100644
index 0000000..03be0c7
--- /dev/null
+++ b/hbase-mapreduce/src/test/resources/hdfs-site.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+
+ <!-- hadoop-2.0.5+'s HDFS-4305 by default enforces a min blocks size
+ of 1024*1024. Many unit tests that use the hlog use smaller
+ blocks. Setting this config to 0 to have tests pass -->
+ <property>
+ <name>dfs.namenode.fs-limits.min-block-size</name>
+ <value>0</value>
+ </property>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/resources/log4j.properties b/hbase-mapreduce/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c322699
--- /dev/null
+++ b/hbase-mapreduce/src/test/resources/log4j.properties
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+hbase.root.logger=INFO,console
+hbase.log.dir=.
+hbase.log.file=hbase.log
+
+# Define the root logger to the system property "hbase.root.logger".
+log4j.rootLogger=${hbase.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.hadoop.hbase=DEBUG
+
+#These settings are workarounds against spurious logs from the minicluster.
+#See HBASE-4709
+log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN
+log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN
+log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN
+log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN
+# Enable this to get detailed connection error/retry logging.
+# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/mapred-queues.xml
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/resources/mapred-queues.xml b/hbase-mapreduce/src/test/resources/mapred-queues.xml
new file mode 100644
index 0000000..43f3e2a
--- /dev/null
+++ b/hbase-mapreduce/src/test/resources/mapred-queues.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<!-- This is the template for queue configuration. The format supports nesting of
+ queues within queues - a feature called hierarchical queues. All queues are
+ defined within the 'queues' tag which is the top level element for this
+ XML document.
+ The 'aclsEnabled' attribute should be set to true, if ACLs should be checked
+ on queue operations such as submitting jobs, killing jobs etc. -->
+<queues aclsEnabled="false">
+
+ <!-- Configuration for a queue is specified by defining a 'queue' element. -->
+ <queue>
+
+ <!-- Name of a queue. Queue name cannot contain a ':' -->
+ <name>default</name>
+
+ <!-- properties for a queue, typically used by schedulers,
+ can be defined here -->
+ <properties>
+ </properties>
+
+ <!-- State of the queue. If running, the queue will accept new jobs.
+ If stopped, the queue will not accept new jobs. -->
+ <state>running</state>
+
+ <!-- Specifies the ACLs to check for submitting jobs to this queue.
+ If set to '*', it allows all users to submit jobs to the queue.
+ For specifying a list of users and groups the format to use is
+ user1,user2 group1,group2 -->
+ <acl-submit-job>*</acl-submit-job>
+
+ <!-- Specifies the ACLs to check for modifying jobs in this queue.
+ Modifications include killing jobs, tasks of jobs or changing
+ priorities.
+ If set to '*', it allows all users to submit jobs to the queue.
+ For specifying a list of users and groups the format to use is
+ user1,user2 group1,group2 -->
+ <acl-administer-jobs>*</acl-administer-jobs>
+ </queue>
+
+ <!-- Here is a sample of a hierarchical queue configuration
+ where q2 is a child of q1. In this example, q2 is a leaf level
+ queue as it has no queues configured within it. Currently, ACLs
+ and state are only supported for the leaf level queues.
+ Note also the usage of properties for the queue q2.
+ <queue>
+ <name>q1</name>
+ <queue>
+ <name>q2</name>
+ <properties>
+ <property key="capacity" value="20"/>
+ <property key="user-limit" value="30"/>
+ </properties>
+ </queue>
+ </queue>
+ -->
+</queues>
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/mapred-site.xml
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/resources/mapred-site.xml b/hbase-mapreduce/src/test/resources/mapred-site.xml
new file mode 100644
index 0000000..787ffb7
--- /dev/null
+++ b/hbase-mapreduce/src/test/resources/mapred-site.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+<property>
+ <name>mapred.map.child.java.opts</name>
+ <value>-Djava.awt.headless=true</value>
+</property>
+
+<property>
+ <name>mapred.reduce.child.java.opts</name>
+ <value>-Djava.awt.headless=true</value>
+</property>
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/PerformanceEvaluation_Counter.properties
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/PerformanceEvaluation_Counter.properties b/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/PerformanceEvaluation_Counter.properties
new file mode 100644
index 0000000..6fca96a
--- /dev/null
+++ b/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/PerformanceEvaluation_Counter.properties
@@ -0,0 +1,28 @@
+# ResourceBundle properties file for Map-Reduce counters
+
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+CounterGroupName= HBase Performance Evaluation
+ELAPSED_TIME.name= Elapsed time in milliseconds
+ROWS.name= Row count
+# ResourceBundle properties file for Map-Reduce counters
+
+CounterGroupName= HBase Performance Evaluation
+ELAPSED_TIME.name= Elapsed time in milliseconds
+ROWS.name= Row count
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format b/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format
new file mode 100755
index 0000000..762ddd7
Binary files /dev/null and b/hbase-mapreduce/src/test/resources/org/apache/hadoop/hbase/mapreduce/exportedTableIn94Format differ
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-rest/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-rest/pom.xml b/hbase-rest/pom.xml
index 3af9829..639c0c2 100644
--- a/hbase-rest/pom.xml
+++ b/hbase-rest/pom.xml
@@ -212,6 +212,16 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-mapreduce</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-mapreduce</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
index 3559ee0..6ed170e 100644
--- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
+++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
@@ -220,8 +220,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
/**
* This class works as the InputSplit of Performance Evaluation
- * MapReduce InputFormat, and the Record Value of RecordReader.
- * Each map task will only read one record from a PeInputSplit,
+ * MapReduce InputFormat, and the Record Value of RecordReader.
+ * Each map task will only read one record from a PeInputSplit,
* the record value is the PeInputSplit itself.
*/
public static class PeInputSplit extends InputSplit implements Writable {
@@ -950,7 +950,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
static abstract class TableTest extends Test {
protected Table table;
-
+
public TableTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
index bcd433c..d520113 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
* <p>
* This also allows one to run the scan from an
* online or offline hbase cluster. The snapshot files can be exported by using the
- * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool,
+ * org.apache.hadoop.hbase.snapshot.ExportSnapshot tool,
* to a pure-hdfs cluster, and this scanner can be used to
* run the scan directly over the snapshot files. The snapshot should not be deleted while there
* are open scanners reading from snapshot files.
@@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
* snapshot files, the job has to be run as the HBase user or the user must have group or other
 * privileges in the filesystem (See HBASE-8369). Note that giving other users access to read from
* snapshot/data files will completely circumvent the access control enforced by HBase.
- * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
+ * See org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.
*/
@InterfaceAudience.Public
public class TableSnapshotScanner extends AbstractClientScanner {
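
The javadoc above describes reading snapshot files directly, bypassing the region servers. As a rough illustration of that usage (the snapshot name and restore directory below are made-up examples, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;

public class SnapshotScanSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Scratch directory the scanner restores snapshot metadata into (hypothetical path).
    Path restoreDir = new Path("/tmp/snapshot_restore");
    Scan scan = new Scan(); // full scan of the snapshotted table for this sketch
    long rows = 0;
    try (TableSnapshotScanner scanner =
        new TableSnapshotScanner(conf, restoreDir, "mySnapshot", scan)) {
      for (Result result; (result = scanner.next()) != null; ) {
        rows++; // each Result is read straight from the snapshot's store files
      }
    }
    System.out.println("rows read from snapshot: " + rows);
  }
}
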
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java
deleted file mode 100644
index 618c14a..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.util.ProgramDriver;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Driver for hbase mapreduce jobs. Select which to run by passing name of job
- * to this main.
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-@InterfaceStability.Stable
-public class Driver {
-
- private static ProgramDriver pgd = new ProgramDriver();
-
- @VisibleForTesting
- static void setProgramDriver(ProgramDriver pgd0) {
- pgd = pgd0;
- }
-
- /**
- * @param args
- * @throws Throwable
- */
- public static void main(String[] args) throws Throwable {
- pgd.addClass(RowCounter.NAME, RowCounter.class, "Count rows in HBase table");
- ProgramDriver.class.getMethod("driver", new Class[] { String[].class })
- .invoke(pgd, new Object[] { args });
- }
-}
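
The Driver removed here (it lives on in the new hbase-mapreduce module) is a thin ProgramDriver front end: jobs are registered under short names and the first command-line argument picks which one runs. A minimal sketch of the same pattern with a made-up placeholder job:

import org.apache.hadoop.util.ProgramDriver;

public class JobDriverSketch {
  /** Placeholder job; a real driver registers MapReduce tool classes here. */
  public static class HelloJob {
    public static void main(String[] args) {
      System.out.println("hello job ran with " + args.length + " argument(s)");
    }
  }

  public static void main(String[] args) throws Throwable {
    ProgramDriver pgd = new ProgramDriver();
    // Register each job under a short name; "hello" on the command line selects HelloJob.
    pgd.addClass("hello", HelloJob.class, "Print a greeting");
    // The real Driver invokes driver() reflectively to stay compatible across Hadoop
    // versions where the method's signature differs; a direct call is fine for this sketch.
    pgd.driver(args);
  }
}
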
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
deleted file mode 100644
index a534224..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-
-/**
- * Extract grouping columns from input record
- */
-@InterfaceAudience.Public
-public class GroupingTableMap
-extends MapReduceBase
-implements TableMap<ImmutableBytesWritable,Result> {
-
- /**
- * JobConf parameter to specify the columns used to produce the key passed to
- * collect from the map phase
- */
- public static final String GROUP_COLUMNS =
- "hbase.mapred.groupingtablemap.columns";
-
- protected byte [][] columns;
-
- /**
- * Use this before submitting a TableMap job. It will appropriately set up the
- * JobConf.
- *
- * @param table table to be processed
- * @param columns space separated list of columns to fetch
- * @param groupColumns space separated list of columns used to form the key
- * used in collect
- * @param mapper map class
- * @param job job configuration object
- */
- @SuppressWarnings("unchecked")
- public static void initJob(String table, String columns, String groupColumns,
- Class<? extends TableMap> mapper, JobConf job) {
-
- TableMapReduceUtil.initTableMapJob(table, columns, mapper,
- ImmutableBytesWritable.class, Result.class, job);
- job.set(GROUP_COLUMNS, groupColumns);
- }
-
- @Override
- public void configure(JobConf job) {
- super.configure(job);
- String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
- columns = new byte[cols.length][];
- for(int i = 0; i < cols.length; i++) {
- columns[i] = Bytes.toBytes(cols[i]);
- }
- }
-
- /**
- * Extract the grouping columns from value to construct a new key.
- *
- * Pass the new key and value to reduce.
- * If any of the grouping columns are not found in the value, the record is skipped.
- * @param key
- * @param value
- * @param output
- * @param reporter
- * @throws IOException
- */
- public void map(ImmutableBytesWritable key, Result value,
- OutputCollector<ImmutableBytesWritable,Result> output,
- Reporter reporter) throws IOException {
-
- byte[][] keyVals = extractKeyValues(value);
- if(keyVals != null) {
- ImmutableBytesWritable tKey = createGroupKey(keyVals);
- output.collect(tKey, value);
- }
- }
-
- /**
- * Extract columns values from the current record. This method returns
- * null if any of the columns are not found.
- *
- * Override this method if you want to deal with nulls differently.
- *
- * @param r
- * @return array of byte values
- */
- protected byte[][] extractKeyValues(Result r) {
- byte[][] keyVals = null;
- ArrayList<byte[]> foundList = new ArrayList<>();
- int numCols = columns.length;
- if (numCols > 0) {
- for (Cell value: r.listCells()) {
- byte [] column = KeyValue.makeColumn(CellUtil.cloneFamily(value),
- CellUtil.cloneQualifier(value));
- for (int i = 0; i < numCols; i++) {
- if (Bytes.equals(column, columns[i])) {
- foundList.add(CellUtil.cloneValue(value));
- break;
- }
- }
- }
- if(foundList.size() == numCols) {
- keyVals = foundList.toArray(new byte[numCols][]);
- }
- }
- return keyVals;
- }
-
- /**
- * Create a key by concatenating multiple column values.
- * Override this function in order to produce different types of keys.
- *
- * @param vals
- * @return key generated by concatenating multiple column values
- */
- protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
- if(vals == null) {
- return null;
- }
- StringBuilder sb = new StringBuilder();
- for(int i = 0; i < vals.length; i++) {
- if(i > 0) {
- sb.append(" ");
- }
- sb.append(Bytes.toString(vals[i]));
- }
- return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString()));
- }
-}
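
GroupingTableMap.initJob wires the scan columns, the grouping columns, and the mapper into the JobConf; rows missing any grouping column are skipped. A rough sketch of a job that groups rows on cf:a and cf:b and counts rows per group (table, family, and output path are illustrative only):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.GroupingTableMap;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class GroupingJobSketch {
  /** Toy reducer: counts how many rows share each grouping key. */
  public static class CountReducer extends MapReduceBase
      implements Reducer<ImmutableBytesWritable, Result, Text, LongWritable> {
    @Override
    public void reduce(ImmutableBytesWritable key, Iterator<Result> values,
        OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
      long rows = 0;
      while (values.hasNext()) {
        values.next();
        rows++;
      }
      output.collect(new Text(Bytes.toString(key.get())), new LongWritable(rows));
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create(), GroupingJobSketch.class);
    job.setJobName("group-by-columns");
    // Scan cf:a and cf:b and key each row by the concatenation of those two values.
    GroupingTableMap.initJob("myTable", "cf:a cf:b", "cf:a cf:b",
        GroupingTableMap.class, job);
    // Shuffle types stay as the mapper emits them: ImmutableBytesWritable keys, Result values.
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Result.class);
    job.setReducerClass(CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/group-counts"));
    JobClient.runJob(job);
  }
}
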
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java
deleted file mode 100644
index 0011a60..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/HRegionPartitioner.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
-
-
-/**
- * This is used to partition the output keys into groups of keys.
- * Keys are grouped according to the regions that currently exist
- * so that each reducer fills a single region so load is distributed.
- *
- * @param <K2>
- * @param <V2>
- */
-@InterfaceAudience.Public
-public class HRegionPartitioner<K2,V2>
-implements Partitioner<ImmutableBytesWritable, V2> {
- private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
- // Connection and locator are not cleaned up; they just die when partitioner is done.
- private Connection connection;
- private RegionLocator locator;
- private byte[][] startKeys;
-
- public void configure(JobConf job) {
- try {
- this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
- TableName tableName = TableName.valueOf(job.get(TableOutputFormat.OUTPUT_TABLE));
- this.locator = this.connection.getRegionLocator(tableName);
- } catch (IOException e) {
- LOG.error(e);
- }
-
- try {
- this.startKeys = this.locator.getStartKeys();
- } catch (IOException e) {
- LOG.error(e);
- }
- }
-
- public int getPartition(ImmutableBytesWritable key, V2 value, int numPartitions) {
- byte[] region = null;
- // Only one region return 0
- if (this.startKeys.length == 1){
- return 0;
- }
- try {
- // Not sure if this is cached after a split so we could have problems
- // here if a region splits while mapping
- region = locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
- } catch (IOException e) {
- LOG.error(e);
- }
- for (int i = 0; i < this.startKeys.length; i++){
- if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
- if (i >= numPartitions-1){
- // cover if we have less reduces then regions.
- return (Integer.toString(i).hashCode()
- & Integer.MAX_VALUE) % numPartitions;
- }
- return i;
- }
- }
- // if above fails to find start key that match we need to return something
- return 0;
- }
-}
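
HRegionPartitioner routes each output key to the reducer responsible for the region that will receive it, so every reducer writes into a single region of the target table. A rough sketch of a table-to-table copy that uses it (source and target table names, and the column family, are illustrative only):

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.HRegionPartitioner;
import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class RegionPartitionedCopySketch {
  /** Toy mapper: re-emits every row of the source table as a Put for the target table. */
  public static class CopyMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, Put> {
    @Override
    public void map(ImmutableBytesWritable row, Result value,
        OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
        throws IOException {
      Put put = new Put(row.get());
      for (Cell cell : value.listCells()) {
        put.add(cell);
      }
      output.collect(row, put);
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create(), RegionPartitionedCopySketch.class);
    job.setJobName("region-partitioned-copy");
    // Scan family cf of the source table and turn each row into a Put.
    TableMapReduceUtil.initTableMapJob("sourceTable", "cf", CopyMapper.class,
        ImmutableBytesWritable.class, Put.class, job);
    // Partition by target region so each reducer loads exactly one region.
    TableMapReduceUtil.initTableReduceJob("targetTable", IdentityTableReduce.class, job,
        HRegionPartitioner.class);
    JobClient.runJob(job);
  }
}
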
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
deleted file mode 100644
index dfacff9..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * Pass the given key and record as-is to reduce
- */
-@InterfaceAudience.Public
-public class IdentityTableMap
-extends MapReduceBase
-implements TableMap<ImmutableBytesWritable, Result> {
-
- /** constructor */
- public IdentityTableMap() {
- super();
- }
-
- /**
- * Use this before submitting a TableMap job. It will
- * appropriately set up the JobConf.
- *
- * @param table table name
- * @param columns columns to scan
- * @param mapper mapper class
- * @param job job configuration
- */
- @SuppressWarnings("unchecked")
- public static void initJob(String table, String columns,
- Class<? extends TableMap> mapper, JobConf job) {
- TableMapReduceUtil.initTableMapJob(table, columns, mapper,
- ImmutableBytesWritable.class,
- Result.class, job);
- }
-
- /**
- * Pass the key, value to reduce
- * @param key
- * @param value
- * @param output
- * @param reporter
- * @throws IOException
- */
- public void map(ImmutableBytesWritable key, Result value,
- OutputCollector<ImmutableBytesWritable,Result> output,
- Reporter reporter) throws IOException {
-
- // convert
- output.collect(key, value);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
deleted file mode 100644
index 9c2e604..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * Write to table each key, record pair
- */
-@InterfaceAudience.Public
-public class IdentityTableReduce
-extends MapReduceBase
-implements TableReduce<ImmutableBytesWritable, Put> {
- @SuppressWarnings("unused")
- private static final Log LOG =
- LogFactory.getLog(IdentityTableReduce.class.getName());
-
- /**
- * No aggregation, output pairs of (key, record)
- * @param key
- * @param values
- * @param output
- * @param reporter
- * @throws IOException
- */
- public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
- OutputCollector<ImmutableBytesWritable, Put> output,
- Reporter reporter)
- throws IOException {
-
- while(values.hasNext()) {
- output.collect(key, values.next());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java
deleted file mode 100644
index 3e121fe..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/MultiTableSnapshotInputFormat.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapred;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
-import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * MultiTableSnapshotInputFormat generalizes {@link org.apache.hadoop.hbase.mapred
- * .TableSnapshotInputFormat}
- * allowing a MapReduce job to run over one or more table snapshots, with one or more scans
- * configured for each.
- * Internally, the input format delegates to {@link org.apache.hadoop.hbase.mapreduce
- * .TableSnapshotInputFormat}
- * and thus has the same performance advantages; see {@link org.apache.hadoop.hbase.mapreduce
- * .TableSnapshotInputFormat} for
- * more details.
- * Usage is similar to TableSnapshotInputFormat, with the following exception:
- * initMultiTableSnapshotMapperJob takes in a map
- * from snapshot name to a collection of scans. For each snapshot in the map, each corresponding
- * scan will be applied;
- * the overall dataset for the job is defined by the concatenation of the regions and tables
- * included in each snapshot/scan
- * pair.
- * {@link org.apache.hadoop.hbase.mapred.TableMapReduceUtil#initMultiTableSnapshotMapperJob(Map,
- * Class, Class, Class, JobConf, boolean, Path)}
- * can be used to configure the job.
- * <pre>{@code
- * Job job = new Job(conf);
- * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
- * "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
- * "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
- * );
- * Path restoreDir = new Path("/tmp/snapshot_restore_dir")
- * TableMapReduceUtil.initTableSnapshotMapperJob(
- * snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
- * MyMapOutputValueWritable.class, job, true, restoreDir);
- * }
- * </pre>
- * Internally, this input format restores each snapshot into a subdirectory of the given tmp
- * directory. Input splits and
- * record readers are created as described in {@link org.apache.hadoop.hbase.mapreduce
- * .TableSnapshotInputFormat}
- * (one per region).
- * See {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat} for more notes on
- * permissioning; the
- * same caveats apply here.
- *
- * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
- * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
- */
-@InterfaceAudience.Public
-public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat
- implements InputFormat<ImmutableBytesWritable, Result> {
-
- private final MultiTableSnapshotInputFormatImpl delegate;
-
- public MultiTableSnapshotInputFormat() {
- this.delegate = new MultiTableSnapshotInputFormatImpl();
- }
-
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- List<TableSnapshotInputFormatImpl.InputSplit> splits = delegate.getSplits(job);
- InputSplit[] results = new InputSplit[splits.size()];
- for (int i = 0; i < splits.size(); i++) {
- results[i] = new TableSnapshotRegionSplit(splits.get(i));
- }
- return results;
- }
-
- @Override
- public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
- return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
- }
-
- /**
- * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
- * restoreDir.
- * Sets: {@link org.apache.hadoop.hbase.mapreduce
- * .MultiTableSnapshotInputFormatImpl#RESTORE_DIRS_KEY},
- * {@link org.apache.hadoop.hbase.mapreduce
- * .MultiTableSnapshotInputFormatImpl#SNAPSHOT_TO_SCANS_KEY}
- *
- * @param conf
- * @param snapshotScans
- * @param restoreDir
- * @throws IOException
- */
- public static void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
- Path restoreDir) throws IOException {
- new MultiTableSnapshotInputFormatImpl().setInput(conf, snapshotScans, restoreDir);
- }
-
-}