Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:24 UTC
[24/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
deleted file mode 100644
index 7fea254..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ /dev/null
@@ -1,902 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Writes HFiles. Passed Cells must arrive in order.
- * Writes current time as the sequence id for the file. Sets the major compacted
- * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
- * all HFiles being written.
- * <p>
- * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
- */
-@InterfaceAudience.Public
-public class HFileOutputFormat2
- extends FileOutputFormat<ImmutableBytesWritable, Cell> {
- private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
- static class TableInfo {
- private TableDescriptor tableDescriptor;
- private RegionLocator regionLocator;
-
- public TableInfo(TableDescriptor tableDescriptor, RegionLocator regionLocator) {
- this.tableDescriptor = tableDescriptor;
- this.regionLocator = regionLocator;
- }
-
- /**
- * Modifications to the returned HTD do not affect the inner TD.
- * @return A clone of inner table descriptor
- * @deprecated use {@link #getTableDescriptor}
- */
- @Deprecated
- public HTableDescriptor getHTableDescriptor() {
- return new HTableDescriptor(tableDescriptor);
- }
-
- public TableDescriptor getTableDescriptor() {
- return tableDescriptor;
- }
-
- public RegionLocator getRegionLocator() {
- return regionLocator;
- }
- }
-
- protected static final byte[] tableSeparator = ";".getBytes(StandardCharsets.UTF_8);
-
- protected static byte[] combineTableNameSuffix(byte[] tableName,
- byte[] suffix ) {
- return Bytes.add(tableName, tableSeparator, suffix);
- }
-
- // The following constants are private since these are used by
- // HFileOutputFormat2 to internally transfer data between job setup and
- // reducer run using conf.
- // These should not be changed by the client.
- static final String COMPRESSION_FAMILIES_CONF_KEY =
- "hbase.hfileoutputformat.families.compression";
- static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
- "hbase.hfileoutputformat.families.bloomtype";
- static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.blocksize";
- static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
-
- // This constant is public since the client can modify this when setting
- // up their conf object and thus refer to this symbol.
- // It is present for backwards compatibility reasons. Use it only to
- // override the auto-detection of datablock encoding.
- public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.datablock.encoding";
-
- /**
- * Keep locality while generating HFiles for bulkload. See HBASE-12596
- */
- public static final String LOCALITY_SENSITIVE_CONF_KEY =
- "hbase.bulkload.locality.sensitive.enabled";
- private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
- static final String OUTPUT_TABLE_NAME_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.table.name";
- static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
- "hbase.mapreduce.use.multi.table.hfileoutputformat";
-
- public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
- public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
-
- @Override
- public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
- final TaskAttemptContext context) throws IOException, InterruptedException {
- return createRecordWriter(context);
- }
-
- protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
- return combineTableNameSuffix(tableName, family);
- }
-
- static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
- createRecordWriter(final TaskAttemptContext context)
- throws IOException {
-
- // Get the path of the temporary output file
- final Path outputPath = FileOutputFormat.getOutputPath(context);
- final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
- final Configuration conf = context.getConfiguration();
- final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false) ;
- final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
- if (writeTableNames==null || writeTableNames.isEmpty()) {
- throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
- + " cannot be empty");
- }
- final FileSystem fs = outputDir.getFileSystem(conf);
- // These configs. are from hbase-*.xml
- final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
- HConstants.DEFAULT_MAX_FILE_SIZE);
- // Invented config. Add to hbase-*.xml if other than default compression.
- final String defaultCompressionStr = conf.get("hfile.compression",
- Compression.Algorithm.NONE.getName());
- final Algorithm defaultCompression = HFileWriterImpl
- .compressionByName(defaultCompressionStr);
- final boolean compactionExclude = conf.getBoolean(
- "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
-
- final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
- Bytes.toString(tableSeparator))).collect(Collectors.toSet());
-
- // create a map from column family to the compression algorithm
- final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
- final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
- final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
-
- String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
- final Map<byte[], DataBlockEncoding> datablockEncodingMap
- = createFamilyDataBlockEncodingMap(conf);
- final DataBlockEncoding overriddenEncoding;
- if (dataBlockEncodingStr != null) {
- overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
- } else {
- overriddenEncoding = null;
- }
-
- return new RecordWriter<ImmutableBytesWritable, V>() {
- // Map of families to writers and how much has been output on the writer.
- private final Map<byte[], WriterLength> writers =
- new TreeMap<>(Bytes.BYTES_COMPARATOR);
- private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
- private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
- private boolean rollRequested = false;
-
- @Override
- public void write(ImmutableBytesWritable row, V cell)
- throws IOException {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-
- // null input == user explicitly wants to flush
- if (row == null && kv == null) {
- rollWriters();
- return;
- }
-
- byte[] rowKey = CellUtil.cloneRow(kv);
- long length = kv.getLength();
- byte[] family = CellUtil.cloneFamily(kv);
- byte[] tableNameBytes = null;
- if (writeMultipleTables) {
- tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
- if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
- throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
- "' not" + " expected");
- }
- } else {
- tableNameBytes = writeTableNames.getBytes(StandardCharsets.UTF_8);
- }
- byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
- WriterLength wl = this.writers.get(tableAndFamily);
-
- // If this is a new column family, create its output directory and set the storage policy
- if (wl == null) {
- Path writerPath = null;
- if (writeMultipleTables) {
- writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes
- .toString(family)));
- }
- else {
- writerPath = new Path(outputDir, Bytes.toString(family));
- }
- fs.mkdirs(writerPath);
- configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
- }
-
- // If any of the HFiles for the column families has reached
- // maxsize, we need to roll all the writers
- if (wl != null && wl.written + length >= maxsize) {
- this.rollRequested = true;
- }
-
- // This can only happen once a row is finished though
- if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
- rollWriters();
- }
-
- // create a new HFile writer, if necessary
- if (wl == null || wl.writer == null) {
- if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
- HRegionLocation loc = null;
-
- String tableName = Bytes.toString(tableNameBytes);
- if (tableName != null) {
- try (Connection connection = ConnectionFactory.createConnection(conf);
- RegionLocator locator =
- connection.getRegionLocator(TableName.valueOf(tableName))) {
- loc = locator.getRegionLocation(rowKey);
- } catch (Throwable e) {
- LOG.warn("There's something wrong when locating rowkey: " +
- Bytes.toString(rowKey) + " for tablename: " + tableName, e);
- loc = null;
- } }
-
- if (null == loc) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("failed to get region location, so use default writer for rowkey: " +
- Bytes.toString(rowKey));
- }
- wl = getNewWriter(tableNameBytes, family, conf, null);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
- }
- InetSocketAddress initialIsa =
- new InetSocketAddress(loc.getHostname(), loc.getPort());
- if (initialIsa.isUnresolved()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
- + loc.getPort() + ", so use default writer");
- }
- wl = getNewWriter(tableNameBytes, family, conf, null);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
- }
- wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa
- });
- }
- }
- } else {
- wl = getNewWriter(tableNameBytes, family, conf, null);
- }
- }
-
- // we now have the proper HFile writer. full steam ahead
- kv.updateLatestStamp(this.now);
- wl.writer.append(kv);
- wl.written += length;
-
- // Copy the row so we can detect the next row transition.
- this.previousRow = rowKey;
- }
-
- private void rollWriters() throws IOException {
- for (WriterLength wl : this.writers.values()) {
- if (wl.writer != null) {
- LOG.info(
- "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
- close(wl.writer);
- }
- wl.writer = null;
- wl.written = 0;
- }
- this.rollRequested = false;
- }
-
- /*
- * Create a new StoreFileWriter.
- * @param family
- * @return A WriterLength, containing a new StoreFileWriter.
- * @throws IOException
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
- justification="Not important")
- private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration
- conf, InetSocketAddress[] favoredNodes) throws IOException {
- byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
- Path familydir = new Path(outputDir, Bytes.toString(family));
- if (writeMultipleTables) {
- familydir = new Path(outputDir,
- new Path(Bytes.toString(tableName), Bytes.toString(family)));
- }
- WriterLength wl = new WriterLength();
- Algorithm compression = compressionMap.get(tableAndFamily);
- compression = compression == null ? defaultCompression : compression;
- BloomType bloomType = bloomTypeMap.get(tableAndFamily);
- bloomType = bloomType == null ? BloomType.NONE : bloomType;
- Integer blockSize = blockSizeMap.get(tableAndFamily);
- blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
- DataBlockEncoding encoding = overriddenEncoding;
- encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
- encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
- Configuration tempConf = new Configuration(conf);
- tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
- HFileContextBuilder contextBuilder = new HFileContextBuilder()
- .withCompression(compression)
- .withChecksumType(HStore.getChecksumType(conf))
- .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
- .withBlockSize(blockSize);
-
- if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
- contextBuilder.withIncludesTags(true);
- }
-
- contextBuilder.withDataBlockEncoding(encoding);
- HFileContext hFileContext = contextBuilder.build();
- if (null == favoredNodes) {
- wl.writer =
- new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
- .withOutputDir(familydir).withBloomType(bloomType)
- .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
- } else {
- wl.writer =
- new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
- .withOutputDir(familydir).withBloomType(bloomType)
- .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
- .withFavoredNodes(favoredNodes).build();
- }
-
- this.writers.put(tableAndFamily, wl);
- return wl;
- }
-
- private void close(final StoreFileWriter w) throws IOException {
- if (w != null) {
- w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
- Bytes.toBytes(System.currentTimeMillis()));
- w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
- Bytes.toBytes(context.getTaskAttemptID().toString()));
- w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
- Bytes.toBytes(true));
- w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
- Bytes.toBytes(compactionExclude));
- w.appendTrackedTimestampsToMetadata();
- w.close();
- }
- }
-
- @Override
- public void close(TaskAttemptContext c)
- throws IOException, InterruptedException {
- for (WriterLength wl: this.writers.values()) {
- close(wl.writer);
- }
- }
- };
- }
-
- /**
- * Configure block storage policy for CF after the directory is created.
- */
- static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
- byte[] tableAndFamily, Path cfPath) {
- if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
- return;
- }
-
- String policy =
- conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
- conf.get(STORAGE_POLICY_PROPERTY));
- FSUtils.setStoragePolicy(fs, cfPath, policy);
- }
-
- /*
- * Data structure to hold a Writer and amount of data written on it.
- */
- static class WriterLength {
- long written = 0;
- StoreFileWriter writer = null;
- }
-
- /**
- * Return the start keys of all of the regions in the given tables,
- * as a list of ImmutableBytesWritable.
- */
- private static List<ImmutableBytesWritable> getRegionStartKeys(List<RegionLocator> regionLocators,
- boolean writeMultipleTables)
- throws IOException {
-
- ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
- for(RegionLocator regionLocator : regionLocators)
- {
- TableName tableName = regionLocator.getName();
- LOG.info("Looking up current regions for table " + tableName);
- byte[][] byteKeys = regionLocator.getStartKeys();
- for (byte[] byteKey : byteKeys) {
- byte[] fullKey = byteKey; //HFileOutputFormat2 use case
- if (writeMultipleTables)
- {
- //MultiTableHFileOutputFormat use case
- fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" + Bytes.toStringBinary
- (fullKey) + "]");
- }
- ret.add(new ImmutableBytesWritable(fullKey));
- }
- }
- return ret;
- }
-
- /**
- * Write out a {@link SequenceFile} that can be read by
- * {@link TotalOrderPartitioner} that contains the split points in startKeys.
- */
- @SuppressWarnings("deprecation")
- private static void writePartitions(Configuration conf, Path partitionsPath,
- List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
- LOG.info("Writing partition information to " + partitionsPath);
- if (startKeys.isEmpty()) {
- throw new IllegalArgumentException("No regions passed");
- }
-
- // We're generating a list of split points, and we don't ever
- // have keys < the first region (which has an empty start key)
- // so we need to remove it. Otherwise we would end up with an
- // empty reducer with index 0
- TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
- ImmutableBytesWritable first = sorted.first();
- if (writeMultipleTables) {
- first = new ImmutableBytesWritable(MultiTableHFileOutputFormat.getSuffix(sorted.first
- ().get()));
- }
- if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
- throw new IllegalArgumentException(
- "First region of table should have empty start key. Instead has: "
- + Bytes.toStringBinary(first.get()));
- }
- sorted.remove(sorted.first());
-
- // Write the actual file
- FileSystem fs = partitionsPath.getFileSystem(conf);
- SequenceFile.Writer writer = SequenceFile.createWriter(
- fs, conf, partitionsPath, ImmutableBytesWritable.class,
- NullWritable.class);
-
- try {
- for (ImmutableBytesWritable startKey : sorted) {
- writer.append(startKey, NullWritable.get());
- }
- } finally {
- writer.close();
- }
- }
-
- /**
- * Configure a MapReduce Job to perform an incremental load into the given
- * table. This
- * <ul>
- * <li>Inspects the table to configure a total order partitioner</li>
- * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
- * <li>Sets the number of reduce tasks to match the current number of regions</li>
- * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
- * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
- * PutSortReducer)</li>
- * </ul>
- * The user should be sure to set the map output value class to either KeyValue or Put before
- * running this function.
- */
- public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
- throws IOException {
- configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
- }
-
- /**
- * Configure a MapReduce Job to perform an incremental load into the given
- * table. This
- * <ul>
- * <li>Inspects the table to configure a total order partitioner</li>
- * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
- * <li>Sets the number of reduce tasks to match the current number of regions</li>
- * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
- * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
- * PutSortReducer)</li>
- * </ul>
- * The user should be sure to set the map output value class to either KeyValue or Put before
- * running this function.
- */
- public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
- RegionLocator regionLocator) throws IOException {
- ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
- singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
- configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
- }
-
- static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
- Configuration conf = job.getConfiguration();
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(KeyValue.class);
- job.setOutputFormatClass(cls);
-
- if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
- throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
- }
- boolean writeMultipleTables = false;
- if (MultiTableHFileOutputFormat.class.equals(cls)) {
- writeMultipleTables = true;
- conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
- }
- // Based on the configured map output class, set the correct reducer to properly
- // sort the incoming values.
- // TODO it would be nice to pick one or the other of these formats.
- if (KeyValue.class.equals(job.getMapOutputValueClass())) {
- job.setReducerClass(KeyValueSortReducer.class);
- } else if (Put.class.equals(job.getMapOutputValueClass())) {
- job.setReducerClass(PutSortReducer.class);
- } else if (Text.class.equals(job.getMapOutputValueClass())) {
- job.setReducerClass(TextSortReducer.class);
- } else {
- LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
- }
-
- conf.setStrings("io.serializations", conf.get("io.serializations"),
- MutationSerialization.class.getName(), ResultSerialization.class.getName(),
- KeyValueSerialization.class.getName());
-
- if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
- LOG.info("bulkload locality sensitive enabled");
- }
-
- /* Now get the region start keys for every table required */
- List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
- List<RegionLocator> regionLocators = new ArrayList<>( multiTableInfo.size());
- List<TableDescriptor> tableDescriptors = new ArrayList<>( multiTableInfo.size());
-
- for( TableInfo tableInfo : multiTableInfo )
- {
- regionLocators.add(tableInfo.getRegionLocator());
- allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
- tableDescriptors.add(tableInfo.getTableDescriptor());
- }
- // Record table names so writers can be created with favored nodes, and so per-table column family attributes (compression, block size, etc.) can be decoded
- conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames, Bytes
- .toString(tableSeparator)));
- List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
- // Use the tables' region boundaries as the TotalOrderPartitioner split points.
- LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
- "to match current region count for all tables");
- job.setNumReduceTasks(startKeys.size());
-
- configurePartitioner(job, startKeys, writeMultipleTables);
- // Set compression algorithms based on column families
-
- conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
- tableDescriptors));
- conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
- tableDescriptors));
- conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
- tableDescriptors));
- conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
- serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
-
- TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.initCredentials(job);
- LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
- }
-
- public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws
- IOException {
- Configuration conf = job.getConfiguration();
-
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(KeyValue.class);
- job.setOutputFormatClass(HFileOutputFormat2.class);
-
- ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
- singleTableDescriptor.add(tableDescriptor);
-
- conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
- // Set compression algorithms based on column families
- conf.set(COMPRESSION_FAMILIES_CONF_KEY,
- serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
- conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
- serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
- conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
- serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
- conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
- serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
-
- TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.initCredentials(job);
- LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
- }
-
- /**
- * Runs inside the task to deserialize column family to compression algorithm
- * map from the configuration.
- *
- * @param conf to read the serialized values from
- * @return a map from column family to the configured compression algorithm
- */
- @VisibleForTesting
- static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
- conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
- COMPRESSION_FAMILIES_CONF_KEY);
- Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
- compressionMap.put(e.getKey(), algorithm);
- }
- return compressionMap;
- }
-
- /**
- * Runs inside the task to deserialize column family to bloom filter type
- * map from the configuration.
- *
- * @param conf to read the serialized values from
- * @return a map from column family to the configured bloom filter type
- */
- @VisibleForTesting
- static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
- BLOOM_TYPE_FAMILIES_CONF_KEY);
- Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- BloomType bloomType = BloomType.valueOf(e.getValue());
- bloomTypeMap.put(e.getKey(), bloomType);
- }
- return bloomTypeMap;
- }
-
- /**
- * Runs inside the task to deserialize column family to block size
- * map from the configuration.
- *
- * @param conf to read the serialized values from
- * @return a map from column family to the configured block size
- */
- @VisibleForTesting
- static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
- BLOCK_SIZE_FAMILIES_CONF_KEY);
- Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- Integer blockSize = Integer.parseInt(e.getValue());
- blockSizeMap.put(e.getKey(), blockSize);
- }
- return blockSizeMap;
- }
-
- /**
- * Runs inside the task to deserialize column family to data block encoding
- * type map from the configuration.
- *
- * @param conf to read the serialized values from
- * @return a map from column family to HFileDataBlockEncoder for the
- * configured data block type for the family
- */
- @VisibleForTesting
- static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
- Configuration conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
- DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
- Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
- }
- return encoderMap;
- }
-
-
- /**
- * Run inside the task to deserialize column family to given conf value map.
- *
- * @param conf to read the serialized values from
- * @param confName conf key to read from the configuration
- * @return a map of column family to the given configuration value
- */
- private static Map<byte[], String> createFamilyConfValueMap(
- Configuration conf, String confName) {
- Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- String confVal = conf.get(confName, "");
- for (String familyConf : confVal.split("&")) {
- String[] familySplit = familyConf.split("=");
- if (familySplit.length != 2) {
- continue;
- }
- try {
- confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
- URLDecoder.decode(familySplit[1], "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- // will not happen with UTF-8 encoding
- throw new AssertionError(e);
- }
- }
- return confValMap;
- }
-
- /**
- * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
- * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
- */
- static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints, boolean
- writeMultipleTables)
- throws IOException {
- Configuration conf = job.getConfiguration();
- // create the partitions file
- FileSystem fs = FileSystem.get(conf);
- String hbaseTmpFsDir =
- conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
- HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
- Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
- fs.makeQualified(partitionsPath);
- writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
- fs.deleteOnExit(partitionsPath);
-
- // configure job to use it
- job.setPartitionerClass(TotalOrderPartitioner.class);
- TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
- @VisibleForTesting
- static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn, List<TableDescriptor> allTables)
- throws UnsupportedEncodingException {
- StringBuilder attributeValue = new StringBuilder();
- int i = 0;
- for (TableDescriptor tableDescriptor : allTables) {
- if (tableDescriptor == null) {
- // could happen with mock table instance
- // CODEREVIEW: Can I set an empty string in conf if mock table instance?
- return "";
- }
- for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
- if (i++ > 0) {
- attributeValue.append('&');
- }
- attributeValue.append(URLEncoder.encode(
- Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(), familyDescriptor.getName())),
- "UTF-8"));
- attributeValue.append('=');
- attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
- }
- }
- // No trailing ampersand to strip; the separator is prepended before each entry after the first
- return attributeValue.toString();
- }
-
- /**
- * Serialize column family to compression algorithm map to configuration.
- * Invoked while configuring the MR job for incremental load.
- *
- * @param tableDescriptor to read the properties from
- * @param conf to persist serialized values into
- * @throws IOException
- * on failure to read column family descriptors
- */
- @VisibleForTesting
- static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
- familyDescriptor.getCompressionType().getName();
-
- /**
- * Serialize column family to block size map to configuration. Invoked while
- * configuring the MR job for incremental load.
- *
- * @param tableDescriptor
- * to read the properties from
- * @param conf
- * to persist serialized values into
- *
- * @throws IOException
- * on failure to read column family descriptors
- */
- @VisibleForTesting
- static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
- .valueOf(familyDescriptor.getBlocksize());
-
- /**
- * Serialize column family to bloom type map to configuration. Invoked while
- * configuring the MR job for incremental load.
- *
- * @param tableDescriptor
- * to read the properties from
- * @param conf
- * to persist serialized values into
- *
- * @throws IOException
- * on failure to read column family descriptors
- */
- @VisibleForTesting
- static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
- String bloomType = familyDescriptor.getBloomFilterType().toString();
- if (bloomType == null) {
- bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
- }
- return bloomType;
- };
-
- /**
- * Serialize column family to data block encoding map to configuration.
- * Invoked while configuring the MR job for incremental load.
- *
- * @param tableDescriptor
- * to read the properties from
- * @param conf
- * to persist serialized values into
- * @throws IOException
- * on failure to read column family descriptors
- */
- @VisibleForTesting
- static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
- DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
- if (encoding == null) {
- encoding = DataBlockEncoding.NONE;
- }
- return encoding.toString();
- };
-
-}
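
For reference, the class javadoc above spells out the intended usage of the removed class: set the job's map output value class to Put or KeyValue, then call configureIncrementalLoad so the partitioner, sort reducer and reduce-task count are derived from the target table's regions. A minimal driver along those lines might look like the sketch below; the table name, column family, mapper class and command-line paths are hypothetical placeholders and are not part of this change.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver, shown only to illustrate the configureIncrementalLoad contract.
public class BulkLoadDriver {

  // Hypothetical mapper: turns each input line "rowkey,value" into a Put for family "cf".
  public static class CsvToPutMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] parts = line.toString().split(",", 2);
      if (parts.length < 2) {
        return; // skip malformed lines in this sketch
      }
      byte[] row = Bytes.toBytes(parts[0]);
      Put put = new Put(row);
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(parts[1]));
      context.write(new ImmutableBytesWritable(row), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("usertable"); // hypothetical table
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(tableName);
        RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
      Job job = Job.getInstance(conf, "prepare-hfiles-for-" + tableName);
      job.setJarByClass(BulkLoadDriver.class);
      job.setMapperClass(CsvToPutMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      // Per the javadoc above, the map output value class must be Put or KeyValue so that
      // configureIncrementalLoad can pick PutSortReducer or KeyValueSortReducer.
      job.setMapOutputValueClass(Put.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));   // hypothetical input CSV
      FileOutputFormat.setOutputPath(job, new Path(args[1])); // hypothetical HFile staging dir
      // Wires up the TotalOrderPartitioner, sort reducer and reduce-task count from the
      // table's current region layout.
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }
}

The HFiles written under the staging directory would then be handed to LoadIncrementalHFiles (referenced elsewhere in this commit) to complete the bulk load.
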
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
deleted file mode 100644
index 3475a48..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableOutputFormat;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * This is used to partition the output keys into groups of keys.
- * Keys are grouped according to the regions that currently exist,
- * so that each reducer fills a single region and the load is distributed.
- *
- * <p>This class is not suitable as a partitioner for creating HFiles
- * for incremental bulk loads, as the region spread will likely change between the time of
- * HFile creation and load time. See {@link LoadIncrementalHFiles}
- * and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.
- *
- * @param <KEY> The type of the key.
- * @param <VALUE> The type of the value.
- */
-@InterfaceAudience.Public
-public class HRegionPartitioner<KEY, VALUE>
-extends Partitioner<ImmutableBytesWritable, VALUE>
-implements Configurable {
-
- private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
- private Configuration conf = null;
- // Connection and locator are not cleaned up; they just die when partitioner is done.
- private Connection connection;
- private RegionLocator locator;
- private byte[][] startKeys;
-
- /**
- * Gets the partition number for a given key (hence record) given the total
- * number of partitions i.e. number of reduce-tasks for the job.
- *
- * <p>Typically a hash function on all or a subset of the key.</p>
- *
- * @param key The key to be partitioned.
- * @param value The entry value.
- * @param numPartitions The total number of partitions.
- * @return The partition number for the <code>key</code>.
- * @see org.apache.hadoop.mapreduce.Partitioner#getPartition(
- * java.lang.Object, java.lang.Object, int)
- */
- @Override
- public int getPartition(ImmutableBytesWritable key,
- VALUE value, int numPartitions) {
- byte[] region = null;
- // Only one region, return 0
- if (this.startKeys.length == 1){
- return 0;
- }
- try {
- // Not sure if this is cached after a split so we could have problems
- // here if a region splits while mapping
- region = this.locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
- } catch (IOException e) {
- LOG.error(e);
- }
- for (int i = 0; i < this.startKeys.length; i++){
- if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
- if (i >= numPartitions-1){
- // cover the case where we have fewer reducers than regions.
- return (Integer.toString(i).hashCode()
- & Integer.MAX_VALUE) % numPartitions;
- }
- return i;
- }
- }
- // if the above fails to find a matching start key, we need to return something
- return 0;
- }
-
- /**
- * Returns the current configuration.
- *
- * @return The current configuration.
- * @see org.apache.hadoop.conf.Configurable#getConf()
- */
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- /**
- * Sets the configuration. This is used to determine the start keys for the
- * given table.
- *
- * @param configuration The configuration to set.
- * @see org.apache.hadoop.conf.Configurable#setConf(
- * org.apache.hadoop.conf.Configuration)
- */
- @Override
- public void setConf(Configuration configuration) {
- this.conf = HBaseConfiguration.create(configuration);
- try {
- this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(conf));
- TableName tableName = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE));
- this.locator = this.connection.getRegionLocator(tableName);
- } catch (IOException e) {
- LOG.error(e);
- }
- try {
- this.startKeys = this.locator.getStartKeys();
- } catch (IOException e) {
- LOG.error(e);
- }
- }
-}
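
As the javadoc above notes, HRegionPartitioner is meant for jobs that write directly to an existing table rather than for HFile generation: it groups reduce output by the table's current regions, reading the target table name that TableOutputFormat setup leaves in the job configuration (see setConf above). A rough wiring sketch follows; it assumes the imports used in the classes above plus TableReducer and TableMapReduceUtil, and the table name and reducer are hypothetical, not part of this change.

// Hypothetical reducer: writes each incoming Put straight through to the output table.
public static class PassThroughReducer
    extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {
  @Override
  protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts, Context context)
      throws IOException, InterruptedException {
    for (Put put : puts) {
      context.write(row, put);
    }
  }
}

// Illustrative job wiring only; "usertable" is a hypothetical output table.
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "write-by-region");
// initTableReducerJob records the output table in the job configuration (the key that
// HRegionPartitioner.setConf reads) and installs the table output format.
TableMapReduceUtil.initTableReducerJob(
    "usertable",
    PassThroughReducer.class,
    job,
    HRegionPartitioner.class);   // group reduce output by the table's current regions
// Each reducer is supposed to fill a single region, so cap the reducer count at the current
// number of regions (initTableReducerJob may already do this when given HRegionPartitioner;
// the cap is shown explicitly here for clarity).
try (Connection connection = ConnectionFactory.createConnection(conf);
    RegionLocator locator = connection.getRegionLocator(TableName.valueOf("usertable"))) {
  job.setNumReduceTasks(Math.min(job.getNumReduceTasks(), locator.getStartKeys().length));
}
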
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
deleted file mode 100644
index dfac471..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
+++ /dev/null
@@ -1,747 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Charsets;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Ordering;
-
-public class HashTable extends Configured implements Tool {
-
- private static final Log LOG = LogFactory.getLog(HashTable.class);
-
- private static final int DEFAULT_BATCH_SIZE = 8000;
-
- private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
- final static String PARTITIONS_FILE_NAME = "partitions";
- final static String MANIFEST_FILE_NAME = "manifest";
- final static String HASH_DATA_DIR = "hashes";
- final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
- private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
-
- TableHash tableHash = new TableHash();
- Path destPath;
-
- public HashTable(Configuration conf) {
- super(conf);
- }
-
- public static class TableHash {
-
- Path hashDir;
-
- String tableName;
- String families = null;
- long batchSize = DEFAULT_BATCH_SIZE;
- int numHashFiles = 0;
- byte[] startRow = HConstants.EMPTY_START_ROW;
- byte[] stopRow = HConstants.EMPTY_END_ROW;
- int scanBatch = 0;
- int versions = -1;
- long startTime = 0;
- long endTime = 0;
-
- List<ImmutableBytesWritable> partitions;
-
- public static TableHash read(Configuration conf, Path hashDir) throws IOException {
- TableHash tableHash = new TableHash();
- FileSystem fs = hashDir.getFileSystem(conf);
- tableHash.hashDir = hashDir;
- tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
- tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
- return tableHash;
- }
-
- void writePropertiesFile(FileSystem fs, Path path) throws IOException {
- Properties p = new Properties();
- p.setProperty("table", tableName);
- if (families != null) {
- p.setProperty("columnFamilies", families);
- }
- p.setProperty("targetBatchSize", Long.toString(batchSize));
- p.setProperty("numHashFiles", Integer.toString(numHashFiles));
- if (!isTableStartRow(startRow)) {
- p.setProperty("startRowHex", Bytes.toHex(startRow));
- }
- if (!isTableEndRow(stopRow)) {
- p.setProperty("stopRowHex", Bytes.toHex(stopRow));
- }
- if (scanBatch > 0) {
- p.setProperty("scanBatch", Integer.toString(scanBatch));
- }
- if (versions >= 0) {
- p.setProperty("versions", Integer.toString(versions));
- }
- if (startTime != 0) {
- p.setProperty("startTimestamp", Long.toString(startTime));
- }
- if (endTime != 0) {
- p.setProperty("endTimestamp", Long.toString(endTime));
- }
-
- try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
- p.store(osw, null);
- }
- }
-
- void readPropertiesFile(FileSystem fs, Path path) throws IOException {
- Properties p = new Properties();
- try (FSDataInputStream in = fs.open(path)) {
- try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
- p.load(isr);
- }
- }
- tableName = p.getProperty("table");
- families = p.getProperty("columnFamilies");
- batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
- numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
-
- String startRowHex = p.getProperty("startRowHex");
- if (startRowHex != null) {
- startRow = Bytes.fromHex(startRowHex);
- }
- String stopRowHex = p.getProperty("stopRowHex");
- if (stopRowHex != null) {
- stopRow = Bytes.fromHex(stopRowHex);
- }
-
- String scanBatchString = p.getProperty("scanBatch");
- if (scanBatchString != null) {
- scanBatch = Integer.parseInt(scanBatchString);
- }
-
- String versionString = p.getProperty("versions");
- if (versionString != null) {
- versions = Integer.parseInt(versionString);
- }
-
- String startTimeString = p.getProperty("startTimestamp");
- if (startTimeString != null) {
- startTime = Long.parseLong(startTimeString);
- }
-
- String endTimeString = p.getProperty("endTimestamp");
- if (endTimeString != null) {
- endTime = Long.parseLong(endTimeString);
- }
- }
-
- Scan initScan() throws IOException {
- Scan scan = new Scan();
- scan.setCacheBlocks(false);
- if (startTime != 0 || endTime != 0) {
- scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
- }
- if (scanBatch > 0) {
- scan.setBatch(scanBatch);
- }
- if (versions >= 0) {
- scan.setMaxVersions(versions);
- }
- if (!isTableStartRow(startRow)) {
- scan.setStartRow(startRow);
- }
- if (!isTableEndRow(stopRow)) {
- scan.setStopRow(stopRow);
- }
- if(families != null) {
- for(String fam : families.split(",")) {
- scan.addFamily(Bytes.toBytes(fam));
- }
- }
- return scan;
- }
-
- /**
- * Choose partitions between row ranges to hash to a single output file.
- * Selects region boundaries that fall within the scan range, and groups them
- * into the desired number of partitions.
- */
- void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
- List<byte[]> startKeys = new ArrayList<>();
- for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
- byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
- byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
-
- // skip this region if the scan begins after it ends, or ends before it begins;
- // in other words:
- // IF (scan begins before the end of this region
- // AND scan ends after the start of this region)
- // THEN include this region
- if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
- || Bytes.compareTo(startRow, regionEndKey) < 0)
- && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
- || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
- startKeys.add(regionStartKey);
- }
- }
-
- int numRegions = startKeys.size();
- if (numHashFiles == 0) {
- numHashFiles = numRegions / 100;
- }
- if (numHashFiles == 0) {
- numHashFiles = 1;
- }
- if (numHashFiles > numRegions) {
- // can't partition within regions
- numHashFiles = numRegions;
- }
-
- // choose a subset of start keys to group regions into ranges
- partitions = new ArrayList<>(numHashFiles - 1);
- // skip the first start key as it is not a partition between ranges.
- for (long i = 1; i < numHashFiles; i++) {
- int splitIndex = (int) (numRegions * i / numHashFiles);
- partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
- }
- }
-
- void writePartitionFile(Configuration conf, Path path) throws IOException {
- FileSystem fs = path.getFileSystem(conf);
- @SuppressWarnings("deprecation")
- SequenceFile.Writer writer = SequenceFile.createWriter(
- fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
-
- for (int i = 0; i < partitions.size(); i++) {
- writer.append(partitions.get(i), NullWritable.get());
- }
- writer.close();
- }
-
- private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
- throws IOException {
- @SuppressWarnings("deprecation")
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- ImmutableBytesWritable key = new ImmutableBytesWritable();
- partitions = new ArrayList<>();
- while (reader.next(key)) {
- partitions.add(new ImmutableBytesWritable(key.copyBytes()));
- }
- reader.close();
-
- if (!Ordering.natural().isOrdered(partitions)) {
- throw new IOException("Partitions are not ordered!");
- }
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("tableName=").append(tableName);
- if (families != null) {
- sb.append(", families=").append(families);
- }
- sb.append(", batchSize=").append(batchSize);
- sb.append(", numHashFiles=").append(numHashFiles);
- if (!isTableStartRow(startRow)) {
- sb.append(", startRowHex=").append(Bytes.toHex(startRow));
- }
- if (!isTableEndRow(stopRow)) {
- sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
- }
- if (scanBatch >= 0) {
- sb.append(", scanBatch=").append(scanBatch);
- }
- if (versions >= 0) {
- sb.append(", versions=").append(versions);
- }
- if (startTime != 0) {
- sb.append("startTime=").append(startTime);
- }
- if (endTime != 0) {
- sb.append("endTime=").append(endTime);
- }
- return sb.toString();
- }
-
- static String getDataFileName(int hashFileIndex) {
- return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
- }
-
- /**
- * Open a TableHash.Reader starting at the first hash at or after the given key.
- * @throws IOException
- */
- public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
- throws IOException {
- return new Reader(conf, startKey);
- }
-
- public class Reader implements java.io.Closeable {
- private final Configuration conf;
-
- private int hashFileIndex;
- private MapFile.Reader mapFileReader;
-
- private boolean cachedNext;
- private ImmutableBytesWritable key;
- private ImmutableBytesWritable hash;
-
- Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
- this.conf = conf;
- int partitionIndex = Collections.binarySearch(partitions, startKey);
- if (partitionIndex >= 0) {
- // if the key is equal to a partition, then go to the file after that partition
- hashFileIndex = partitionIndex+1;
- } else {
- // if the key is between partitions, then go to the file between those partitions
- hashFileIndex = -1-partitionIndex;
- }
- openHashFile();
-
- // MapFiles don't make it easy to seek() so that the subsequent next() returns
- // the desired key/value pair. So we cache it for the first call of next().
- hash = new ImmutableBytesWritable();
- key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
- if (key == null) {
- cachedNext = false;
- hash = null;
- } else {
- cachedNext = true;
- }
- }
-
- /**
- * Read the next key/hash pair.
- * Returns true if such a pair exists and false when at the end of the data.
- */
- public boolean next() throws IOException {
- if (cachedNext) {
- cachedNext = false;
- return true;
- }
- key = new ImmutableBytesWritable();
- hash = new ImmutableBytesWritable();
- while (true) {
- boolean hasNext = mapFileReader.next(key, hash);
- if (hasNext) {
- return true;
- }
- hashFileIndex++;
- if (hashFileIndex < TableHash.this.numHashFiles) {
- mapFileReader.close();
- openHashFile();
- } else {
- key = null;
- hash = null;
- return false;
- }
- }
- }
-
- /**
- * Get the current key
- * @return the current key or null if there is no current key
- */
- public ImmutableBytesWritable getCurrentKey() {
- return key;
- }
-
- /**
- * Get the current hash
- * @return the current hash or null if there is no current hash
- */
- public ImmutableBytesWritable getCurrentHash() {
- return hash;
- }
-
- private void openHashFile() throws IOException {
- if (mapFileReader != null) {
- mapFileReader.close();
- }
- Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
- Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
- mapFileReader = new MapFile.Reader(dataFile, conf);
- }
-
- @Override
- public void close() throws IOException {
- mapFileReader.close();
- }
- }
- }
-
- static boolean isTableStartRow(byte[] row) {
- return Bytes.equals(HConstants.EMPTY_START_ROW, row);
- }
-
- static boolean isTableEndRow(byte[] row) {
- return Bytes.equals(HConstants.EMPTY_END_ROW, row);
- }
-
- public Job createSubmittableJob(String[] args) throws IOException {
- Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
- generatePartitions(partitionsPath);
-
- Job job = Job.getInstance(getConf(),
- getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
- Configuration jobConf = job.getConfiguration();
- jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
- job.setJarByClass(HashTable.class);
-
- TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
- HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
-
- // use a TotalOrderPartitioner and reducers to group region output into hash files
- job.setPartitionerClass(TotalOrderPartitioner.class);
- TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
- job.setReducerClass(Reducer.class); // identity reducer
- job.setNumReduceTasks(tableHash.numHashFiles);
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(ImmutableBytesWritable.class);
- job.setOutputFormatClass(MapFileOutputFormat.class);
- FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
-
- return job;
- }
-
- private void generatePartitions(Path partitionsPath) throws IOException {
- Connection connection = ConnectionFactory.createConnection(getConf());
- Pair<byte[][], byte[][]> regionKeys
- = connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
- connection.close();
-
- tableHash.selectPartitions(regionKeys);
- LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
-
- tableHash.writePartitionFile(getConf(), partitionsPath);
- }
-
- static class ResultHasher {
- private MessageDigest digest;
-
- private boolean batchStarted = false;
- private ImmutableBytesWritable batchStartKey;
- private ImmutableBytesWritable batchHash;
- private long batchSize = 0;
-
-
- public ResultHasher() {
- try {
- digest = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- Throwables.propagate(e);
- }
- }
-
- public void startBatch(ImmutableBytesWritable row) {
- if (batchStarted) {
- throw new RuntimeException("Cannot start new batch without finishing existing one.");
- }
- batchStarted = true;
- batchSize = 0;
- batchStartKey = row;
- batchHash = null;
- }
-
- public void hashResult(Result result) {
- if (!batchStarted) {
- throw new RuntimeException("Cannot add to batch that has not been started.");
- }
- for (Cell cell : result.rawCells()) {
- int rowLength = cell.getRowLength();
- int familyLength = cell.getFamilyLength();
- int qualifierLength = cell.getQualifierLength();
- int valueLength = cell.getValueLength();
- digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
- digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
- digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
- long ts = cell.getTimestamp();
- for (int i = 8; i > 0; i--) {
- digest.update((byte) ts);
- ts >>>= 8;
- }
- digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
-
- batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
- }
- }
-
- public void finishBatch() {
- if (!batchStarted) {
- throw new RuntimeException("Cannot finish batch that has not started.");
- }
- batchStarted = false;
- batchHash = new ImmutableBytesWritable(digest.digest());
- }
-
- public boolean isBatchStarted() {
- return batchStarted;
- }
-
- public ImmutableBytesWritable getBatchStartKey() {
- return batchStartKey;
- }
-
- public ImmutableBytesWritable getBatchHash() {
- return batchHash;
- }
-
- public long getBatchSize() {
- return batchSize;
- }
- }
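As a quick illustration of the batching contract above, a single-batch use of ResultHasher might look like the following sketch (the Result instance and row key are placeholders, and since ResultHasher is package-private this assumes code in the same package):

    ResultHasher hasher = new ResultHasher();
    hasher.startBatch(new ImmutableBytesWritable(Bytes.toBytes("row-0000")));
    hasher.hashResult(result);                  // any org.apache.hadoop.hbase.client.Result
    hasher.finishBatch();
    byte[] md5 = hasher.getBatchHash().get();   // 16-byte MD5 over key, timestamp and value bytes
    long bytesHashed = hasher.getBatchSize();   // approximate amount of data folded into the digest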
-
- public static class HashMapper
- extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
-
- private ResultHasher hasher;
- private long targetBatchSize;
-
- private ImmutableBytesWritable currentRow;
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- targetBatchSize = context.getConfiguration()
- .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
- hasher = new ResultHasher();
-
- TableSplit split = (TableSplit) context.getInputSplit();
- hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
- }
-
- @Override
- protected void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
-
- if (currentRow == null || !currentRow.equals(key)) {
- currentRow = new ImmutableBytesWritable(key); // the framework reuses the key object, so copy it
-
- if (hasher.getBatchSize() >= targetBatchSize) {
- hasher.finishBatch();
- context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
- hasher.startBatch(currentRow);
- }
- }
-
- hasher.hashResult(value);
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- hasher.finishBatch();
- context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
- }
- }
-
- private void writeTempManifestFile() throws IOException {
- Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
- FileSystem fs = tempManifestPath.getFileSystem(getConf());
- tableHash.writePropertiesFile(fs, tempManifestPath);
- }
-
- private void completeManifest() throws IOException {
- Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
- Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
- FileSystem fs = tempManifestPath.getFileSystem(getConf());
- fs.rename(tempManifestPath, manifestPath);
- }
-
- private static final int NUM_ARGS = 2;
- private static void printUsage(final String errorMsg) {
- if (errorMsg != null && errorMsg.length() > 0) {
- System.err.println("ERROR: " + errorMsg);
- System.err.println();
- }
- System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
- System.err.println();
- System.err.println("Options:");
- System.err.println(" batchsize the target amount of bytes to hash in each batch");
- System.err.println(" rows are added to the batch until this size is reached");
- System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
- System.err.println(" numhashfiles the number of hash files to create");
- System.err.println(" if set to fewer than number of regions then");
- System.err.println(" the job will create this number of reducers");
- System.err.println(" (defaults to 1/100 of regions -- at least 1)");
- System.err.println(" startrow the start row");
- System.err.println(" stoprow the stop row");
- System.err.println(" starttime beginning of the time range (unixtime in millis)");
- System.err.println(" without endtime means from starttime to forever");
- System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
- System.err.println(" scanbatch scanner batch size to support intra row scans");
- System.err.println(" versions number of cell versions to include");
- System.err.println(" families comma-separated list of families to include");
- System.err.println();
- System.err.println("Args:");
- System.err.println(" tablename Name of the table to hash");
- System.err.println(" outputpath Filesystem path to put the output data");
- System.err.println();
- System.err.println("Examples:");
- System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
- System.err.println(" $ hbase " +
- "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
- + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
- + " TestTable /hashes/testTable");
- }
-
- private boolean doCommandLine(final String[] args) {
- if (args.length < NUM_ARGS) {
- printUsage(null);
- return false;
- }
- try {
-
- tableHash.tableName = args[args.length-2];
- destPath = new Path(args[args.length-1]);
-
- for (int i = 0; i < args.length - NUM_ARGS; i++) {
- String cmd = args[i];
- if (cmd.equals("-h") || cmd.startsWith("--h")) {
- printUsage(null);
- return false;
- }
-
- final String batchSizeArgKey = "--batchsize=";
- if (cmd.startsWith(batchSizeArgKey)) {
- tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
- continue;
- }
-
- final String numHashFilesArgKey = "--numhashfiles=";
- if (cmd.startsWith(numHashFilesArgKey)) {
- tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
- continue;
- }
-
- final String startRowArgKey = "--startrow=";
- if (cmd.startsWith(startRowArgKey)) {
- tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
- continue;
- }
-
- final String stopRowArgKey = "--stoprow=";
- if (cmd.startsWith(stopRowArgKey)) {
- tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
- continue;
- }
-
- final String startTimeArgKey = "--starttime=";
- if (cmd.startsWith(startTimeArgKey)) {
- tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
- continue;
- }
-
- final String endTimeArgKey = "--endtime=";
- if (cmd.startsWith(endTimeArgKey)) {
- tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
- continue;
- }
-
- final String scanBatchArgKey = "--scanbatch=";
- if (cmd.startsWith(scanBatchArgKey)) {
- tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
- continue;
- }
-
- final String versionsArgKey = "--versions=";
- if (cmd.startsWith(versionsArgKey)) {
- tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
- continue;
- }
-
- final String familiesArgKey = "--families=";
- if (cmd.startsWith(familiesArgKey)) {
- tableHash.families = cmd.substring(familiesArgKey.length());
- continue;
- }
-
- printUsage("Invalid argument '" + cmd + "'");
- return false;
- }
- if ((tableHash.startTime != 0 || tableHash.endTime != 0)
- && (tableHash.startTime >= tableHash.endTime)) {
- printUsage("Invalid time range filter: starttime="
- + tableHash.startTime + " >= endtime=" + tableHash.endTime);
- return false;
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- printUsage("Can't start because " + e.getMessage());
- return false;
- }
- return true;
- }
-
- /**
- * Main entry point.
- */
- public static void main(String[] args) throws Exception {
- int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
- System.exit(ret);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
- if (!doCommandLine(otherArgs)) {
- return 1;
- }
-
- Job job = createSubmittableJob(otherArgs);
- writeTempManifestFile();
- if (!job.waitForCompletion(true)) {
- LOG.info("Map-reduce job failed!");
- return 1;
- }
- completeManifest();
- return 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
deleted file mode 100644
index 7103ef8..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * Pass the given key and record as-is to the reduce phase.
- */
-@InterfaceAudience.Public
-public class IdentityTableMapper
-extends TableMapper<ImmutableBytesWritable, Result> {
-
- /**
- * Use this before submitting a TableMap job. It will appropriately set up
- * the job.
- *
- * @param table The table name.
- * @param scan The scan with the columns to scan.
- * @param mapper The mapper class.
- * @param job The job configuration.
- * @throws IOException When setting up the job fails.
- */
- @SuppressWarnings("rawtypes")
- public static void initJob(String table, Scan scan,
- Class<? extends TableMapper> mapper, Job job) throws IOException {
- TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
- ImmutableBytesWritable.class, Result.class, job);
- }
-
- /**
- * Pass the key, value to reduce.
- *
- * @param key The current key.
- * @param value The current value.
- * @param context The current context.
- * @throws IOException When writing the record fails.
- * @throws InterruptedException When the job is aborted.
- */
- public void map(ImmutableBytesWritable key, Result value, Context context)
- throws IOException, InterruptedException {
- context.write(key, value);
- }
-
-}
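A hedged sketch of wiring this mapper into a job through its initJob helper follows; the table name, scan and job name are placeholders:

    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "identity-scan");
    Scan scan = new Scan();
    IdentityTableMapper.initJob("SourceTable", scan, IdentityTableMapper.class, job);
    // downstream reduce tasks then receive each row key with its Result unchanged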
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
deleted file mode 100644
index 5289f46..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Convenience class that simply writes all values (which must be
- * {@link org.apache.hadoop.hbase.client.Put Put} or
- * {@link org.apache.hadoop.hbase.client.Delete Delete} instances)
- * passed to it out to the configured HBase table. This works in combination
- * with {@link TableOutputFormat} which actually does the writing to HBase.<p>
- *
- * Keys are passed along but ignored in TableOutputFormat. However, they can
- * be used to control how your values will be divided up amongst the specified
- * number of reducers. <p>
- *
- * You can also use the {@link TableMapReduceUtil} class to set up the two
- * classes in one step:
- * <blockquote><code>
- * TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job);
- * </code></blockquote>
- * This will also set the proper {@link TableOutputFormat} which is given the
- * <code>table</code> parameter. The
- * {@link org.apache.hadoop.hbase.client.Put Put} or
- * {@link org.apache.hadoop.hbase.client.Delete Delete} defines the
- * row and columns implicitly.
- */
-@InterfaceAudience.Public
-public class IdentityTableReducer
-extends TableReducer<Writable, Mutation, Writable> {
-
- @SuppressWarnings("unused")
- private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
-
- /**
- * Writes each given record, consisting of the row key and the given values,
- * to the configured {@link org.apache.hadoop.mapreduce.OutputFormat}.
- * It emits the row key and each {@link org.apache.hadoop.hbase.client.Put Put}
- * or {@link org.apache.hadoop.hbase.client.Delete Delete} as a separate pair.
- *
- * @param key The current row key.
- * @param values The {@link org.apache.hadoop.hbase.client.Put Put} or
- * {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given
- * row.
- * @param context The context of the reduce.
- * @throws IOException When writing the record fails.
- * @throws InterruptedException When the job gets interrupted.
- */
- @Override
- public void reduce(Writable key, Iterable<Mutation> values, Context context)
- throws IOException, InterruptedException {
- for(Mutation putOrDelete : values) {
- context.write(key, putOrDelete);
- }
- }
-}
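To make the Javadoc snippet above concrete, a minimal reducer-side setup might look like this sketch; "DestTable" is a placeholder, and the job's mapper (not shown) must emit Put or Delete values keyed by row, since IdentityTableReducer only forwards the Mutations that reach it:

    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "write-mutations");
    // sets IdentityTableReducer, TableOutputFormat and the destination table in one step
    TableMapReduceUtil.initTableReducerJob("DestTable", IdentityTableReducer.class, job);
    job.waitForCompletion(true);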