You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2014/02/21 21:39:22 UTC
svn commit: r1570702 [1/2] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/mapreduce/
test/java/org/apache/hadoop/hbase/mapreduce/
Author: jxiang
Date: Fri Feb 21 20:39:21 2014
New Revision: 1570702
URL: http://svn.apache.org/r1570702
Log:
HBASE-10526 Using Cell instead of KeyValue in HFileOutputFormat
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java (with props)
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java (with props)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1570702&r1=1570701&r2=1570702&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Fri Feb 21 20:39:21 2014
@@ -18,52 +18,27 @@
*/
package org.apache.hadoop.hbase.mapreduce;
-import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* Writes HFiles. Passed KeyValues must arrive in order.
@@ -74,251 +49,24 @@ import org.apache.hadoop.mapreduce.lib.p
* Using this class as part of a MapReduce job is best done
* using {@link #configureIncrementalLoad(Job, HTable)}.
* @see KeyValueSortReducer
+ * @deprecated use {@link HFileOutputFormat2} instead.
*/
+@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
- // The following constants are private since these are used by
- // HFileOutputFormat to internally transfer data between job setup and
- // reducer run using conf.
- // These should not be changed by the client.
- private static final String COMPRESSION_FAMILIES_CONF_KEY =
- "hbase.hfileoutputformat.families.compression";
- private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
- "hbase.hfileoutputformat.families.bloomtype";
- private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.blocksize";
- private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
-
// This constant is public since the client can modify this when setting
// up their conf object and thus refer to this symbol.
// It is present for backwards compatibility reasons. Use it only to
// override the auto-detection of datablock encoding.
public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.datablock.encoding";
-
- public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
- throws IOException, InterruptedException {
- // Get the path of the temporary output file
- final Path outputPath = FileOutputFormat.getOutputPath(context);
- final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
- final Configuration conf = context.getConfiguration();
- final FileSystem fs = outputdir.getFileSystem(conf);
- // These configs. are from hbase-*.xml
- final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
- HConstants.DEFAULT_MAX_FILE_SIZE);
- // Invented config. Add to hbase-*.xml if other than default compression.
- final String defaultCompressionStr = conf.get("hfile.compression",
- Compression.Algorithm.NONE.getName());
- final Algorithm defaultCompression = AbstractHFileWriter
- .compressionByName(defaultCompressionStr);
- final boolean compactionExclude = conf.getBoolean(
- "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
-
- // create a map from column family to the compression algorithm
- final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
- final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
- final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
-
- String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
- final Map<byte[], DataBlockEncoding> datablockEncodingMap
- = createFamilyDataBlockEncodingMap(conf);
- final DataBlockEncoding overriddenEncoding;
- if (dataBlockEncodingStr != null) {
- overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
- } else {
- overriddenEncoding = null;
- }
-
- return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
- // Map of families to writers and how much has been output on the writer.
- private final Map<byte [], WriterLength> writers =
- new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
- private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
- private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
- private boolean rollRequested = false;
-
- public void write(ImmutableBytesWritable row, KeyValue kv)
- throws IOException {
- // null input == user explicitly wants to flush
- if (row == null && kv == null) {
- rollWriters();
- return;
- }
-
- byte [] rowKey = kv.getRow();
- long length = kv.getLength();
- byte [] family = kv.getFamily();
- WriterLength wl = this.writers.get(family);
-
- // If this is a new column family, verify that the directory exists
- if (wl == null) {
- fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
- }
-
- // If any of the HFiles for the column families has reached
- // maxsize, we need to roll all the writers
- if (wl != null && wl.written + length >= maxsize) {
- this.rollRequested = true;
- }
-
- // This can only happen once a row is finished though
- if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
- rollWriters();
- }
-
- // create a new HLog writer, if necessary
- if (wl == null || wl.writer == null) {
- wl = getNewWriter(family, conf);
- }
-
- // we now have the proper HLog writer. full steam ahead
- kv.updateLatestStamp(this.now);
- wl.writer.append(kv);
- wl.written += length;
-
- // Copy the row so we know when a row transition.
- this.previousRow = rowKey;
- }
-
- private void rollWriters() throws IOException {
- for (WriterLength wl : this.writers.values()) {
- if (wl.writer != null) {
- LOG.info("Writer=" + wl.writer.getPath() +
- ((wl.written == 0)? "": ", wrote=" + wl.written));
- close(wl.writer);
- }
- wl.writer = null;
- wl.written = 0;
- }
- this.rollRequested = false;
- }
-
- /* Create a new StoreFile.Writer.
- * @param family
- * @return A WriterLength, containing a new StoreFile.Writer.
- * @throws IOException
- */
- private WriterLength getNewWriter(byte[] family, Configuration conf)
- throws IOException {
- WriterLength wl = new WriterLength();
- Path familydir = new Path(outputdir, Bytes.toString(family));
- Algorithm compression = compressionMap.get(family);
- compression = compression == null ? defaultCompression : compression;
- BloomType bloomType = bloomTypeMap.get(family);
- bloomType = bloomType == null ? BloomType.NONE : bloomType;
- Integer blockSize = blockSizeMap.get(family);
- blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
- DataBlockEncoding encoding = overriddenEncoding;
- encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
- encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
- Configuration tempConf = new Configuration(conf);
- tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
- HFileContextBuilder contextBuilder = new HFileContextBuilder()
- .withCompression(compression)
- .withChecksumType(HStore.getChecksumType(conf))
- .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
- .withBlockSize(blockSize);
- contextBuilder.withDataBlockEncoding(encoding);
- HFileContext hFileContext = contextBuilder.build();
-
- wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
- .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
- .withFileContext(hFileContext)
- .build();
-
- this.writers.put(family, wl);
- return wl;
- }
-
- private void close(final StoreFile.Writer w) throws IOException {
- if (w != null) {
- w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
- Bytes.toBytes(System.currentTimeMillis()));
- w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
- Bytes.toBytes(context.getTaskAttemptID().toString()));
- w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
- Bytes.toBytes(true));
- w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
- Bytes.toBytes(compactionExclude));
- w.appendTrackedTimestampsToMetadata();
- w.close();
- }
- }
-
- public void close(TaskAttemptContext c)
- throws IOException, InterruptedException {
- for (WriterLength wl: this.writers.values()) {
- close(wl.writer);
- }
- }
- };
- }
-
- /*
- * Data structure to hold a Writer and amount of data written on it.
- */
- static class WriterLength {
- long written = 0;
- StoreFile.Writer writer = null;
- }
-
- /**
- * Return the start keys of all of the regions in this table,
- * as a list of ImmutableBytesWritable.
- */
- private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
- throws IOException {
- byte[][] byteKeys = table.getStartKeys();
- ArrayList<ImmutableBytesWritable> ret =
- new ArrayList<ImmutableBytesWritable>(byteKeys.length);
- for (byte[] byteKey : byteKeys) {
- ret.add(new ImmutableBytesWritable(byteKey));
- }
- return ret;
- }
+ HFileOutputFormat2.DATABLOCK_ENCODING_OVERRIDE_CONF_KEY;
- /**
- * Write out a {@link SequenceFile} that can be read by
- * {@link TotalOrderPartitioner} that contains the split points in startKeys.
- */
- private static void writePartitions(Configuration conf, Path partitionsPath,
- List<ImmutableBytesWritable> startKeys) throws IOException {
- LOG.info("Writing partition information to " + partitionsPath);
- if (startKeys.isEmpty()) {
- throw new IllegalArgumentException("No regions passed");
- }
-
- // We're generating a list of split points, and we don't ever
- // have keys < the first region (which has an empty start key)
- // so we need to remove it. Otherwise we would end up with an
- // empty reducer with index 0
- TreeSet<ImmutableBytesWritable> sorted =
- new TreeSet<ImmutableBytesWritable>(startKeys);
-
- ImmutableBytesWritable first = sorted.first();
- if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
- throw new IllegalArgumentException(
- "First region of table should have empty start key. Instead has: "
- + Bytes.toStringBinary(first.get()));
- }
- sorted.remove(first);
-
- // Write the actual file
- FileSystem fs = partitionsPath.getFileSystem(conf);
- SequenceFile.Writer writer = SequenceFile.createWriter(fs,
- conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);
-
- try {
- for (ImmutableBytesWritable startKey : sorted) {
- writer.append(startKey, NullWritable.get());
- }
- } finally {
- writer.close();
- }
+ public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
+ final TaskAttemptContext context) throws IOException, InterruptedException {
+ return HFileOutputFormat2.createRecordWriter(context);
}
/**
@@ -336,47 +84,8 @@ public class HFileOutputFormat extends F
* running this function.
*/
public static void configureIncrementalLoad(Job job, HTable table)
- throws IOException {
- Configuration conf = job.getConfiguration();
-
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(KeyValue.class);
- job.setOutputFormatClass(HFileOutputFormat.class);
-
- // Based on the configured map output class, set the correct reducer to properly
- // sort the incoming values.
- // TODO it would be nice to pick one or the other of these formats.
- if (KeyValue.class.equals(job.getMapOutputValueClass())) {
- job.setReducerClass(KeyValueSortReducer.class);
- } else if (Put.class.equals(job.getMapOutputValueClass())) {
- job.setReducerClass(PutSortReducer.class);
- } else if (Text.class.equals(job.getMapOutputValueClass())) {
- job.setReducerClass(TextSortReducer.class);
- } else {
- LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
- }
-
- conf.setStrings("io.serializations", conf.get("io.serializations"),
- MutationSerialization.class.getName(), ResultSerialization.class.getName(),
- KeyValueSerialization.class.getName());
-
- // Use table's region boundaries for TOP split points.
- LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
- List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
- LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
- "to match current region count");
- job.setNumReduceTasks(startKeys.size());
-
- configurePartitioner(job, startKeys);
- // Set compression algorithms based on column families
- configureCompression(table, conf);
- configureBloomType(table, conf);
- configureBlockSize(table, conf);
- configureDataBlockEncoding(table, conf);
-
- TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.initCredentials(job);
- LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
+ throws IOException {
+ HFileOutputFormat2.configureIncrementalLoad(job, table, HFileOutputFormat.class);
}
/**
@@ -389,16 +98,7 @@ public class HFileOutputFormat extends F
@VisibleForTesting
static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
- COMPRESSION_FAMILIES_CONF_KEY);
- Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
- Algorithm>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- Algorithm algorithm = AbstractHFileWriter.compressionByName
- (e.getValue());
- compressionMap.put(e.getKey(), algorithm);
- }
- return compressionMap;
+ return HFileOutputFormat2.createFamilyCompressionMap(conf);
}
/**
@@ -410,15 +110,7 @@ public class HFileOutputFormat extends F
*/
@VisibleForTesting
static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
- BLOOM_TYPE_FAMILIES_CONF_KEY);
- Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],
- BloomType>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- BloomType bloomType = BloomType.valueOf(e.getValue());
- bloomTypeMap.put(e.getKey(), bloomType);
- }
- return bloomTypeMap;
+ return HFileOutputFormat2.createFamilyBloomTypeMap(conf);
}
/**
@@ -430,15 +122,7 @@ public class HFileOutputFormat extends F
*/
@VisibleForTesting
static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
- BLOCK_SIZE_FAMILIES_CONF_KEY);
- Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],
- Integer>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- Integer blockSize = Integer.parseInt(e.getValue());
- blockSizeMap.put(e.getKey(), blockSize);
- }
- return blockSizeMap;
+ return HFileOutputFormat2.createFamilyBlockSizeMap(conf);
}
/**
@@ -452,41 +136,7 @@ public class HFileOutputFormat extends F
@VisibleForTesting
static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
Configuration conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
- DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
- Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],
- DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
- }
- return encoderMap;
- }
-
-
- /**
- * Run inside the task to deserialize column family to given conf value map.
- *
- * @param conf to read the serialized values from
- * @param confName conf key to read from the configuration
- * @return a map of column family to the given configuration value
- */
- private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
- Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
- String confVal = conf.get(confName, "");
- for (String familyConf : confVal.split("&")) {
- String[] familySplit = familyConf.split("=");
- if (familySplit.length != 2) {
- continue;
- }
- try {
- confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
- URLDecoder.decode(familySplit[1], "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- // will not happen with UTF-8 encoding
- throw new AssertionError(e);
- }
- }
- return confValMap;
+ return HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);
}
/**
@@ -495,17 +145,7 @@ public class HFileOutputFormat extends F
*/
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
throws IOException {
-
- // create the partitions file
- FileSystem fs = FileSystem.get(job.getConfiguration());
- Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
- fs.makeQualified(partitionsPath);
- fs.deleteOnExit(partitionsPath);
- writePartitions(job.getConfiguration(), partitionsPath, splitPoints);
-
- // configure job to use it
- job.setPartitionerClass(TotalOrderPartitioner.class);
- TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
+ HFileOutputFormat2.configurePartitioner(job, splitPoints);
}
/**
@@ -521,24 +161,7 @@ public class HFileOutputFormat extends F
value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
@VisibleForTesting
static void configureCompression(HTable table, Configuration conf) throws IOException {
- StringBuilder compressionConfigValue = new StringBuilder();
- HTableDescriptor tableDescriptor = table.getTableDescriptor();
- if(tableDescriptor == null){
- // could happen with mock table instance
- return;
- }
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- compressionConfigValue.append('&');
- }
- compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
- compressionConfigValue.append('=');
- compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
- }
- // Get rid of the last ampersand
- conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
+ HFileOutputFormat2.configureCompression(table, conf);
}
/**
@@ -552,26 +175,7 @@ public class HFileOutputFormat extends F
*/
@VisibleForTesting
static void configureBlockSize(HTable table, Configuration conf) throws IOException {
- StringBuilder blockSizeConfigValue = new StringBuilder();
- HTableDescriptor tableDescriptor = table.getTableDescriptor();
- if (tableDescriptor == null) {
- // could happen with mock table instance
- return;
- }
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- blockSizeConfigValue.append('&');
- }
- blockSizeConfigValue.append(URLEncoder.encode(
- familyDescriptor.getNameAsString(), "UTF-8"));
- blockSizeConfigValue.append('=');
- blockSizeConfigValue.append(URLEncoder.encode(
- String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
- }
- // Get rid of the last ampersand
- conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
+ HFileOutputFormat2.configureBlockSize(table, conf);
}
/**
@@ -585,27 +189,7 @@ public class HFileOutputFormat extends F
*/
@VisibleForTesting
static void configureBloomType(HTable table, Configuration conf) throws IOException {
- HTableDescriptor tableDescriptor = table.getTableDescriptor();
- if (tableDescriptor == null) {
- // could happen with mock table instance
- return;
- }
- StringBuilder bloomTypeConfigValue = new StringBuilder();
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- bloomTypeConfigValue.append('&');
- }
- bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
- bloomTypeConfigValue.append('=');
- String bloomType = familyDescriptor.getBloomFilterType().toString();
- if (bloomType == null) {
- bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
- }
- bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
- }
- conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
+ HFileOutputFormat2.configureBloomType(table, conf);
}
/**
@@ -620,29 +204,6 @@ public class HFileOutputFormat extends F
@VisibleForTesting
static void configureDataBlockEncoding(HTable table,
Configuration conf) throws IOException {
- HTableDescriptor tableDescriptor = table.getTableDescriptor();
- if (tableDescriptor == null) {
- // could happen with mock table instance
- return;
- }
- StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- dataBlockEncodingConfigValue.append('&');
- }
- dataBlockEncodingConfigValue.append(
- URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
- dataBlockEncodingConfigValue.append('=');
- DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
- if (encoding == null) {
- encoding = DataBlockEncoding.NONE;
- }
- dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
- "UTF-8"));
- }
- conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
- dataBlockEncodingConfigValue.toString());
+ HFileOutputFormat2.configureDataBlockEncoding(table, conf);
}
}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java?rev=1570702&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java Fri Feb 21 20:39:21 2014
@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Writes HFiles. Passed Cells must arrive in order.
+ * Writes current time as the sequence id for the file. Sets the major compacted
+ * attribute on created hfiles. Calling write(null,null) will forcibly roll
+ * all HFiles being written.
+ * <p>
+ * Using this class as part of a MapReduce job is best done
+ * using {@link #configureIncrementalLoad(Job, HTable)}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class HFileOutputFormat2
+ extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+ static Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
+
+ // The following constants are private since these are used by
+ // HFileOutputFormat2 to internally transfer data between job setup and
+ // reducer run using conf.
+ // These should not be changed by the client.
+ private static final String COMPRESSION_FAMILIES_CONF_KEY =
+ "hbase.hfileoutputformat.families.compression";
+ private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+ "hbase.hfileoutputformat.families.bloomtype";
+ private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+ "hbase.mapreduce.hfileoutputformat.blocksize";
+ private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+ "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
+
+ // This constant is public since the client can modify this when setting
+ // up their conf object and thus refer to this symbol.
+ // It is present for backwards compatibility reasons. Use it only to
+ // override the auto-detection of datablock encoding.
+ public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
+ "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+
+ public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
+ final TaskAttemptContext context) throws IOException, InterruptedException {
+ return createRecordWriter(context);
+ }
+
+ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+ createRecordWriter(final TaskAttemptContext context)
+ throws IOException, InterruptedException {
+
+ // Get the path of the temporary output file
+ final Path outputPath = FileOutputFormat.getOutputPath(context);
+ final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
+ final Configuration conf = context.getConfiguration();
+ final FileSystem fs = outputdir.getFileSystem(conf);
+ // These configs. are from hbase-*.xml
+ final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+ HConstants.DEFAULT_MAX_FILE_SIZE);
+ // Invented config. Add to hbase-*.xml if other than default compression.
+ final String defaultCompressionStr = conf.get("hfile.compression",
+ Compression.Algorithm.NONE.getName());
+ final Algorithm defaultCompression = AbstractHFileWriter
+ .compressionByName(defaultCompressionStr);
+ final boolean compactionExclude = conf.getBoolean(
+ "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+ // create a map from column family to the compression algorithm
+ final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
+ final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
+ final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
+
+ String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
+ final Map<byte[], DataBlockEncoding> datablockEncodingMap
+ = createFamilyDataBlockEncodingMap(conf);
+ final DataBlockEncoding overriddenEncoding;
+ if (dataBlockEncodingStr != null) {
+ overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
+ } else {
+ overriddenEncoding = null;
+ }
+
+ return new RecordWriter<ImmutableBytesWritable, V>() {
+ // Map of families to writers and how much has been output on the writer.
+ private final Map<byte [], WriterLength> writers =
+ new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
+ private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+ private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
+ private boolean rollRequested = false;
+
+ public void write(ImmutableBytesWritable row, V cell)
+ throws IOException {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+
+ // null input == user explicitly wants to flush
+ if (row == null && kv == null) {
+ rollWriters();
+ return;
+ }
+
+ byte [] rowKey = CellUtil.cloneRow(kv);
+ long length = kv.getLength();
+ byte [] family = CellUtil.cloneFamily(kv);
+ WriterLength wl = this.writers.get(family);
+
+ // If this is a new column family, verify that the directory exists
+ if (wl == null) {
+ fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+ }
+
+ // If any of the HFiles for the column families has reached
+ // maxsize, we need to roll all the writers
+ if (wl != null && wl.written + length >= maxsize) {
+ this.rollRequested = true;
+ }
+
+ // This can only happen once a row is finished though
+ if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+ rollWriters();
+ }
+
+ // create a new HLog writer, if necessary
+ if (wl == null || wl.writer == null) {
+ wl = getNewWriter(family, conf);
+ }
+
+ // we now have the proper HLog writer. full steam ahead
+ kv.updateLatestStamp(this.now);
+ wl.writer.append(kv);
+ wl.written += length;
+
+ // Copy the row so we know when a row transition.
+ this.previousRow = rowKey;
+ }
+
+ private void rollWriters() throws IOException {
+ for (WriterLength wl : this.writers.values()) {
+ if (wl.writer != null) {
+ LOG.info("Writer=" + wl.writer.getPath() +
+ ((wl.written == 0)? "": ", wrote=" + wl.written));
+ close(wl.writer);
+ }
+ wl.writer = null;
+ wl.written = 0;
+ }
+ this.rollRequested = false;
+ }
+
+ /* Create a new StoreFile.Writer.
+ * @param family
+ * @return A WriterLength, containing a new StoreFile.Writer.
+ * @throws IOException
+ */
+ private WriterLength getNewWriter(byte[] family, Configuration conf)
+ throws IOException {
+ WriterLength wl = new WriterLength();
+ Path familydir = new Path(outputdir, Bytes.toString(family));
+ Algorithm compression = compressionMap.get(family);
+ compression = compression == null ? defaultCompression : compression;
+ BloomType bloomType = bloomTypeMap.get(family);
+ bloomType = bloomType == null ? BloomType.NONE : bloomType;
+ Integer blockSize = blockSizeMap.get(family);
+ blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
+ DataBlockEncoding encoding = overriddenEncoding;
+ encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
+ encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
+ Configuration tempConf = new Configuration(conf);
+ tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+ HFileContextBuilder contextBuilder = new HFileContextBuilder()
+ .withCompression(compression)
+ .withChecksumType(HStore.getChecksumType(conf))
+ .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+ .withBlockSize(blockSize);
+ contextBuilder.withDataBlockEncoding(encoding);
+ HFileContext hFileContext = contextBuilder.build();
+
+ wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+ .withOutputDir(familydir).withBloomType(bloomType)
+ .withComparator(KeyValue.COMPARATOR)
+ .withFileContext(hFileContext).build();
+
+ this.writers.put(family, wl);
+ return wl;
+ }
+
+ private void close(final StoreFile.Writer w) throws IOException {
+ if (w != null) {
+ w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+ Bytes.toBytes(System.currentTimeMillis()));
+ w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+ Bytes.toBytes(context.getTaskAttemptID().toString()));
+ w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+ Bytes.toBytes(true));
+ w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+ Bytes.toBytes(compactionExclude));
+ w.appendTrackedTimestampsToMetadata();
+ w.close();
+ }
+ }
+
+ public void close(TaskAttemptContext c)
+ throws IOException, InterruptedException {
+ for (WriterLength wl: this.writers.values()) {
+ close(wl.writer);
+ }
+ }
+ };
+ }
+
+ /*
+ * Data structure to hold a Writer and amount of data written on it.
+ */
+ static class WriterLength {
+ long written = 0;
+ StoreFile.Writer writer = null;
+ }
+
+ /**
+ * Return the start keys of all of the regions in this table,
+ * as a list of ImmutableBytesWritable.
+ */
+ private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
+ throws IOException {
+ byte[][] byteKeys = table.getStartKeys();
+ ArrayList<ImmutableBytesWritable> ret =
+ new ArrayList<ImmutableBytesWritable>(byteKeys.length);
+ for (byte[] byteKey : byteKeys) {
+ ret.add(new ImmutableBytesWritable(byteKey));
+ }
+ return ret;
+ }
+
+ /**
+ * Write out a {@link SequenceFile} that can be read by
+ * {@link TotalOrderPartitioner} that contains the split points in startKeys.
+ */
+ @SuppressWarnings("deprecation")
+ private static void writePartitions(Configuration conf, Path partitionsPath,
+ List<ImmutableBytesWritable> startKeys) throws IOException {
+ LOG.info("Writing partition information to " + partitionsPath);
+ if (startKeys.isEmpty()) {
+ throw new IllegalArgumentException("No regions passed");
+ }
+
+ // We're generating a list of split points, and we don't ever
+ // have keys < the first region (which has an empty start key)
+ // so we need to remove it. Otherwise we would end up with an
+ // empty reducer with index 0
+ TreeSet<ImmutableBytesWritable> sorted =
+ new TreeSet<ImmutableBytesWritable>(startKeys);
+
+ ImmutableBytesWritable first = sorted.first();
+ if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
+ throw new IllegalArgumentException(
+ "First region of table should have empty start key. Instead has: "
+ + Bytes.toStringBinary(first.get()));
+ }
+ sorted.remove(first);
+
+ // Write the actual file
+ FileSystem fs = partitionsPath.getFileSystem(conf);
+ SequenceFile.Writer writer = SequenceFile.createWriter(
+ fs, conf, partitionsPath, ImmutableBytesWritable.class,
+ NullWritable.class);
+
+ try {
+ for (ImmutableBytesWritable startKey : sorted) {
+ writer.append(startKey, NullWritable.get());
+ }
+ } finally {
+ writer.close();
+ }
+ }
+
+ /**
+ * Configure a MapReduce Job to perform an incremental load into the given
+ * table. This
+ * <ul>
+ * <li>Inspects the table to configure a total order partitioner</li>
+ * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+ * <li>Sets the number of reduce tasks to match the current number of regions</li>
+ * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+ * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
+ * PutSortReducer)</li>
+ * </ul>
+ * The user should be sure to set the map output value class to either KeyValue or Put before
+ * running this function.
+ */
+ public static void configureIncrementalLoad(Job job, HTable table)
+ throws IOException {
+ configureIncrementalLoad(job, table, HFileOutputFormat2.class);
+ }
+
+ static void configureIncrementalLoad(Job job, HTable table,
+ Class<? extends OutputFormat<?, ?>> cls) throws IOException {
+ Configuration conf = job.getConfiguration();
+
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(KeyValue.class);
+ job.setOutputFormatClass(cls);
+
+ // Based on the configured map output class, set the correct reducer to properly
+ // sort the incoming values.
+ // TODO it would be nice to pick one or the other of these formats.
+ if (KeyValue.class.equals(job.getMapOutputValueClass())) {
+ job.setReducerClass(KeyValueSortReducer.class);
+ } else if (Put.class.equals(job.getMapOutputValueClass())) {
+ job.setReducerClass(PutSortReducer.class);
+ } else if (Text.class.equals(job.getMapOutputValueClass())) {
+ job.setReducerClass(TextSortReducer.class);
+ } else {
+ LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
+ }
+
+ conf.setStrings("io.serializations", conf.get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+ KeyValueSerialization.class.getName());
+
+ // Use table's region boundaries for TOP split points.
+ LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
+ List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
+ LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+ "to match current region count");
+ job.setNumReduceTasks(startKeys.size());
+
+ configurePartitioner(job, startKeys);
+ // Set compression algorithms based on column families
+ configureCompression(table, conf);
+ configureBloomType(table, conf);
+ configureBlockSize(table, conf);
+ configureDataBlockEncoding(table, conf);
+
+ TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.initCredentials(job);
+ LOG.info("Incremental table " + Bytes.toString(table.getTableName())
+ + " output configured.");
+ }
+
+ /**
+ * Runs inside the task to deserialize column family to compression algorithm
+ * map from the configuration.
+ *
+ * @param conf to read the serialized values from
+ * @return a map from column family to the configured compression algorithm
+ */
+ @VisibleForTesting
+ static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
+ conf) {
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ COMPRESSION_FAMILIES_CONF_KEY);
+ Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
+ Algorithm>(Bytes.BYTES_COMPARATOR);
+ for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+ Algorithm algorithm = AbstractHFileWriter.compressionByName
+ (e.getValue());
+ compressionMap.put(e.getKey(), algorithm);
+ }
+ return compressionMap;
+ }
+
+ /**
+ * Runs inside the task to deserialize column family to bloom filter type
+ * map from the configuration.
+ *
+ * @param conf to read the serialized values from
+ * @return a map from column family to the the configured bloom filter type
+ */
+ @VisibleForTesting
+ static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ BLOOM_TYPE_FAMILIES_CONF_KEY);
+ Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],
+ BloomType>(Bytes.BYTES_COMPARATOR);
+ for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+ BloomType bloomType = BloomType.valueOf(e.getValue());
+ bloomTypeMap.put(e.getKey(), bloomType);
+ }
+ return bloomTypeMap;
+ }
+
+ /**
+ * Runs inside the task to deserialize column family to block size
+ * map from the configuration.
+ *
+ * @param conf to read the serialized values from
+ * @return a map from column family to the configured block size
+ */
+ @VisibleForTesting
+ static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ BLOCK_SIZE_FAMILIES_CONF_KEY);
+ Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],
+ Integer>(Bytes.BYTES_COMPARATOR);
+ for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+ Integer blockSize = Integer.parseInt(e.getValue());
+ blockSizeMap.put(e.getKey(), blockSize);
+ }
+ return blockSizeMap;
+ }
+
+ /**
+ * Runs inside the task to deserialize column family to data block encoding
+ * type map from the configuration.
+ *
+ * @param conf to read the serialized values from
+ * @return a map from column family to HFileDataBlockEncoder for the
+ * configured data block type for the family
+ */
+ @VisibleForTesting
+ static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
+ Configuration conf) {
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
+ Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],
+ DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
+ for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+ encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
+ }
+ return encoderMap;
+ }
+
+
+ /**
+ * Run inside the task to deserialize column family to given conf value map.
+ *
+ * @param conf to read the serialized values from
+ * @param confName conf key to read from the configuration
+ * @return a map of column family to the given configuration value
+ */
+ private static Map<byte[], String> createFamilyConfValueMap(
+ Configuration conf, String confName) {
+ Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+ String confVal = conf.get(confName, "");
+ for (String familyConf : confVal.split("&")) {
+ String[] familySplit = familyConf.split("=");
+ if (familySplit.length != 2) {
+ continue;
+ }
+ try {
+ confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+ URLDecoder.decode(familySplit[1], "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ // will not happen with UTF-8 encoding
+ throw new AssertionError(e);
+ }
+ }
+ return confValMap;
+ }
+
+ /**
+ * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
+ * <code>splitPoints</code>. Cleans up the partitions file after job exists.
+ */
+ static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
+ throws IOException {
+
+ // create the partitions file
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+ Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
+ fs.makeQualified(partitionsPath);
+ fs.deleteOnExit(partitionsPath);
+ writePartitions(job.getConfiguration(), partitionsPath, splitPoints);
+
+ // configure job to use it
+ job.setPartitionerClass(TotalOrderPartitioner.class);
+ TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
+ }
+
+ /**
+ * Serialize column family to compression algorithm map to configuration.
+ * Invoked while configuring the MR job for incremental load.
+ *
+ * @param table to read the properties from
+ * @param conf to persist serialized values into
+ * @throws IOException
+ * on failure to read column family descriptors
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
+ @VisibleForTesting
+ static void configureCompression(
+ HTable table, Configuration conf) throws IOException {
+ StringBuilder compressionConfigValue = new StringBuilder();
+ HTableDescriptor tableDescriptor = table.getTableDescriptor();
+ if(tableDescriptor == null){
+ // could happen with mock table instance
+ return;
+ }
+ Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ int i = 0;
+ for (HColumnDescriptor familyDescriptor : families) {
+ if (i++ > 0) {
+ compressionConfigValue.append('&');
+ }
+ compressionConfigValue.append(URLEncoder.encode(
+ familyDescriptor.getNameAsString(), "UTF-8"));
+ compressionConfigValue.append('=');
+ compressionConfigValue.append(URLEncoder.encode(
+ familyDescriptor.getCompression().getName(), "UTF-8"));
+ }
+ // Get rid of the last ampersand
+ conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
+ }
+
+ /**
+ * Serialize column family to block size map to configuration.
+ * Invoked while configuring the MR job for incremental load.
+ *
+ * @param table to read the properties from
+ * @param conf to persist serialized values into
+ * @throws IOException
+ * on failure to read column family descriptors
+ */
+ @VisibleForTesting
+ static void configureBlockSize(
+ HTable table, Configuration conf) throws IOException {
+ StringBuilder blockSizeConfigValue = new StringBuilder();
+ HTableDescriptor tableDescriptor = table.getTableDescriptor();
+ if (tableDescriptor == null) {
+ // could happen with mock table instance
+ return;
+ }
+ Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ int i = 0;
+ for (HColumnDescriptor familyDescriptor : families) {
+ if (i++ > 0) {
+ blockSizeConfigValue.append('&');
+ }
+ blockSizeConfigValue.append(URLEncoder.encode(
+ familyDescriptor.getNameAsString(), "UTF-8"));
+ blockSizeConfigValue.append('=');
+ blockSizeConfigValue.append(URLEncoder.encode(
+ String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
+ }
+ // Get rid of the last ampersand
+ conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
+ }
+
+ /**
+ * Serialize column family to bloom type map to configuration.
+ * Invoked while configuring the MR job for incremental load.
+ *
+ * @param table to read the properties from
+ * @param conf to persist serialized values into
+ * @throws IOException
+ * on failure to read column family descriptors
+ */
+ @VisibleForTesting
+ static void configureBloomType(
+ HTable table, Configuration conf) throws IOException {
+ HTableDescriptor tableDescriptor = table.getTableDescriptor();
+ if (tableDescriptor == null) {
+ // could happen with mock table instance
+ return;
+ }
+ StringBuilder bloomTypeConfigValue = new StringBuilder();
+ Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ int i = 0;
+ for (HColumnDescriptor familyDescriptor : families) {
+ if (i++ > 0) {
+ bloomTypeConfigValue.append('&');
+ }
+ bloomTypeConfigValue.append(URLEncoder.encode(
+ familyDescriptor.getNameAsString(), "UTF-8"));
+ bloomTypeConfigValue.append('=');
+ String bloomType = familyDescriptor.getBloomFilterType().toString();
+ if (bloomType == null) {
+ bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
+ }
+ bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+ }
+ conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
+ }
+
+ /**
+ * Serialize column family to data block encoding map to configuration.
+ * Invoked while configuring the MR job for incremental load.
+ *
+ * @param table to read the properties from
+ * @param conf to persist serialized values into
+ * @throws IOException
+ * on failure to read column family descriptors
+ */
+ @VisibleForTesting
+ static void configureDataBlockEncoding(HTable table,
+ Configuration conf) throws IOException {
+ HTableDescriptor tableDescriptor = table.getTableDescriptor();
+ if (tableDescriptor == null) {
+ // could happen with mock table instance
+ return;
+ }
+ StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
+ Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ int i = 0;
+ for (HColumnDescriptor familyDescriptor : families) {
+ if (i++ > 0) {
+ dataBlockEncodingConfigValue.append('&');
+ }
+ dataBlockEncodingConfigValue.append(
+ URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+ dataBlockEncodingConfigValue.append('=');
+ DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
+ if (encoding == null) {
+ encoding = DataBlockEncoding.NONE;
+ }
+ dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
+ "UTF-8"));
+ }
+ conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+ dataBlockEncodingConfigValue.toString());
+ }
+}
Propchange: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
------------------------------------------------------------------------------
svn:eol-style = native