Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:23 UTC
[23/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
deleted file mode 100644
index b5bb2ec..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ /dev/null
@@ -1,780 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.zookeeper.KeeperException;
-
-
-/**
- * Import data written by {@link Export}.
- */
-@InterfaceAudience.Public
-public class Import extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(Import.class);
- final static String NAME = "import";
- public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
- public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
- public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
- public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
- public final static String TABLE_NAME = "import.table.name";
- public final static String WAL_DURABILITY = "import.wal.durability";
- public final static String HAS_LARGE_RESULT= "import.bulk.hasLargeResult";
-
- private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
- public static class KeyValueWritableComparablePartitioner
- extends Partitioner<KeyValueWritableComparable, KeyValue> {
- private static KeyValueWritableComparable[] START_KEYS = null;
- @Override
- public int getPartition(KeyValueWritableComparable key, KeyValue value,
- int numPartitions) {
- for (int i = 0; i < START_KEYS.length; ++i) {
- if (key.compareTo(START_KEYS[i]) <= 0) {
- return i;
- }
- }
- return START_KEYS.length;
- }
-
- }
-
- public static class KeyValueWritableComparable
- implements WritableComparable<KeyValueWritableComparable> {
-
- private KeyValue kv = null;
-
- static {
- // register this comparator
- WritableComparator.define(KeyValueWritableComparable.class,
- new KeyValueWritableComparator());
- }
-
- public KeyValueWritableComparable() {
- }
-
- public KeyValueWritableComparable(KeyValue kv) {
- this.kv = kv;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- KeyValue.write(kv, out);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- kv = KeyValue.create(in);
- }
-
- @Override
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
- justification="This is wrong, yes, but we should be purging Writables, not fixing them")
- public int compareTo(KeyValueWritableComparable o) {
- return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
- }
-
- public static class KeyValueWritableComparator extends WritableComparator {
-
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- try {
- KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
- kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
- KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
- kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
- return compare(kv1, kv2);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- }
-
- }
-
- public static class KeyValueReducer
- extends
- Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
- protected void reduce(
- KeyValueWritableComparable row,
- Iterable<KeyValue> kvs,
- Reducer<KeyValueWritableComparable,
- KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
- throws java.io.IOException, InterruptedException {
- int index = 0;
- for (KeyValue kv : kvs) {
- context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
- if (++index % 100 == 0)
-          context.setStatus("Wrote " + index + " KeyValues, "
-              + "and the current row key is " + Bytes.toString(kv.getRowArray()));
- }
- }
- }
-
- public static class KeyValueSortImporter
- extends TableMapper<KeyValueWritableComparable, KeyValue> {
- private Map<byte[], byte[]> cfRenameMap;
- private Filter filter;
-    private static final Log LOG = LogFactory.getLog(KeyValueSortImporter.class);
-
- /**
- * @param row The current table row key.
- * @param value The columns.
- * @param context The current context.
- * @throws IOException When something is broken with the data.
- */
- @Override
- public void map(ImmutableBytesWritable row, Result value,
- Context context)
- throws IOException {
- try {
- if (LOG.isTraceEnabled()) {
-          LOG.trace("Considering the row: "
-              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
- }
- if (filter == null
- || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
- (short) row.getLength()))) {
- for (Cell kv : value.rawCells()) {
- kv = filterKv(filter, kv);
- // skip if we filtered it out
- if (kv == null) continue;
- // TODO get rid of ensureKeyValue
- KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
- context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void setup(Context context) throws IOException {
- cfRenameMap = createCfRenameMap(context.getConfiguration());
- filter = instantiateFilter(context.getConfiguration());
- int reduceNum = context.getNumReduceTasks();
- Configuration conf = context.getConfiguration();
- TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
- try (Connection conn = ConnectionFactory.createConnection(conf);
- RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
- byte[][] startKeys = regionLocator.getStartKeys();
- if (startKeys.length != reduceNum) {
- throw new IOException("Region split after job initialization");
- }
- KeyValueWritableComparable[] startKeyWraps =
- new KeyValueWritableComparable[startKeys.length - 1];
- for (int i = 1; i < startKeys.length; ++i) {
- startKeyWraps[i - 1] =
- new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
- }
- KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
- }
- }
- }
-
- /**
- * A mapper that just writes out KeyValues.
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
- justification="Writables are going away and this has been this way forever")
- public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
- private Map<byte[], byte[]> cfRenameMap;
- private Filter filter;
- private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
-
- /**
- * @param row The current table row key.
- * @param value The columns.
- * @param context The current context.
- * @throws IOException When something is broken with the data.
- */
- @Override
- public void map(ImmutableBytesWritable row, Result value,
- Context context)
- throws IOException {
- try {
- if (LOG.isTraceEnabled()) {
-          LOG.trace("Considering the row: "
-              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
- }
- if (filter == null
- || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
- (short) row.getLength()))) {
- for (Cell kv : value.rawCells()) {
- kv = filterKv(filter, kv);
- // skip if we filtered it out
- if (kv == null) continue;
- // TODO get rid of ensureKeyValue
- context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void setup(Context context) {
- cfRenameMap = createCfRenameMap(context.getConfiguration());
- filter = instantiateFilter(context.getConfiguration());
- }
- }
-
- /**
- * Write table content out to files in hdfs.
- */
- public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
- private Map<byte[], byte[]> cfRenameMap;
- private List<UUID> clusterIds;
- private Filter filter;
- private Durability durability;
-
- /**
- * @param row The current table row key.
- * @param value The columns.
- * @param context The current context.
- * @throws IOException When something is broken with the data.
- */
- @Override
- public void map(ImmutableBytesWritable row, Result value,
- Context context)
- throws IOException {
- try {
- writeResult(row, value, context);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- private void writeResult(ImmutableBytesWritable key, Result result, Context context)
- throws IOException, InterruptedException {
- Put put = null;
- Delete delete = null;
- if (LOG.isTraceEnabled()) {
-        LOG.trace("Considering the row: "
-            + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
- }
- if (filter == null
- || !filter.filterRowKey(CellUtil.createFirstOnRow(key.get(), key.getOffset(),
- (short) key.getLength()))) {
- processKV(key, result, context, put, delete);
- }
- }
-
- protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
- Delete delete) throws IOException, InterruptedException {
- for (Cell kv : result.rawCells()) {
- kv = filterKv(filter, kv);
- // skip if we filter it out
- if (kv == null) continue;
-
- kv = convertKv(kv, cfRenameMap);
- // Deletes and Puts are gathered and written when finished
- /*
-         * If an Export contains a sequence of mutations and tombstones, an Import should restore
-         * the same sequence. If we combined all Delete tombstones into a single Delete request,
-         * some DeleteFamily tombstones could be ignored: when multiple DeleteFamily tombstones are
-         * submitted in one Delete, only the newest is kept in the hbase table and the others are
-         * dropped. See HBASE-12065.
- */
- if (CellUtil.isDeleteFamily(kv)) {
- Delete deleteFamily = new Delete(key.get());
- deleteFamily.add(kv);
- if (durability != null) {
- deleteFamily.setDurability(durability);
- }
- deleteFamily.setClusterIds(clusterIds);
- context.write(key, deleteFamily);
- } else if (CellUtil.isDelete(kv)) {
- if (delete == null) {
- delete = new Delete(key.get());
- }
- delete.add(kv);
- } else {
- if (put == null) {
- put = new Put(key.get());
- }
- addPutToKv(put, kv);
- }
- }
- if (put != null) {
- if (durability != null) {
- put.setDurability(durability);
- }
- put.setClusterIds(clusterIds);
- context.write(key, put);
- }
- if (delete != null) {
- if (durability != null) {
- delete.setDurability(durability);
- }
- delete.setClusterIds(clusterIds);
- context.write(key, delete);
- }
- }
-
- protected void addPutToKv(Put put, Cell kv) throws IOException {
- put.add(kv);
- }
-
- @Override
- public void setup(Context context) {
- LOG.info("Setting up " + getClass() + " mapper.");
- Configuration conf = context.getConfiguration();
- cfRenameMap = createCfRenameMap(conf);
- filter = instantiateFilter(conf);
- String durabilityStr = conf.get(WAL_DURABILITY);
- if(durabilityStr != null){
- durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
- LOG.info("setting WAL durability to " + durability);
- } else {
- LOG.info("setting WAL durability to default.");
- }
- // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
- ZooKeeperWatcher zkw = null;
- Exception ex = null;
- try {
- zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
- clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
- } catch (ZooKeeperConnectionException e) {
- ex = e;
-        LOG.error("Problem connecting to ZooKeeper during task setup", e);
- } catch (KeeperException e) {
- ex = e;
- LOG.error("Problem reading ZooKeeper data during task setup", e);
- } catch (IOException e) {
- ex = e;
- LOG.error("Problem setting up task", e);
- } finally {
- if (zkw != null) zkw.close();
- }
- if (clusterIds == null) {
- // exit early if setup fails
- throw new RuntimeException(ex);
- }
- }
- }
-
- /**
-   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) so that
-   * some can optionally be excluded from the job output
-   * @param conf {@link Configuration} from which to load the filter
-   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
- * @throws IllegalArgumentException if the filter is misconfigured
- */
- public static Filter instantiateFilter(Configuration conf) {
- // get the filter, if it was configured
- Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
- if (filterClass == null) {
- LOG.debug("No configured filter class, accepting all keyvalues.");
- return null;
- }
- LOG.debug("Attempting to create filter:" + filterClass);
- String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
- ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
- try {
- Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
- return (Filter) m.invoke(null, quotedArgs);
- } catch (IllegalAccessException e) {
- LOG.error("Couldn't instantiate filter!", e);
- throw new RuntimeException(e);
- } catch (SecurityException e) {
- LOG.error("Couldn't instantiate filter!", e);
- throw new RuntimeException(e);
- } catch (NoSuchMethodException e) {
- LOG.error("Couldn't instantiate filter!", e);
- throw new RuntimeException(e);
- } catch (IllegalArgumentException e) {
- LOG.error("Couldn't instantiate filter!", e);
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- LOG.error("Couldn't instantiate filter!", e);
- throw new RuntimeException(e);
- }
- }
-
- private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
- ArrayList<byte[]> quotedArgs = new ArrayList<>();
- for (String stringArg : stringArgs) {
- // all the filters' instantiation methods expected quoted args since they are coming from
- // the shell, so add them here, though it shouldn't really be needed :-/
- quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
- }
- return quotedArgs;
- }
-
- /**
-   * Attempt to filter out the keyvalue
-   * @param filter filter to apply to the keyvalue; may be <tt>null</tt>
-   * @param kv {@link KeyValue} on which to apply the filter
- * @return <tt>null</tt> if the key should not be written, otherwise returns the original
- * {@link KeyValue}
- */
- public static Cell filterKv(Filter filter, Cell kv) throws IOException {
- // apply the filter and skip this kv if the filter doesn't apply
- if (filter != null) {
- Filter.ReturnCode code = filter.filterKeyValue(kv);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Filter returned:" + code + " for the key value:" + kv);
- }
- // if its not an accept type, then skip this kv
- if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
- .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
- return null;
- }
- }
- return kv;
- }
-
- // helper: create a new KeyValue based on CF rename map
- private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
- if(cfRenameMap != null) {
- // If there's a rename mapping for this CF, create a new KeyValue
- byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
- if(newCfName != null) {
- kv = new KeyValue(kv.getRowArray(), // row buffer
- kv.getRowOffset(), // row offset
- kv.getRowLength(), // row length
- newCfName, // CF buffer
- 0, // CF offset
- newCfName.length, // CF length
- kv.getQualifierArray(), // qualifier buffer
- kv.getQualifierOffset(), // qualifier offset
- kv.getQualifierLength(), // qualifier length
- kv.getTimestamp(), // timestamp
- KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
- kv.getValueArray(), // value buffer
- kv.getValueOffset(), // value offset
- kv.getValueLength()); // value length
- }
- }
- return kv;
- }
-
- // helper: make a map from sourceCfName to destCfName by parsing a config key
- private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
- Map<byte[], byte[]> cfRenameMap = null;
- String allMappingsPropVal = conf.get(CF_RENAME_PROP);
- if(allMappingsPropVal != null) {
- // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
- String[] allMappings = allMappingsPropVal.split(",");
- for (String mapping: allMappings) {
- if(cfRenameMap == null) {
- cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- }
- String [] srcAndDest = mapping.split(":");
- if(srcAndDest.length != 2) {
- continue;
- }
- cfRenameMap.put(srcAndDest[0].getBytes(), srcAndDest[1].getBytes());
- }
- }
- return cfRenameMap;
- }
-
- /**
- * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
- * the mapper how to rename column families.
- *
- * <p>Alternately, instead of calling this function, you could set the configuration key
- * {@link #CF_RENAME_PROP} yourself. The value should look like
- * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
- * the mapper behavior.
- *
- * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be
- * set
- * @param renameMap a mapping from source CF names to destination CF names
- */
- static public void configureCfRenaming(Configuration conf,
- Map<String, String> renameMap) {
- StringBuilder sb = new StringBuilder();
- for(Map.Entry<String,String> entry: renameMap.entrySet()) {
- String sourceCf = entry.getKey();
- String destCf = entry.getValue();
-
- if(sourceCf.contains(":") || sourceCf.contains(",") ||
- destCf.contains(":") || destCf.contains(",")) {
- throw new IllegalArgumentException("Illegal character in CF names: "
- + sourceCf + ", " + destCf);
- }
-
- if(sb.length() != 0) {
- sb.append(",");
- }
- sb.append(sourceCf + ":" + destCf);
- }
- conf.set(CF_RENAME_PROP, sb.toString());
- }
-
- /**
- * Add a Filter to be instantiated on import
- * @param conf Configuration to update (will be passed to the job)
- * @param clazz {@link Filter} subclass to instantiate on the server.
- * @param filterArgs List of arguments to pass to the filter on instantiation
- */
- public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
- List<String> filterArgs) throws IOException {
- conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
- conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
- }
-
- /**
- * Sets up the actual job.
- * @param conf The current configuration.
- * @param args The command line parameters.
- * @return The newly created job.
- * @throws IOException When setting up the job fails.
- */
- public static Job createSubmittableJob(Configuration conf, String[] args)
- throws IOException {
- TableName tableName = TableName.valueOf(args[0]);
- conf.set(TABLE_NAME, tableName.getNameAsString());
- Path inputDir = new Path(args[1]);
- Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
- job.setJarByClass(Importer.class);
- FileInputFormat.setInputPaths(job, inputDir);
- job.setInputFormatClass(SequenceFileInputFormat.class);
- String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
-
- // make sure we get the filter in the jars
- try {
- Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
- if (filter != null) {
- TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
- }
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
-      LOG.info("Using large-result mode.");
- try (Connection conn = ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(tableName);
- RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
- HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
- job.setMapperClass(KeyValueSortImporter.class);
- job.setReducerClass(KeyValueReducer.class);
- Path outputDir = new Path(hfileOutPath);
- FileOutputFormat.setOutputPath(job, outputDir);
- job.setMapOutputKeyClass(KeyValueWritableComparable.class);
- job.setMapOutputValueClass(KeyValue.class);
- job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
- KeyValueWritableComparable.KeyValueWritableComparator.class,
- RawComparator.class);
- Path partitionsPath =
- new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
- FileSystem fs = FileSystem.get(job.getConfiguration());
- fs.deleteOnExit(partitionsPath);
- job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
- job.setNumReduceTasks(regionLocator.getStartKeys().length);
- TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
- org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
- }
- } else if (hfileOutPath != null) {
- LOG.info("writing to hfiles for bulk load.");
- job.setMapperClass(KeyValueImporter.class);
- try (Connection conn = ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(tableName);
- RegionLocator regionLocator = conn.getRegionLocator(tableName)){
- job.setReducerClass(KeyValueSortReducer.class);
- Path outputDir = new Path(hfileOutPath);
- FileOutputFormat.setOutputPath(job, outputDir);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(KeyValue.class);
- HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
- TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
- org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
- }
- } else {
- LOG.info("writing directly to table from Mapper.");
- // No reducers. Just write straight to table. Call initTableReducerJob
- // because it sets up the TableOutputFormat.
- job.setMapperClass(Importer.class);
- TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
- job.setNumReduceTasks(0);
- }
- return job;
- }
-
- /*
- * @param errorMsg Error message. Can be null.
- */
- private static void usage(final String errorMsg) {
- if (errorMsg != null && errorMsg.length() > 0) {
- System.err.println("ERROR: " + errorMsg);
- }
- System.err.println("Usage: Import [options] <tablename> <inputdir>");
- System.err.println("By default Import will load data directly into HBase. To instead generate");
- System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
- System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
-    System.err.println("If the result includes too many KeyValues, which can cause an OOME "
-        + "due to the in-memory sort in the reducer, pass the option:");
- System.err.println(" -D" + HAS_LARGE_RESULT + "=true");
- System.err
- .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
- System.err.println(" -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
-    System.err.println(" -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter>");
- System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
-        + CF_RENAME_PROP + " property. Further, filters will only use the"
- + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
- + " whether the current row needs to be ignored completely for processing and "
- + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
- + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
- + " the KeyValue.");
- System.err.println("To import data exported from HBase 0.94, use");
- System.err.println(" -Dhbase.import.version=0.94");
- System.err.println(" -D " + JOB_NAME_CONF_KEY
- + "=jobName - use the specified mapreduce job name for the import");
- System.err.println("For performance consider the following options:\n"
- + " -Dmapreduce.map.speculative=false\n"
- + " -Dmapreduce.reduce.speculative=false\n"
- + " -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
- +" Allowed values are the supported durability values"
- +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
- }
-
- /**
- * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
- * need to flush all the regions of the table as the data is held in memory and is also not
-   * present in the Write Ahead Log to replay in case of a crash. This method flushes all the
-   * regions of the table when data is imported to hbase with {@link Durability#SKIP_WAL}.
- */
- public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
- InterruptedException {
- String tableName = conf.get(TABLE_NAME);
- Admin hAdmin = null;
- Connection connection = null;
- String durability = conf.get(WAL_DURABILITY);
- // Need to flush if the data is written to hbase and skip wal is enabled.
- if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
- && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
- LOG.info("Flushing all data that skipped the WAL.");
- try {
- connection = ConnectionFactory.createConnection(conf);
- hAdmin = connection.getAdmin();
- hAdmin.flush(TableName.valueOf(tableName));
- } finally {
- if (hAdmin != null) {
- hAdmin.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- if (args.length < 2) {
- usage("Wrong number of arguments: " + args.length);
- return -1;
- }
- String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
- if (inputVersionString != null) {
- getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
- }
- Job job = createSubmittableJob(getConf(), args);
- boolean isJobSuccessful = job.waitForCompletion(true);
- if(isJobSuccessful){
- // Flush all the regions of the table
- flushRegionsIfNecessary(getConf());
- }
- long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
- long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
- if (outputRecords < inputRecords) {
- System.err.println("Warning, not all records were imported (maybe filtered out).");
- if (outputRecords == 0) {
- System.err.println("If the data was exported from HBase 0.94 "+
- "consider using -Dhbase.import.version=0.94.");
- }
- }
-
- return (isJobSuccessful ? 0 : 1);
- }
-
- /**
- * Main entry point.
- * @param args The command line parameters.
- * @throws Exception When running the job fails.
- */
- public static void main(String[] args) throws Exception {
- int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
- System.exit(errCode);
- }
-
-}
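
The Import tool removed above (and re-added under the new hbase-mapreduce module by this change) is driven entirely by Configuration keys such as import.bulk.output and HBASE_IMPORTER_RENAME_CFS, plus the public helpers configureCfRenaming and addFilterAndArguments. As a rough sketch of how a caller might drive that API, assuming a hypothetical table "mytable", a hypothetical export directory /tmp/export, and made-up column family names (none of these values appear in this commit):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.Import;
    import org.apache.hadoop.util.ToolRunner;

    public class ImportLauncher {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Emit HFiles for bulk load instead of writing straight to the table.
        conf.set(Import.BULK_OUTPUT_CONF_KEY, "/tmp/import-hfiles");   // hypothetical path
        // Rename column family "cf_old" to "cf_new" during the import.
        Map<String, String> renames = new HashMap<>();
        renames.put("cf_old", "cf_new");                               // hypothetical CF names
        Import.configureCfRenaming(conf, renames);
        // Same entry point the tool's own main() uses: <tablename> <inputdir>.
        int rc = ToolRunner.run(conf, new Import(), new String[] { "mytable", "/tmp/export" });
        System.exit(rc);
      }
    }

Dropping the BULK_OUTPUT_CONF_KEY line would make the job write Puts and Deletes directly to the table, as the tool's usage() text describes.
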
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
deleted file mode 100644
index b64271e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ /dev/null
@@ -1,793 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static java.lang.String.format;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Splitter;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
-/**
- * Tool to import data from a TSV file.
- *
- * This tool is rather simplistic - it doesn't do any quoting or
- * escaping, but is useful for many data loads.
- *
- * @see ImportTsv#usage(String)
- */
-@InterfaceAudience.Public
-public class ImportTsv extends Configured implements Tool {
-
- protected static final Log LOG = LogFactory.getLog(ImportTsv.class);
-
- final static String NAME = "importtsv";
-
- public final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
- public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
- public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
- public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
- // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
- // Move them out of the tool and let the mapper handle its own validation.
- public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run";
- // If true, bad lines are logged to stderr. Default: false.
- public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines";
- public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
- public final static String SKIP_EMPTY_COLUMNS = "importtsv.skip.empty.columns";
- public final static String COLUMNS_CONF_KEY = "importtsv.columns";
- public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
- public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
- //This config is used to propagate credentials from parent MR jobs which launch
- //ImportTSV jobs. SEE IntegrationTestImportTsv.
- public final static String CREDENTIALS_LOCATION = "credentials_location";
- final static String DEFAULT_SEPARATOR = "\t";
- final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>";
- final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
- final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
- public final static String CREATE_TABLE_CONF_KEY = "create.table";
- public final static String NO_STRICT_COL_FAMILY = "no.strict";
- /**
- * If table didn't exist and was created in dry-run mode, this flag is
- * flipped to delete it when MR ends.
- */
- private static boolean DRY_RUN_TABLE_CREATED;
-
- public static class TsvParser {
- /**
- * Column families and qualifiers mapped to the TSV columns
- */
- private final byte[][] families;
- private final byte[][] qualifiers;
-
- private final byte separatorByte;
-
- private int rowKeyColumnIndex;
-
- private int maxColumnCount;
-
- // Default value must be negative
- public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
-
- private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
-
- public static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
-
- public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
-
- public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY";
-
- public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
-
- public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
-
- private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
-
- public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
-
- public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
-
- public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
-
- private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
-
- private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
-
- /**
-     * @param columnsSpecification the list of columns to parse out, comma separated.
-     * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
-     * @param separatorStr the single-byte field separator
- */
- public TsvParser(String columnsSpecification, String separatorStr) {
- // Configure separator
- byte[] separator = Bytes.toBytes(separatorStr);
- Preconditions.checkArgument(separator.length == 1,
- "TsvParser only supports single-byte separators");
- separatorByte = separator[0];
-
- // Configure columns
- ArrayList<String> columnStrings = Lists.newArrayList(
- Splitter.on(',').trimResults().split(columnsSpecification));
-
- maxColumnCount = columnStrings.size();
- families = new byte[maxColumnCount][];
- qualifiers = new byte[maxColumnCount][];
-
- for (int i = 0; i < columnStrings.size(); i++) {
- String str = columnStrings.get(i);
- if (ROWKEY_COLUMN_SPEC.equals(str)) {
- rowKeyColumnIndex = i;
- continue;
- }
- if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
- timestampKeyColumnIndex = i;
- continue;
- }
- if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
- attrKeyColumnIndex = i;
- continue;
- }
- if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
- cellVisibilityColumnIndex = i;
- continue;
- }
- if (CELL_TTL_COLUMN_SPEC.equals(str)) {
- cellTTLColumnIndex = i;
- continue;
- }
- String[] parts = str.split(":", 2);
- if (parts.length == 1) {
- families[i] = str.getBytes();
- qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
- } else {
- families[i] = parts[0].getBytes();
- qualifiers[i] = parts[1].getBytes();
- }
- }
- }
-
- public boolean hasTimestamp() {
- return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
- }
-
- public int getTimestampKeyColumnIndex() {
- return timestampKeyColumnIndex;
- }
-
- public boolean hasAttributes() {
- return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX;
- }
-
- public boolean hasCellVisibility() {
- return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
- }
-
- public boolean hasCellTTL() {
-      return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
- }
-
- public int getAttributesKeyColumnIndex() {
- return attrKeyColumnIndex;
- }
-
- public int getCellVisibilityColumnIndex() {
- return cellVisibilityColumnIndex;
- }
-
- public int getCellTTLColumnIndex() {
- return cellTTLColumnIndex;
- }
-
- public int getRowKeyColumnIndex() {
- return rowKeyColumnIndex;
- }
-
- public byte[] getFamily(int idx) {
- return families[idx];
- }
- public byte[] getQualifier(int idx) {
- return qualifiers[idx];
- }
-
- public ParsedLine parse(byte[] lineBytes, int length)
- throws BadTsvLineException {
- // Enumerate separator offsets
- ArrayList<Integer> tabOffsets = new ArrayList<>(maxColumnCount);
- for (int i = 0; i < length; i++) {
- if (lineBytes[i] == separatorByte) {
- tabOffsets.add(i);
- }
- }
- if (tabOffsets.isEmpty()) {
- throw new BadTsvLineException("No delimiter");
- }
-
- tabOffsets.add(length);
-
- if (tabOffsets.size() > maxColumnCount) {
- throw new BadTsvLineException("Excessive columns");
- } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
- throw new BadTsvLineException("No row key");
- } else if (hasTimestamp()
- && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
- throw new BadTsvLineException("No timestamp");
- } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
- throw new BadTsvLineException("No attributes specified");
- } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
- throw new BadTsvLineException("No cell visibility specified");
- } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
- throw new BadTsvLineException("No cell TTL specified");
- }
- return new ParsedLine(tabOffsets, lineBytes);
- }
-
- class ParsedLine {
- private final ArrayList<Integer> tabOffsets;
- private byte[] lineBytes;
-
- ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
- this.tabOffsets = tabOffsets;
- this.lineBytes = lineBytes;
- }
-
- public int getRowKeyOffset() {
- return getColumnOffset(rowKeyColumnIndex);
- }
- public int getRowKeyLength() {
- return getColumnLength(rowKeyColumnIndex);
- }
-
- public long getTimestamp(long ts) throws BadTsvLineException {
- // Return ts if HBASE_TS_KEY is not configured in column spec
- if (!hasTimestamp()) {
- return ts;
- }
-
- String timeStampStr = Bytes.toString(lineBytes,
- getColumnOffset(timestampKeyColumnIndex),
- getColumnLength(timestampKeyColumnIndex));
- try {
- return Long.parseLong(timeStampStr);
- } catch (NumberFormatException nfe) {
- // treat this record as bad record
- throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
- }
- }
-
- private String getAttributes() {
- if (!hasAttributes()) {
- return null;
- } else {
- return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex),
- getColumnLength(attrKeyColumnIndex));
- }
- }
-
- public String[] getIndividualAttributes() {
- String attributes = getAttributes();
- if (attributes != null) {
- return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR);
- } else {
- return null;
- }
- }
-
- public int getAttributeKeyOffset() {
- if (hasAttributes()) {
- return getColumnOffset(attrKeyColumnIndex);
- } else {
- return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
- }
- }
-
- public int getAttributeKeyLength() {
- if (hasAttributes()) {
- return getColumnLength(attrKeyColumnIndex);
- } else {
- return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
- }
- }
-
- public int getCellVisibilityColumnOffset() {
- if (hasCellVisibility()) {
- return getColumnOffset(cellVisibilityColumnIndex);
- } else {
- return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
- }
- }
-
- public int getCellVisibilityColumnLength() {
- if (hasCellVisibility()) {
- return getColumnLength(cellVisibilityColumnIndex);
- } else {
- return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
- }
- }
-
- public String getCellVisibility() {
- if (!hasCellVisibility()) {
- return null;
- } else {
- return Bytes.toString(lineBytes, getColumnOffset(cellVisibilityColumnIndex),
- getColumnLength(cellVisibilityColumnIndex));
- }
- }
-
- public int getCellTTLColumnOffset() {
- if (hasCellTTL()) {
- return getColumnOffset(cellTTLColumnIndex);
- } else {
- return DEFAULT_CELL_TTL_COLUMN_INDEX;
- }
- }
-
- public int getCellTTLColumnLength() {
- if (hasCellTTL()) {
- return getColumnLength(cellTTLColumnIndex);
- } else {
- return DEFAULT_CELL_TTL_COLUMN_INDEX;
- }
- }
-
- public long getCellTTL() {
- if (!hasCellTTL()) {
- return 0;
- } else {
- return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
- getColumnLength(cellTTLColumnIndex));
- }
- }
-
- public int getColumnOffset(int idx) {
- if (idx > 0)
- return tabOffsets.get(idx - 1) + 1;
- else
- return 0;
- }
- public int getColumnLength(int idx) {
- return tabOffsets.get(idx) - getColumnOffset(idx);
- }
- public int getColumnCount() {
- return tabOffsets.size();
- }
- public byte[] getLineBytes() {
- return lineBytes;
- }
- }
-
- public static class BadTsvLineException extends Exception {
- public BadTsvLineException(String err) {
- super(err);
- }
- private static final long serialVersionUID = 1L;
- }
-
- /**
- * Return starting position and length of row key from the specified line bytes.
-     * @param lineBytes bytes of the line to parse
-     * @param length number of bytes of the line to consider
-     * @return Pair of row key offset and length.
-     * @throws BadTsvLineException when the row key cannot be parsed out of the line
- */
- public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length)
- throws BadTsvLineException {
- int rkColumnIndex = 0;
- int startPos = 0, endPos = 0;
- for (int i = 0; i <= length; i++) {
- if (i == length || lineBytes[i] == separatorByte) {
- endPos = i - 1;
- if (rkColumnIndex++ == getRowKeyColumnIndex()) {
- if ((endPos + 1) == startPos) {
- throw new BadTsvLineException("Empty value for ROW KEY.");
- }
- break;
- } else {
- startPos = endPos + 2;
- }
- }
- if (i == length) {
- throw new BadTsvLineException(
-              "Row key does not exist because the number of columns in the line"
-                  + " is less than the row key position.");
- }
- }
- return new Pair<>(startPos, endPos - startPos + 1);
- }
- }
-
- /**
- * Sets up the actual job.
- *
- * @param conf The current configuration.
- * @param args The command line parameters.
- * @return The newly created job.
- * @throws IOException When setting up the job fails.
- */
- protected static Job createSubmittableJob(Configuration conf, String[] args)
- throws IOException, ClassNotFoundException {
- Job job = null;
- boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
- try (Connection connection = ConnectionFactory.createConnection(conf)) {
- try (Admin admin = connection.getAdmin()) {
- // Support non-XML supported characters
- // by re-encoding the passed separator as a Base64 string.
- String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
- if (actualSeparator != null) {
- conf.set(SEPARATOR_CONF_KEY,
- Base64.encodeBytes(actualSeparator.getBytes()));
- }
-
- // See if a non-default Mapper was set
- String mapperClassName = conf.get(MAPPER_CONF_KEY);
- Class mapperClass = mapperClassName != null? Class.forName(mapperClassName): DEFAULT_MAPPER;
-
- TableName tableName = TableName.valueOf(args[0]);
- Path inputDir = new Path(args[1]);
- String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString());
- job = Job.getInstance(conf, jobName);
- job.setJarByClass(mapperClass);
- FileInputFormat.setInputPaths(job, inputDir);
- job.setInputFormatClass(TextInputFormat.class);
- job.setMapperClass(mapperClass);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
- String[] columns = conf.getStrings(COLUMNS_CONF_KEY);
- if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
- String fileLoc = conf.get(CREDENTIALS_LOCATION);
- Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
- job.getCredentials().addAll(cred);
- }
-
- if (hfileOutPath != null) {
- if (!admin.tableExists(tableName)) {
- LOG.warn(format("Table '%s' does not exist.", tableName));
- if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
- // TODO: this is backwards. Instead of depending on the existence of a table,
- // create a sane splits file for HFileOutputFormat based on data sampling.
- createTable(admin, tableName, columns);
- if (isDryRun) {
- LOG.warn("Dry run: Table will be deleted at end of dry run.");
- synchronized (ImportTsv.class) {
- DRY_RUN_TABLE_CREATED = true;
- }
- }
- } else {
- String errorMsg =
- format("Table '%s' does not exist and '%s' is set to no.", tableName,
- CREATE_TABLE_CONF_KEY);
- LOG.error(errorMsg);
- throw new TableNotFoundException(errorMsg);
- }
- }
- try (Table table = connection.getTable(tableName);
- RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
- boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
- // if no.strict is false then check column family
- if(!noStrict) {
- ArrayList<String> unmatchedFamilies = new ArrayList<>();
- Set<String> cfSet = getColumnFamilies(columns);
- TableDescriptor tDesc = table.getDescriptor();
- for (String cf : cfSet) {
- if(!tDesc.hasColumnFamily(Bytes.toBytes(cf))) {
- unmatchedFamilies.add(cf);
- }
- }
- if(unmatchedFamilies.size() > 0) {
- ArrayList<String> familyNames = new ArrayList<>();
- for (ColumnFamilyDescriptor family : table.getDescriptor().getColumnFamilies()) {
- familyNames.add(family.getNameAsString());
- }
- String msg =
- "Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY
-                        + " do not match any of the table " + tableName
- + " column families " + familyNames + ".\n"
- + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY
- + "=true.\n";
- usage(msg);
- System.exit(-1);
- }
- }
- if (mapperClass.equals(TsvImporterTextMapper.class)) {
- job.setMapOutputValueClass(Text.class);
- job.setReducerClass(TextSortReducer.class);
- } else {
- job.setMapOutputValueClass(Put.class);
- job.setCombinerClass(PutCombiner.class);
- job.setReducerClass(PutSortReducer.class);
- }
- if (!isDryRun) {
- Path outputDir = new Path(hfileOutPath);
- FileOutputFormat.setOutputPath(job, outputDir);
- HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(),
- regionLocator);
- }
- }
- } else {
- if (!admin.tableExists(tableName)) {
- String errorMsg = format("Table '%s' does not exist.", tableName);
- LOG.error(errorMsg);
- throw new TableNotFoundException(errorMsg);
- }
- if (mapperClass.equals(TsvImporterTextMapper.class)) {
- usage(TsvImporterTextMapper.class.toString()
-                + " should not be used for the non bulk-loading case. Use "
-                + TsvImporterMapper.class.toString()
-                + " or a custom mapper whose value type is Put.");
- System.exit(-1);
- }
- if (!isDryRun) {
- // No reducers. Just write straight to table. Call initTableReducerJob
- // to set up the TableOutputFormat.
- TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
- }
- job.setNumReduceTasks(0);
- }
- if (isDryRun) {
- job.setOutputFormatClass(NullOutputFormat.class);
- job.getConfiguration().setStrings("io.serializations",
- job.getConfiguration().get("io.serializations"),
- MutationSerialization.class.getName(), ResultSerialization.class.getName(),
- KeyValueSerialization.class.getName());
- }
- TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
- org.apache.hadoop.hbase.shaded.com.google.common.base.Function.class /* Guava used by TsvParser */);
- }
- }
- return job;
- }
-
- private static void createTable(Admin admin, TableName tableName, String[] columns)
- throws IOException {
- HTableDescriptor htd = new HTableDescriptor(tableName);
- Set<String> cfSet = getColumnFamilies(columns);
- for (String cf : cfSet) {
- HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
- htd.addFamily(hcd);
- }
- LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
- tableName, cfSet));
- admin.createTable(htd);
- }
-
- private static void deleteTable(Configuration conf, String[] args) {
- TableName tableName = TableName.valueOf(args[0]);
- try (Connection connection = ConnectionFactory.createConnection(conf);
- Admin admin = connection.getAdmin()) {
- try {
- admin.disableTable(tableName);
- } catch (TableNotEnabledException e) {
- LOG.debug("Dry mode: Table: " + tableName + " already disabled, so just deleting it.");
- }
- admin.deleteTable(tableName);
- } catch (IOException e) {
- LOG.error(format("***Dry run: Failed to delete table '%s'.***%n%s", tableName,
- e.toString()));
- return;
- }
- LOG.info(format("Dry run: Deleted table '%s'.", tableName));
- }
-
- private static Set<String> getColumnFamilies(String[] columns) {
- Set<String> cfSet = new HashSet<>();
- for (String aColumn : columns) {
- if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
- || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
- || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
- || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
- || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
- continue;
- // we are only concerned with the first one (in case this is a cf:cq)
- cfSet.add(aColumn.split(":", 2)[0]);
- }
- return cfSet;
- }
-
- /*
- * @param errorMsg Error message. Can be null.
- */
- private static void usage(final String errorMsg) {
- if (errorMsg != null && errorMsg.length() > 0) {
- System.err.println("ERROR: " + errorMsg);
- }
- String usage =
- "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" +
- "\n" +
- "Imports the given input directory of TSV data into the specified table.\n" +
- "\n" +
- "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" +
- "option. This option takes the form of comma-separated column names, where each\n" +
- "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
- "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" +
- "as the row key for each imported record. You must specify exactly one column\n" +
- "to be the row key, and you must specify a column name for every column that exists in the\n" +
-      "input data. Another special column " + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
- " designates that this column should be\n" +
- "used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
- TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional." + "\n" +
- "You must specify at most one column as timestamp key for each imported record.\n" +
-      "Records with invalid timestamps (blank, non-numeric) will be treated as bad records.\n" +
- "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
- "\n" +
- "Other special columns that can be specified are " + TsvParser.CELL_TTL_COLUMN_SPEC +
- " and " + TsvParser.CELL_VISIBILITY_COLUMN_SPEC + ".\n" +
- TsvParser.CELL_TTL_COLUMN_SPEC + " designates that this column will be used " +
- "as a Cell's Time To Live (TTL) attribute.\n" +
- TsvParser.CELL_VISIBILITY_COLUMN_SPEC + " designates that this column contains the " +
- "visibility label expression.\n" +
- "\n" +
- TsvParser.ATTRIBUTES_COLUMN_SPEC+" can be used to specify Operation Attributes per record.\n"+
-      " They should be specified as key=>value where " + DEFAULT_ATTRIBUTES_SEPERATOR + " is used \n"+
-      " as the separator. Note that more than one operation attribute can be specified.\n"+
- "By default importtsv will load data directly into HBase. To instead generate\n" +
- "HFiles of data to prepare for a bulk data load, pass the option:\n" +
- " -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
- " Note: if you do not use this option, then the target table must already exist in HBase\n" +
- "\n" +
- "Other options that may be specified with -D include:\n" +
- " -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into" +
- " table. If table does not exist, it is created but deleted in the end.\n" +
- " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
- " -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" +
- " -D" + SKIP_EMPTY_COLUMNS + "=false - If true then skip empty columns in bulk import\n" +
- " '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
- " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
- " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
- DEFAULT_MAPPER.getName() + "\n" +
- " -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
- " -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
- " Note: if you set this to 'no', then the target table must already exist in HBase\n" +
- " -D" + NO_STRICT_COL_FAMILY + "=true - ignore column family check in hbase table. " +
- "Default is false\n\n" +
- "For performance consider the following options:\n" +
- " -Dmapreduce.map.speculative=false\n" +
- " -Dmapreduce.reduce.speculative=false";
-
- System.err.println(usage);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- if (args.length < 2) {
- usage("Wrong number of arguments: " + args.length);
- return -1;
- }
-
- // When MAPPER_CONF_KEY is null, the user wants to use the provided TsvImporterMapper, so
- // perform validation on these additional args. When it's not null, user has provided their
- // own mapper, thus these validation are not relevant.
- // TODO: validation for TsvImporterMapper, not this tool. Move elsewhere.
- if (null == getConf().get(MAPPER_CONF_KEY)) {
- // Make sure columns are specified
- String[] columns = getConf().getStrings(COLUMNS_CONF_KEY);
- if (columns == null) {
- usage("No columns specified. Please specify with -D" +
- COLUMNS_CONF_KEY+"=...");
- return -1;
- }
-
- // Make sure they specify exactly one column as the row key
- int rowkeysFound = 0;
- for (String col : columns) {
- if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
- }
- if (rowkeysFound != 1) {
- usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
- return -1;
- }
-
- // Make sure we have at most one column as the timestamp key
- int tskeysFound = 0;
- for (String col : columns) {
- if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
- tskeysFound++;
- }
- if (tskeysFound > 1) {
- usage("Must specify at most one column as "
- + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
- return -1;
- }
-
- int attrKeysFound = 0;
- for (String col : columns) {
- if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC))
- attrKeysFound++;
- }
- if (attrKeysFound > 1) {
- usage("Must specify at most one column as "
- + TsvParser.ATTRIBUTES_COLUMN_SPEC);
- return -1;
- }
-
- // Make sure one or more columns are specified excluding rowkey and
- // timestamp key
- if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) {
- usage("One or more columns in addition to the row key and timestamp(optional) are required");
- return -1;
- }
- }
-
- // If timestamp option is not specified, use current system time.
- long timestamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
-
- // Set it back to replace an invalid (non-numeric) timestamp with the current
- // system time
- getConf().setLong(TIMESTAMP_CONF_KEY, timestamp);
-
- synchronized (ImportTsv.class) {
- DRY_RUN_TABLE_CREATED = false;
- }
- Job job = createSubmittableJob(getConf(), args);
- boolean success = job.waitForCompletion(true);
- boolean delete = false;
- synchronized (ImportTsv.class) {
- delete = DRY_RUN_TABLE_CREATED;
- }
- if (delete) {
- deleteTable(getConf(), args);
- }
- return success ? 0 : 1;
- }
-
- public static void main(String[] args) throws Exception {
- int status = ToolRunner.run(HBaseConfiguration.create(), new ImportTsv(), args);
- System.exit(status);
- }
-}
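For reference, the tool above is normally driven through ToolRunner exactly as in its main() method; a minimal sketch of invoking it programmatically, assuming a hypothetical target table, column spec, and input directory (the -D keys mirror the importtsv.* constants referenced in the usage text):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv;
    import org.apache.hadoop.util.ToolRunner;

    public class ImportTsvDriver {
      public static void main(String[] cliArgs) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // GenericOptionsParser (run by ToolRunner) folds the -D switches into the
        // Configuration, so ImportTsv.run() sees them via getConf().
        String[] args = new String[] {
            "-Dimporttsv.columns=HBASE_ROW_KEY,d:c1,d:c2", // hypothetical column spec
            "-Dimporttsv.separator=|",                     // pipe-separated input
            "exampleTable",                                // hypothetical target table
            "/user/example/input"                          // hypothetical input directory
        };
        System.exit(ToolRunner.run(conf, new ImportTsv(), args));
      }
    }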
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
deleted file mode 100644
index 953df62..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.text.MessageFormat;
-import java.util.Enumeration;
-import java.util.jar.JarFile;
-import java.util.jar.JarOutputStream;
-import java.util.jar.Manifest;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
-
-/**
- * Finds the Jar for a class. If the class is in a directory in the
- * classpath, it creates a Jar on the fly with the contents of the directory
- * and returns the path to that Jar. If a Jar is created, it is created in
- * the system temporary directory.
- *
- * This file was forked from hadoop/common/branches/branch-2@1377176.
- */
-public class JarFinder {
-
- private static void copyToZipStream(File file, ZipEntry entry,
- ZipOutputStream zos) throws IOException {
- InputStream is = new FileInputStream(file);
- try {
- zos.putNextEntry(entry);
- byte[] arr = new byte[4096];
- int read = is.read(arr);
- while (read > -1) {
- zos.write(arr, 0, read);
- read = is.read(arr);
- }
- } finally {
- try {
- is.close();
- } finally {
- zos.closeEntry();
- }
- }
- }
-
- public static void jarDir(File dir, String relativePath, ZipOutputStream zos)
- throws IOException {
- Preconditions.checkNotNull(relativePath, "relativePath");
- Preconditions.checkNotNull(zos, "zos");
-
- // by JAR spec, if there is a manifest, it must be the first entry in the
- // ZIP.
- File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
- ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME);
- if (!manifestFile.exists()) {
- zos.putNextEntry(manifestEntry);
- new Manifest().write(new BufferedOutputStream(zos));
- zos.closeEntry();
- } else {
- copyToZipStream(manifestFile, manifestEntry, zos);
- }
- zos.closeEntry();
- zipDir(dir, relativePath, zos, true);
- zos.close();
- }
-
- private static void zipDir(File dir, String relativePath, ZipOutputStream zos,
- boolean start) throws IOException {
- String[] dirList = dir.list();
- if (dirList == null) {
- return;
- }
- for (String aDirList : dirList) {
- File f = new File(dir, aDirList);
- if (!f.isHidden()) {
- if (f.isDirectory()) {
- if (!start) {
- ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/");
- zos.putNextEntry(dirEntry);
- zos.closeEntry();
- }
- String filePath = f.getPath();
- File file = new File(filePath);
- zipDir(file, relativePath + f.getName() + "/", zos, false);
- }
- else {
- String path = relativePath + f.getName();
- if (!path.equals(JarFile.MANIFEST_NAME)) {
- ZipEntry anEntry = new ZipEntry(path);
- copyToZipStream(f, anEntry, zos);
- }
- }
- }
- }
- }
-
- private static void createJar(File dir, File jarFile) throws IOException {
- Preconditions.checkNotNull(dir, "dir");
- Preconditions.checkNotNull(jarFile, "jarFile");
- File jarDir = jarFile.getParentFile();
- if (!jarDir.exists()) {
- if (!jarDir.mkdirs()) {
- throw new IOException(MessageFormat.format("could not create dir [{0}]",
- jarDir));
- }
- }
- try (FileOutputStream fos = new FileOutputStream(jarFile);
- JarOutputStream jos = new JarOutputStream(fos)) {
- jarDir(dir, "", jos);
- }
- }
-
- /**
- * Returns the full path to the Jar containing the class. It always returns a
- * JAR.
- *
- * @param klass class.
- *
- * @return path to the Jar containing the class.
- */
- public static String getJar(Class klass) {
- Preconditions.checkNotNull(klass, "klass");
- ClassLoader loader = klass.getClassLoader();
- if (loader != null) {
- String class_file = klass.getName().replaceAll("\\.", "/") + ".class";
- try {
- for (Enumeration itr = loader.getResources(class_file);
- itr.hasMoreElements(); ) {
- URL url = (URL) itr.nextElement();
- String path = url.getPath();
- if (path.startsWith("file:")) {
- path = path.substring("file:".length());
- }
- path = URLDecoder.decode(path, "UTF-8");
- if ("jar".equals(url.getProtocol())) {
- path = URLDecoder.decode(path, "UTF-8");
- return path.replaceAll("!.*$", "");
- }
- else if ("file".equals(url.getProtocol())) {
- String klassName = klass.getName();
- klassName = klassName.replace(".", "/") + ".class";
- path = path.substring(0, path.length() - klassName.length());
- File baseDir = new File(path);
- File testDir = new File(System.getProperty("test.build.dir", "target/test-dir"));
- testDir = testDir.getAbsoluteFile();
- if (!testDir.exists()) {
- testDir.mkdirs();
- }
- File tempJar = File.createTempFile("hadoop-", "", testDir);
- tempJar = new File(tempJar.getAbsolutePath() + ".jar");
- tempJar.deleteOnExit();
- createJar(baseDir, tempJar);
- return tempJar.getAbsolutePath();
- }
- }
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- return null;
- }
-}
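A short sketch of how JarFinder.getJar is typically consumed: resolve the jar that contains a class (creating a temporary jar when the class comes from a directory on the classpath, as the javadoc above describes) and attach it to a MapReduce job so the class ships to the cluster; the job name below is a placeholder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.JarFinder;
    import org.apache.hadoop.mapreduce.Job;

    public class JarFinderUsage {
      public static Job newJob(Configuration conf, Class<?> clazz) throws Exception {
        Job job = Job.getInstance(conf, "jarfinder-example");
        // Unlike Job#setJarByClass, this also works when the class is only available
        // as .class files in a directory, e.g. while running tests.
        String jar = JarFinder.getJar(clazz);
        if (jar != null) {
          job.setJar(jar);
        }
        return job;
      }
    }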
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
deleted file mode 100644
index 241608b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-
-@InterfaceAudience.Public
-public class KeyValueSerialization implements Serialization<KeyValue> {
- @Override
- public boolean accept(Class<?> c) {
- return KeyValue.class.isAssignableFrom(c);
- }
-
- @Override
- public KeyValueDeserializer getDeserializer(Class<KeyValue> t) {
- return new KeyValueDeserializer();
- }
-
- @Override
- public KeyValueSerializer getSerializer(Class<KeyValue> c) {
- return new KeyValueSerializer();
- }
-
- public static class KeyValueDeserializer implements Deserializer<KeyValue> {
- private DataInputStream dis;
-
- @Override
- public void close() throws IOException {
- this.dis.close();
- }
-
- @Override
- public KeyValue deserialize(KeyValue ignore) throws IOException {
- // I can't overwrite the passed in KV, not from a proto kv, not just yet. TODO
- return KeyValueUtil.create(this.dis);
- }
-
- @Override
- public void open(InputStream is) throws IOException {
- this.dis = new DataInputStream(is);
- }
- }
-
- public static class KeyValueSerializer implements Serializer<KeyValue> {
- private DataOutputStream dos;
-
- @Override
- public void close() throws IOException {
- this.dos.close();
- }
-
- @Override
- public void open(OutputStream os) throws IOException {
- this.dos = new DataOutputStream(os);
- }
-
- @Override
- public void serialize(KeyValue kv) throws IOException {
- KeyValueUtil.write(kv, this.dos);
- }
- }
-}
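Hadoop discovers Serialization implementations through the io.serializations configuration key, so a job that moves KeyValue instances through the shuffle needs the class above registered there (TableMapReduceUtil does this on the caller's behalf). A minimal sketch, assuming the already-configured serializations should be preserved:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;

    public final class RegisterKeyValueSerialization {
      private RegisterKeyValueSerialization() {
      }

      public static void register(Configuration conf) {
        // Append KeyValueSerialization to the existing list so the default
        // WritableSerialization keeps working.
        conf.setStrings("io.serializations",
            conf.get("io.serializations"),
            KeyValueSerialization.class.getName());
      }
    }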
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
deleted file mode 100644
index 5c7ace2..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.util.TreeSet;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-/**
- * Emits sorted KeyValues.
- * Reads in all KeyValues from the passed Iterator, sorts them, then emits
- * the KeyValues in sorted order. If a row has many columns, sorting can use a
- * lot of memory.
- * @see HFileOutputFormat2
- */
-@InterfaceAudience.Public
-public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
- protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
- org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
- throws java.io.IOException, InterruptedException {
- TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
- for (KeyValue kv: kvs) {
- try {
- map.add(kv.clone());
- } catch (CloneNotSupportedException e) {
- throw new java.io.IOException(e);
- }
- }
- context.setStatus("Read " + map.getClass());
- int index = 0;
- for (KeyValue kv: map) {
- context.write(row, kv);
- if (++index % 100 == 0) context.setStatus("Wrote " + index);
- }
- }
-}
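In a bulk-load job this reducer sits between a mapper that emits (ImmutableBytesWritable row, KeyValue) pairs and HFileOutputFormat2. HFileOutputFormat2.configureIncrementalLoad normally wires all of this (plus partitioning) up; a hand-rolled sketch of the same arrangement, with a hypothetical mapper class and output path, looks like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SortReducerWiring {
      public static Job configure(Configuration conf, Class<? extends Mapper> mapperClass)
          throws Exception {
        Job job = Job.getInstance(conf, "hfile-sort-example");
        job.setMapperClass(mapperClass);                  // hypothetical mapper emitting KeyValues
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setReducerClass(KeyValueSortReducer.class);   // sorts each row's cells in memory
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles-out")); // hypothetical output dir
        return job;
      }
    }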
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
deleted file mode 100644
index d7c7cc0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Creates a three-level directory tree: the first level uses the table name as the parent
- * directory, the second level uses the column family name as the child directory, and all
- * HFiles for one family are placed under that child directory:
- * -tableName1
- * -columnFamilyName1
- * -columnFamilyName2
- * -HFiles
- * -tableName2
- * -columnFamilyName1
- * -HFiles
- * -columnFamilyName2
- */
-@InterfaceAudience.Public
-@VisibleForTesting
-public class MultiTableHFileOutputFormat extends HFileOutputFormat2 {
- private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class);
-
- /**
- * Creates a composite key to use as a mapper output key when using
- * MultiTableHFileOutputFormat.configureIncrementalLoad to set up a bulk ingest job
- *
- * @param tableName Name of the table, e.g. TableName.getNameAsString()
- * @param suffix Usually a row key (when creating a mapper output key) or a column family
- * @return byte[] representation of composite key
- */
- public static byte[] createCompositeKey(byte[] tableName,
- byte[] suffix) {
- return combineTableNameSuffix(tableName, suffix);
- }
-
- /**
- * Alternate API which accepts an ImmutableBytesWritable for the suffix
- * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
- */
- public static byte[] createCompositeKey(byte[] tableName,
- ImmutableBytesWritable suffix) {
- return combineTableNameSuffix(tableName, suffix.get());
- }
-
- /**
- * Alternate API which accepts a String for the tableName and an ImmutableBytesWritable for
- * the suffix
- * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
- */
- public static byte[] createCompositeKey(String tableName,
- ImmutableBytesWritable suffix) {
- return combineTableNameSuffix(tableName.getBytes(Charset.forName("UTF-8")), suffix.get());
- }
-
- /**
- * Analogous to
- * {@link HFileOutputFormat2#configureIncrementalLoad(Job, TableDescriptor, RegionLocator)},
- * this function will configure the requisite number of reducers to write HFiles for multiple
- * tables simultaneously
- *
- * @param job See {@link org.apache.hadoop.mapreduce.Job}
- * @param multiTableDescriptors Table descriptor and region locator pairs
- * @throws IOException
- */
- public static void configureIncrementalLoad(Job job, List<TableInfo> multiTableDescriptors)
- throws IOException {
- MultiTableHFileOutputFormat.configureIncrementalLoad(job, multiTableDescriptors,
- MultiTableHFileOutputFormat.class);
- }
-
- private static int validateCompositeKey(byte[] keyBytes) {
-
- int separatorIdx = Bytes.indexOf(keyBytes, HFileOutputFormat2.tableSeparator);
-
- // Either the separator was not found, or the table name or the suffix was missing
- if (separatorIdx == -1) {
- throw new IllegalArgumentException("Invalid format for composite key [" + Bytes
- .toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key");
- }
- return separatorIdx;
- }
-
- protected static byte[] getTableName(byte[] keyBytes) {
- int separatorIdx = validateCompositeKey(keyBytes);
- return Bytes.copy(keyBytes, 0, separatorIdx);
- }
-
- protected static byte[] getSuffix(byte[] keyBytes) {
- int separatorIdx = validateCompositeKey(keyBytes);
- return Bytes.copy(keyBytes, separatorIdx+1, keyBytes.length - separatorIdx - 1);
- }
-}
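To make the composite-key contract above concrete: a mapper that feeds several tables prefixes every row key with the destination table name through createCompositeKey, and getTableName/getSuffix later split the two parts back out inside the output format. The table name, column family, and input parsing below are hypothetical; the job itself is then configured with the configureIncrementalLoad(Job, List<TableInfo>) overload shown above, one TableInfo per destination table.

    import java.io.IOException;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MultiTableCellMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

      @Override
      protected void map(LongWritable offset, Text line, Context context)
          throws IOException, InterruptedException {
        // Hypothetical input: tab-separated "rowkey<TAB>value".
        String[] fields = line.toString().split("\t");
        byte[] row = Bytes.toBytes(fields[0]);

        // Prefix the row key with the destination table so the output format
        // can route the cell into that table's directory.
        byte[] compositeKey = MultiTableHFileOutputFormat.createCompositeKey(
            "table_one", new ImmutableBytesWritable(row));

        KeyValue kv = new KeyValue(row, Bytes.toBytes("d"), Bytes.toBytes("c1"),
            Bytes.toBytes(fields[1]));
        context.write(new ImmutableBytesWritable(compositeKey), kv);
      }
    }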
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
deleted file mode 100644
index a8e6837..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
-
-/**
- * Convert HBase tabular data from multiple scanners into a format that
- * is consumable by Map/Reduce.
- *
- * <p>
- * Usage example
- * </p>
- *
- * <pre>
- * List<Scan> scans = new ArrayList<Scan>();
- *
- * Scan scan1 = new Scan();
- * scan1.setStartRow(firstRow1);
- * scan1.setStopRow(lastRow1);
- * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
- * scans.add(scan1);
- *
- * Scan scan2 = new Scan();
- * scan2.setStartRow(firstRow2);
- * scan2.setStopRow(lastRow2);
- * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
- * scans.add(scan2);
- *
- * TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class,
- * IntWritable.class, job);
- * </pre>
- */
-@InterfaceAudience.Public
-public class MultiTableInputFormat extends MultiTableInputFormatBase implements
- Configurable {
-
- /** Job parameter that specifies the scan list. */
- public static final String SCANS = "hbase.mapreduce.scans";
-
- /** The configuration. */
- private Configuration conf = null;
-
- /**
- * Returns the current configuration.
- *
- * @return The current configuration.
- * @see org.apache.hadoop.conf.Configurable#getConf()
- */
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- /**
- * Sets the configuration. This is used to set the details for the tables to
- * be scanned.
- *
- * @param configuration The configuration to set.
- * @see org.apache.hadoop.conf.Configurable#setConf(
- * org.apache.hadoop.conf.Configuration)
- */
- @Override
- public void setConf(Configuration configuration) {
- this.conf = configuration;
- String[] rawScans = conf.getStrings(SCANS);
- if (rawScans == null || rawScans.length == 0) {
- throw new IllegalArgumentException("There must be at least 1 scan configuration set to: "
- + SCANS);
- }
- List<Scan> scans = new ArrayList<>();
-
- for (int i = 0; i < rawScans.length; i++) {
- try {
- scans.add(TableMapReduceUtil.convertStringToScan(rawScans[i]));
- } catch (IOException e) {
- throw new RuntimeException("Failed to convert string '" + rawScans[i] + "' to a Scan", e);
- }
- }
- this.setScans(scans);
- }
-}