Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:13 UTC
[13/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
deleted file mode 100644
index bf11473..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
-import org.apache.hadoop.hbase.client.IsolationLevel;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
-import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
-import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.Writable;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class TableSnapshotInputFormatImpl {
- // TODO: Snapshot files are owned in fs by the hbase user. There is no
- // easy way to delegate access.
-
- public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);
-
- private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
- // key for specifying the root dir of the restored snapshot
- protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
-
- /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
- private static final String LOCALITY_CUTOFF_MULTIPLIER =
- "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
- private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
-
- /**
- * Implementation class for InputSplit logic common between mapred and mapreduce.
- */
- public static class InputSplit implements Writable {
-
- private TableDescriptor htd;
- private HRegionInfo regionInfo;
- private String[] locations;
- private String scan;
- private String restoreDir;
-
- // constructor for mapreduce framework / Writable
- public InputSplit() {}
-
- public InputSplit(TableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
- Scan scan, Path restoreDir) {
- this.htd = htd;
- this.regionInfo = regionInfo;
- if (locations == null || locations.isEmpty()) {
- this.locations = new String[0];
- } else {
- this.locations = locations.toArray(new String[locations.size()]);
- }
- try {
- this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
- } catch (IOException e) {
- LOG.warn("Failed to convert Scan to String", e);
- }
-
- this.restoreDir = restoreDir.toString();
- }
-
- public TableDescriptor getHtd() {
- return htd;
- }
-
- public String getScan() {
- return scan;
- }
-
- public String getRestoreDir() {
- return restoreDir;
- }
-
- public long getLength() {
- //TODO: We can obtain the file sizes of the snapshot here.
- return 0;
- }
-
- public String[] getLocations() {
- return locations;
- }
-
- public TableDescriptor getTableDescriptor() {
- return htd;
- }
-
- public HRegionInfo getRegionInfo() {
- return regionInfo;
- }
-
- // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
- // doing this wrapping with Writables.
- @Override
- public void write(DataOutput out) throws IOException {
- TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
- .setTable(ProtobufUtil.toTableSchema(htd))
- .setRegion(HRegionInfo.convert(regionInfo));
-
- for (String location : locations) {
- builder.addLocations(location);
- }
-
- TableSnapshotRegionSplit split = builder.build();
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- split.writeTo(baos);
- baos.close();
- byte[] buf = baos.toByteArray();
- out.writeInt(buf.length);
- out.write(buf);
-
- Bytes.writeByteArray(out, Bytes.toBytes(scan));
- Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
-
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int len = in.readInt();
- byte[] buf = new byte[len];
- in.readFully(buf);
- TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
- this.htd = ProtobufUtil.toTableDescriptor(split.getTable());
- this.regionInfo = HRegionInfo.convert(split.getRegion());
- List<String> locationsList = split.getLocationsList();
- this.locations = locationsList.toArray(new String[locationsList.size()]);
-
- this.scan = Bytes.toString(Bytes.readByteArray(in));
- this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
- }
- }
-
- /**
- * Implementation class for RecordReader logic common between mapred and mapreduce.
- */
- public static class RecordReader {
- private InputSplit split;
- private Scan scan;
- private Result result = null;
- private ImmutableBytesWritable row = null;
- private ClientSideRegionScanner scanner;
-
- public ClientSideRegionScanner getScanner() {
- return scanner;
- }
-
- public void initialize(InputSplit split, Configuration conf) throws IOException {
- this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
- this.split = split;
- TableDescriptor htd = split.htd;
- HRegionInfo hri = this.split.getRegionInfo();
- FileSystem fs = FSUtils.getCurrentFileSystem(conf);
-
- // The region is immutable, so this should be fine;
- // otherwise we would have to set the thread read point.
- scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
- // disable caching of data blocks
- scan.setCacheBlocks(false);
-
- scanner =
- new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
- }
-
- public boolean nextKeyValue() throws IOException {
- result = scanner.next();
- if (result == null) {
- //we are done
- return false;
- }
-
- if (this.row == null) {
- this.row = new ImmutableBytesWritable();
- }
- this.row.set(result.getRow());
- return true;
- }
-
- public ImmutableBytesWritable getCurrentKey() {
- return row;
- }
-
- public Result getCurrentValue() {
- return result;
- }
-
- public long getPos() {
- return 0;
- }
-
- public float getProgress() {
- return 0; // TODO: use total bytes to estimate
- }
-
- public void close() {
- if (this.scanner != null) {
- this.scanner.close();
- }
- }
- }
-
- public static List<InputSplit> getSplits(Configuration conf) throws IOException {
- String snapshotName = getSnapshotName(conf);
-
- Path rootDir = FSUtils.getRootDir(conf);
- FileSystem fs = rootDir.getFileSystem(conf);
-
- SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
-
- List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);
-
- // TODO: mapred does not support scan as input API. Work around for now.
- Scan scan = extractScanFromConf(conf);
- // the temp dir where the snapshot is restored
- Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
-
- return getSplits(scan, manifest, regionInfos, restoreDir, conf);
- }
-
- public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
- List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
- if (regionManifests == null) {
- throw new IllegalArgumentException("Snapshot seems empty");
- }
-
- List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
-
- for (SnapshotRegionManifest regionManifest : regionManifests) {
- HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
- if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
- continue;
- }
- regionInfos.add(hri);
- }
- return regionInfos;
- }
-
- public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
- Path rootDir, FileSystem fs) throws IOException {
- Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
- SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
- return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
- }
-
- public static Scan extractScanFromConf(Configuration conf) throws IOException {
- Scan scan = null;
- if (conf.get(TableInputFormat.SCAN) != null) {
- scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
- } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
- String[] columns =
- conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
- scan = new Scan();
- for (String col : columns) {
- scan.addFamily(Bytes.toBytes(col));
- }
- } else {
- throw new IllegalArgumentException("Unable to create scan");
- }
- return scan;
- }
-
- public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
- List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
- // load table descriptor
- TableDescriptor htd = manifest.getTableDescriptor();
-
- Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
-
- List<InputSplit> splits = new ArrayList<>();
- for (HRegionInfo hri : regionManifests) {
- // load region descriptor
-
- if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
- hri.getEndKey())) {
- // compute HDFS locations from snapshot files (which will get the locations for
- // referred hfiles)
- List<String> hosts = getBestLocations(conf,
- HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
-
- int len = Math.min(3, hosts.size());
- hosts = hosts.subList(0, len);
- splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
- }
- }
-
- return splits;
-
- }
-
- /**
- * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
- * weights into account, and thus treat every location passed from the input split as equal. We
- * do not want to blindly pass all the locations, since we are creating one split per region, and
- * the region's blocks are distributed throughout the cluster unless favored node assignment
- * is used. In the expected stable case, only one location will hold most of the blocks as local.
- * With favored node assignment, on the other hand, 3 nodes will hold highly local blocks. Here
- * we apply a simple heuristic: pass all hosts which have at least 80%
- * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the
- * host with the best locality.
- */
- public static List<String> getBestLocations(
- Configuration conf, HDFSBlocksDistribution blockDistribution) {
- List<String> locations = new ArrayList<>(3);
-
- HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
-
- if (hostAndWeights.length == 0) {
- return locations;
- }
-
- HostAndWeight topHost = hostAndWeights[0];
- locations.add(topHost.getHost());
-
- // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
- double cutoffMultiplier
- = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
-
- double filterWeight = topHost.getWeight() * cutoffMultiplier;
-
- for (int i = 1; i < hostAndWeights.length; i++) {
- if (hostAndWeights[i].getWeight() >= filterWeight) {
- locations.add(hostAndWeights[i].getHost());
- } else {
- break;
- }
- }
-
- return locations;
- }
-
- private static String getSnapshotName(Configuration conf) {
- String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
- if (snapshotName == null) {
- throw new IllegalArgumentException("Snapshot name must be provided");
- }
- return snapshotName;
- }
-
- /**
- * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
- * @param conf the job configuration
- * @param snapshotName the name of the snapshot to read from
- * @param restoreDir a temporary directory to restore the snapshot into. Current user should
- * have write permissions to this directory, and this should not be a subdirectory of rootdir.
- * After the job is finished, restoreDir can be deleted.
- * @throws IOException if an error occurs
- */
- public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
- throws IOException {
- conf.set(SNAPSHOT_NAME_KEY, snapshotName);
-
- Path rootDir = FSUtils.getRootDir(conf);
- FileSystem fs = rootDir.getFileSystem(conf);
-
- restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
-
- // TODO: restore from record readers to parallelize.
- RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
-
- conf.set(RESTORE_DIR_KEY, restoreDir.toString());
- }
-}
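
The getBestLocations() heuristic deleted above is small enough to illustrate on its own. Below is a minimal standalone sketch of the cutoff-multiplier idea; HostWeight is a stand-in for HDFSBlocksDistribution.HostAndWeight, and all names here are illustrative rather than HBase API.

import java.util.ArrayList;
import java.util.List;

// Sketch of the locality cutoff heuristic: keep the top host, then every host
// whose weight is at least cutoffMultiplier * topWeight.
public class LocalityCutoffSketch {
  static final class HostWeight {
    final String host;
    final long weight;
    HostWeight(String host, long weight) { this.host = host; this.weight = weight; }
  }

  // Assumes the input is sorted by weight, descending, as
  // getTopHostsWithWeights() returns it.
  static List<String> bestLocations(HostWeight[] sortedByWeightDesc, double cutoffMultiplier) {
    List<String> locations = new ArrayList<>(3);
    if (sortedByWeightDesc.length == 0) {
      return locations;
    }
    HostWeight top = sortedByWeightDesc[0];
    locations.add(top.host);
    double filterWeight = top.weight * cutoffMultiplier;
    for (int i = 1; i < sortedByWeightDesc.length; i++) {
      if (sortedByWeightDesc[i].weight >= filterWeight) {
        locations.add(sortedByWeightDesc[i].host);
      } else {
        break; // input is sorted, so no later host can pass the cutoff
      }
    }
    return locations;
  }

  public static void main(String[] args) {
    HostWeight[] hosts = {
      new HostWeight("rs1", 1000), new HostWeight("rs2", 850), new HostWeight("rs3", 400)
    };
    // With the default 0.8 multiplier only rs1 and rs2 pass (850 >= 800, 400 < 800).
    System.out.println(bestLocations(hosts, 0.8));
  }
}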
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
deleted file mode 100644
index 13c7c67..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-/**
- * A table split corresponds to a key range (low, high) and an optional scanner.
- * All references to row below refer to the key of the row.
- */
-@InterfaceAudience.Public
-public class TableSplit extends InputSplit
-implements Writable, Comparable<TableSplit> {
- /** @deprecated The LOG variable will be made private in HBase 3.0. */
- @Deprecated
- public static final Log LOG = LogFactory.getLog(TableSplit.class);
-
- // should be < 0 (@see #readFields(DataInput))
- // version 1 supports Scan data member
- enum Version {
- UNVERSIONED(0),
- // Initial number we put on TableSplit when we introduced versioning.
- INITIAL(-1),
- // Added an encoded region name field for easier identification of split -> region
- WITH_ENCODED_REGION_NAME(-2);
-
- final int code;
- static final Version[] byCode;
- static {
- byCode = Version.values();
- for (int i = 0; i < byCode.length; i++) {
- if (byCode[i].code != -1 * i) {
- throw new AssertionError("Values in this enum should be descending by one");
- }
- }
- }
-
- Version(int code) {
- this.code = code;
- }
-
- boolean atLeast(Version other) {
- return code <= other.code;
- }
-
- static Version fromCode(int code) {
- return byCode[code * -1];
- }
- }
-
- private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME;
- private TableName tableName;
- private byte [] startRow;
- private byte [] endRow;
- private String regionLocation;
- private String encodedRegionName = "";
- private String scan = ""; // stores the serialized form of the Scan
- private long length; // Contains estimation of region size in bytes
-
- /** Default constructor. */
- public TableSplit() {
- this((TableName)null, null, HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY, "");
- }
-
- /**
- * Creates a new instance while assigning all variables.
- * Length of region is set to 0.
- * Encoded name of the region is set to blank.
- *
- * @param tableName The name of the current table.
- * @param scan The scan associated with this split.
- * @param startRow The start row of the split.
- * @param endRow The end row of the split.
- * @param location The location of the region.
- */
- public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
- final String location) {
- this(tableName, scan, startRow, endRow, location, 0L);
- }
-
- /**
- * Creates a new instance while assigning all variables.
- * Encoded name of the region is set to blank.
- *
- * @param tableName The name of the current table.
- * @param scan The scan associated with this split.
- * @param startRow The start row of the split.
- * @param endRow The end row of the split.
- * @param location The location of the region.
- * @param length Size of region in bytes.
- */
- public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
- final String location, long length) {
- this(tableName, scan, startRow, endRow, location, "", length);
- }
-
- /**
- * Creates a new instance while assigning all variables.
- *
- * @param tableName The name of the current table.
- * @param scan The scan associated with this split.
- * @param startRow The start row of the split.
- * @param endRow The end row of the split.
- * @param location The location of the region.
- * @param encodedRegionName The encoded name of the region.
- * @param length Size of region in bytes.
- */
- public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
- final String location, final String encodedRegionName, long length) {
- this.tableName = tableName;
- try {
- this.scan =
- (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
- } catch (IOException e) {
- LOG.warn("Failed to convert Scan to String", e);
- }
- this.startRow = startRow;
- this.endRow = endRow;
- this.regionLocation = location;
- this.encodedRegionName = encodedRegionName;
- this.length = length;
- }
-
- /**
- * Creates a new instance without a scanner.
- * Length of region is set to 0.
- *
- * @param tableName The name of the current table.
- * @param startRow The start row of the split.
- * @param endRow The end row of the split.
- * @param location The location of the region.
- */
- public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
- final String location) {
- this(tableName, null, startRow, endRow, location);
- }
-
- /**
- * Creates a new instance without a scanner.
- *
- * @param tableName The name of the current table.
- * @param startRow The start row of the split.
- * @param endRow The end row of the split.
- * @param location The location of the region.
- * @param length Size of region in bytes.
- */
- public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
- final String location, long length) {
- this(tableName, null, startRow, endRow, location, length);
- }
-
- /**
- * Returns a Scan object from the stored string representation.
- *
- * @return Returns a Scan object based on the stored scanner.
- * @throws IOException
- */
- public Scan getScan() throws IOException {
- return TableMapReduceUtil.convertStringToScan(this.scan);
- }
-
- /**
- * Returns the table name converted to a byte array.
- * @see #getTable()
- * @return The table name.
- */
- public byte [] getTableName() {
- return tableName.getName();
- }
-
- /**
- * Returns the table name.
- *
- * @return The table name.
- */
- public TableName getTable() {
- // Ideally this method would be called getTableName, but that name was already
- // taken by the existing byte[] accessor, so this one is called getTable.
- return tableName;
- }
-
- /**
- * Returns the start row.
- *
- * @return The start row.
- */
- public byte [] getStartRow() {
- return startRow;
- }
-
- /**
- * Returns the end row.
- *
- * @return The end row.
- */
- public byte [] getEndRow() {
- return endRow;
- }
-
- /**
- * Returns the region location.
- *
- * @return The region's location.
- */
- public String getRegionLocation() {
- return regionLocation;
- }
-
- /**
- * Returns the region's location as an array.
- *
- * @return The array containing the region location.
- * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
- */
- @Override
- public String[] getLocations() {
- return new String[] {regionLocation};
- }
-
- /**
- * Returns the region's encoded name.
- *
- * @return The region's encoded name.
- */
- public String getEncodedRegionName() {
- return encodedRegionName;
- }
-
- /**
- * Returns the length of the split.
- *
- * @return The length of the split.
- * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
- */
- @Override
- public long getLength() {
- return length;
- }
-
- /**
- * Reads the values of each field.
- *
- * @param in The input to read from.
- * @throws IOException When reading the input fails.
- */
- @Override
- public void readFields(DataInput in) throws IOException {
- Version version = Version.UNVERSIONED;
- // TableSplit was not versioned in the beginning.
- // In order to introduce it now, we make use of the fact
- // that tableName was written with Bytes.writeByteArray,
- // which encodes the array length as a vint which is >= 0.
- // Hence if the vint is >= 0 we have an old version and the vint
- // encodes the length of tableName.
- // If < 0 we just read the version and the next vint is the length.
- // @see Bytes#readByteArray(DataInput)
- int len = WritableUtils.readVInt(in);
- if (len < 0) {
- // what we just read was the version
- version = Version.fromCode(len);
- len = WritableUtils.readVInt(in);
- }
- byte[] tableNameBytes = new byte[len];
- in.readFully(tableNameBytes);
- tableName = TableName.valueOf(tableNameBytes);
- startRow = Bytes.readByteArray(in);
- endRow = Bytes.readByteArray(in);
- regionLocation = Bytes.toString(Bytes.readByteArray(in));
- if (version.atLeast(Version.INITIAL)) {
- scan = Bytes.toString(Bytes.readByteArray(in));
- }
- length = WritableUtils.readVLong(in);
- if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) {
- encodedRegionName = Bytes.toString(Bytes.readByteArray(in));
- }
- }
-
- /**
- * Writes the field values to the output.
- *
- * @param out The output to write to.
- * @throws IOException When writing the values to the output fails.
- */
- @Override
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeVInt(out, VERSION.code);
- Bytes.writeByteArray(out, tableName.getName());
- Bytes.writeByteArray(out, startRow);
- Bytes.writeByteArray(out, endRow);
- Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
- Bytes.writeByteArray(out, Bytes.toBytes(scan));
- WritableUtils.writeVLong(out, length);
- Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName));
- }
-
- /**
- * Returns the details about this instance as a string.
- *
- * @return The values of this instance as a string.
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("HBase table split(");
- sb.append("table name: ").append(tableName);
- // null scan input is represented by ""
- String printScan = "";
- if (!scan.equals("")) {
- try {
- // get the real scan here in toString, not the Base64 string
- printScan = TableMapReduceUtil.convertStringToScan(scan).toString();
- }
- catch (IOException e) {
- printScan = "";
- }
- }
- sb.append(", scan: ").append(printScan);
- sb.append(", start row: ").append(Bytes.toStringBinary(startRow));
- sb.append(", end row: ").append(Bytes.toStringBinary(endRow));
- sb.append(", region location: ").append(regionLocation);
- sb.append(", encoded region name: ").append(encodedRegionName);
- sb.append(")");
- return sb.toString();
- }
-
- /**
- * Compares this split against the given one.
- *
- * @param split The split to compare to.
- * @return The result of the comparison.
- * @see java.lang.Comparable#compareTo(java.lang.Object)
- */
- @Override
- public int compareTo(TableSplit split) {
- // If the table names of the two splits are the same, compare start rows;
- // otherwise compare based on table names.
- int tableNameComparison =
- getTable().compareTo(split.getTable());
- return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo(
- getStartRow(), split.getStartRow());
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof TableSplit)) {
- return false;
- }
- return tableName.equals(((TableSplit)o).tableName) &&
- Bytes.equals(startRow, ((TableSplit)o).startRow) &&
- Bytes.equals(endRow, ((TableSplit)o).endRow) &&
- regionLocation.equals(((TableSplit)o).regionLocation);
- }
-
- @Override
- public int hashCode() {
- int result = tableName != null ? tableName.hashCode() : 0;
- result = 31 * result + (scan != null ? scan.hashCode() : 0);
- result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
- result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
- result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
- result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0);
- return result;
- }
-}
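
The readFields() implementation deleted above leans on a back-compatibility trick worth spelling out: the legacy serialization began with a non-negative vint (the table name length), so a negative first value can safely double as a version code. A simplified standalone sketch of that idea, with plain writeInt standing in for Hadoop's vint encoding:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch of TableSplit's versioning trick: the legacy format began with a
// non-negative length, so a negative first value can be reused as a version
// marker without breaking readers of old data.
public class VersionedRecordSketch {
  static final int VERSION = -2; // must be < 0, like Version.WITH_ENCODED_REGION_NAME

  static void write(DataOutput out, byte[] name) throws IOException {
    out.writeInt(VERSION);      // new format: version first ...
    out.writeInt(name.length);  // ... then the length the old format started with
    out.write(name);
  }

  static byte[] read(DataInput in) throws IOException {
    int first = in.readInt();
    int len;
    if (first < 0) {
      // Negative: first is a version code; the real length follows.
      len = in.readInt();
    } else {
      // Non-negative: legacy record, the first value already is the length.
      len = first;
    }
    byte[] name = new byte[len];
    in.readFully(name);
    return name;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    write(new DataOutputStream(baos), "t1".getBytes(StandardCharsets.UTF_8));
    byte[] name = read(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    System.out.println(new String(name, StandardCharsets.UTF_8)); // prints t1
  }
}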
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
deleted file mode 100644
index 84324e2..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Emits sorted KeyValues. Parses the passed text, creates KeyValues, and sorts them before emitting.
- * @see HFileOutputFormat2
- * @see KeyValueSortReducer
- * @see PutSortReducer
- */
-@InterfaceAudience.Public
-public class TextSortReducer extends
- Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
-
- /** Timestamp for all inserted rows */
- private long ts;
-
- /** Column separator */
- private String separator;
-
- /** Should skip bad lines */
- private boolean skipBadLines;
-
- private Counter badLineCount;
-
- private ImportTsv.TsvParser parser;
-
- /** Cell visibility expr **/
- private String cellVisibilityExpr;
-
- /** Cell TTL */
- private long ttl;
-
- private CellCreator kvCreator;
-
- public long getTs() {
- return ts;
- }
-
- public boolean getSkipBadLines() {
- return skipBadLines;
- }
-
- public Counter getBadLineCount() {
- return badLineCount;
- }
-
- public void incrementBadLineCount(int count) {
- this.badLineCount.increment(count);
- }
-
- /**
- * Handles initializing this class with objects specific to it (i.e., the parser).
- * Common initialization that might be leveraged by a subclass is done in
- * <code>doSetup</code>. Hence a subclass may choose to override this method
- * and call <code>doSetup</code> as well before handling its own custom params.
- *
- * @param context
- */
- @Override
- protected void setup(Context context) {
- Configuration conf = context.getConfiguration();
- doSetup(context, conf);
-
- parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
- if (parser.getRowKeyColumnIndex() == -1) {
- throw new RuntimeException("No row key column specified");
- }
- this.kvCreator = new CellCreator(conf);
- }
-
- /**
- * Handles common parameter initialization that a subclass might want to leverage.
- * @param context
- * @param conf
- */
- protected void doSetup(Context context, Configuration conf) {
- // If a custom separator has been used,
- // decode it back from Base64 encoding.
- separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
- if (separator == null) {
- separator = ImportTsv.DEFAULT_SEPARATOR;
- } else {
- separator = new String(Base64.decode(separator));
- }
-
- // Should never get 0 as we are setting this to a valid value in job configuration.
- ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
-
- skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
- badLineCount = context.getCounter("ImportTsv", "Bad Lines");
- }
-
- @Override
- protected void reduce(
- ImmutableBytesWritable rowKey,
- java.lang.Iterable<Text> lines,
- Reducer<ImmutableBytesWritable, Text,
- ImmutableBytesWritable, KeyValue>.Context context)
- throws java.io.IOException, InterruptedException
- {
- // although reduce() is called per-row, handle pathological case
- long threshold = context.getConfiguration().getLong(
- "reducer.row.threshold", 1L * (1<<30));
- Iterator<Text> iter = lines.iterator();
- while (iter.hasNext()) {
- Set<KeyValue> kvs = new TreeSet<>(CellComparator.COMPARATOR);
- long curSize = 0;
- // stop at the end or the RAM threshold
- while (iter.hasNext() && curSize < threshold) {
- Text line = iter.next();
- byte[] lineBytes = line.getBytes();
- try {
- ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
- // Retrieve timestamp if exists
- ts = parsed.getTimestamp(ts);
- cellVisibilityExpr = parsed.getCellVisibility();
- ttl = parsed.getCellTTL();
-
- // create tags for the parsed line
- List<Tag> tags = new ArrayList<>();
- if (cellVisibilityExpr != null) {
- tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags(
- cellVisibilityExpr));
- }
- // Add TTL directly to the KV so we can vary them when packing more than one KV
- // into puts
- if (ttl > 0) {
- tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
- }
- for (int i = 0; i < parsed.getColumnCount(); i++) {
- if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
- || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
- || i == parser.getCellTTLColumnIndex()) {
- continue;
- }
- // Creating the KV which needs to be directly written to HFiles. Using the Facade
- // KVCreator for creation of kvs.
- Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
- parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
- parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
- parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- kvs.add(kv);
- curSize += kv.heapSize();
- }
- } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
- | InvalidLabelException badLine) {
- if (skipBadLines) {
- System.err.println("Bad line." + badLine.getMessage());
- incrementBadLineCount(1);
- continue;
- }
- throw new IOException(badLine);
- }
- }
- context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass()
- + "(" + StringUtils.humanReadableInt(curSize) + ")");
- int index = 0;
- for (KeyValue kv : kvs) {
- context.write(rowKey, kv);
- if (++index % 100 == 0)
- context.setStatus("Wrote " + index + " key values.");
- }
-
- // if we have more entries to process
- if (iter.hasNext()) {
- // force flush because we cannot guarantee intra-row sorted order
- context.write(null, null);
- }
- }
- }
-}
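
The reduce() method deleted above bounds memory by sorting in chunks and writing a (null, null) marker between chunks to force a flush, since sorted order holds only within a chunk. A standalone sketch of that chunking pattern, with a plain BiConsumer standing in for the reducer Context:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.BiConsumer;

// Sketch of the RAM-bounded chunked sort: buffer values into a sorted set
// until a size threshold, emit the chunk, then emit a null marker if more
// input remains (only intra-chunk order is guaranteed).
public class ChunkedSortSketch {
  static void reduce(Iterator<String> lines, long thresholdBytes,
                     BiConsumer<String, String> emit) {
    while (lines.hasNext()) {
      SortedSet<String> chunk = new TreeSet<>();
      long curSize = 0;
      // stop at the end of the input or at the RAM threshold
      while (lines.hasNext() && curSize < thresholdBytes) {
        String line = lines.next();
        chunk.add(line);
        curSize += 2L * line.length(); // crude heap-size estimate
      }
      for (String v : chunk) {
        emit.accept("rowKey", v);
      }
      if (lines.hasNext()) {
        emit.accept(null, null); // force a flush between chunks
      }
    }
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("c", "a", "d", "b");
    // Threshold of 4 bytes forces two chunks: {a, c}, flush marker, {b, d}.
    reduce(input.iterator(), 4, (k, v) -> System.out.println(k + " -> " + v));
  }
}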
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
deleted file mode 100644
index a9d8e03..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
-import org.apache.hadoop.hbase.security.visibility.CellVisibility;
-import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * Writes table content out to files in HDFS.
- */
-@InterfaceAudience.Public
-public class TsvImporterMapper
-extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
-{
-
- /** Timestamp for all inserted rows */
- protected long ts;
-
- /** Column separator */
- private String separator;
-
- /** Should skip bad lines */
- private boolean skipBadLines;
- /** Should skip empty columns */
- private boolean skipEmptyColumns;
- private Counter badLineCount;
- private boolean logBadLines;
-
- protected ImportTsv.TsvParser parser;
-
- protected Configuration conf;
-
- protected String cellVisibilityExpr;
-
- protected long ttl;
-
- protected CellCreator kvCreator;
-
- private String hfileOutPath;
-
- /** List of cell tags */
- private List<Tag> tags;
-
- public long getTs() {
- return ts;
- }
-
- public boolean getSkipBadLines() {
- return skipBadLines;
- }
-
- public Counter getBadLineCount() {
- return badLineCount;
- }
-
- public void incrementBadLineCount(int count) {
- this.badLineCount.increment(count);
- }
-
- /**
- * Handles initializing this class with objects specific to it (i.e., the parser).
- * Common initialization that might be leveraged by a subclass is done in
- * <code>doSetup</code>. Hence a subclass may choose to override this method
- * and call <code>doSetup</code> as well before handling its own custom params.
- *
- * @param context
- */
- @Override
- protected void setup(Context context) {
- doSetup(context);
-
- conf = context.getConfiguration();
- parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
- separator);
- if (parser.getRowKeyColumnIndex() == -1) {
- throw new RuntimeException("No row key column specified");
- }
- this.kvCreator = new CellCreator(conf);
- tags = new ArrayList<>();
- }
-
- /**
- * Handles common parameter initialization that a subclass might want to leverage.
- * @param context
- */
- protected void doSetup(Context context) {
- Configuration conf = context.getConfiguration();
-
- // If a custom separator has been used,
- // decode it back from Base64 encoding.
- separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
- if (separator == null) {
- separator = ImportTsv.DEFAULT_SEPARATOR;
- } else {
- separator = new String(Base64.decode(separator));
- }
- // Should never get 0 as we are setting this to a valid value in job
- // configuration.
- ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
-
- skipEmptyColumns = context.getConfiguration().getBoolean(
- ImportTsv.SKIP_EMPTY_COLUMNS, false);
- skipBadLines = context.getConfiguration().getBoolean(
- ImportTsv.SKIP_LINES_CONF_KEY, true);
- badLineCount = context.getCounter("ImportTsv", "Bad Lines");
- logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
- hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
- }
-
- /**
- * Convert a line of TSV text into an HBase table row.
- */
- @Override
- public void map(LongWritable offset, Text value,
- Context context)
- throws IOException {
- byte[] lineBytes = value.getBytes();
-
- try {
- ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
- lineBytes, value.getLength());
- ImmutableBytesWritable rowKey =
- new ImmutableBytesWritable(lineBytes,
- parsed.getRowKeyOffset(),
- parsed.getRowKeyLength());
- // Retrieve timestamp if exists
- ts = parsed.getTimestamp(ts);
- cellVisibilityExpr = parsed.getCellVisibility();
- ttl = parsed.getCellTTL();
-
- // create tags for the parsed line
- if (hfileOutPath != null) {
- tags.clear();
- if (cellVisibilityExpr != null) {
- tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags(
- cellVisibilityExpr));
- }
- // Add TTL directly to the KV so we can vary them when packing more than one KV
- // into puts
- if (ttl > 0) {
- tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
- }
- }
- Put put = new Put(rowKey.copyBytes());
- for (int i = 0; i < parsed.getColumnCount(); i++) {
- if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
- || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
- || i == parser.getCellTTLColumnIndex() || (skipEmptyColumns
- && parsed.getColumnLength(i) == 0)) {
- continue;
- }
- populatePut(lineBytes, parsed, put, i);
- }
- context.write(rowKey, put);
- } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
- | InvalidLabelException badLine) {
- if (logBadLines) {
- System.err.println(value);
- }
- System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
- if (skipBadLines) {
- incrementBadLineCount(1);
- return;
- }
- throw new IOException(badLine);
- } catch (InterruptedException e) {
- e.printStackTrace();
- Thread.currentThread().interrupt();
- }
- }
-
- protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
- int i) throws BadTsvLineException, IOException {
- Cell cell = null;
- if (hfileOutPath == null) {
- cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
- parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
- parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
- parsed.getColumnOffset(i), parsed.getColumnLength(i));
- if (cellVisibilityExpr != null) {
- // We won't be validating the expression here. The Visibility CP will do
- // the validation
- put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
- }
- if (ttl > 0) {
- put.setTTL(ttl);
- }
- } else {
- // Creating the KV which needs to be directly written to HFiles. Using the Facade
- // KVCreator for creation of kvs.
- cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
- parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
- parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
- parsed.getColumnLength(i), tags);
- }
- put.add(cell);
- }
-}
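
doSetup() above expects any custom separator to arrive Base64-encoded in the configuration (ImportTsv encodes it at submission time so characters such as tab survive the configuration plumbing). A sketch of the round trip, using java.util.Base64 in place of the org.apache.hadoop.hbase.util.Base64 helper used in the deleted code; the key name is assumed here to mirror ImportTsv.SEPARATOR_CONF_KEY:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Round trip for the separator handling: the submitter stores the separator
// Base64-encoded; the mapper's doSetup() decodes it back, falling back to a
// default when the key is unset.
public class SeparatorRoundTripSketch {
  static final String SEPARATOR_CONF_KEY = "importtsv.separator"; // assumed value of ImportTsv.SEPARATOR_CONF_KEY
  static final String DEFAULT_SEPARATOR = "\t";

  static String encodeForConf(String separator) {
    return Base64.getEncoder().encodeToString(separator.getBytes(StandardCharsets.UTF_8));
  }

  static String decodeFromConf(String confValue) {
    if (confValue == null) {
      return DEFAULT_SEPARATOR; // mirrors the null check in doSetup()
    }
    return new String(Base64.getDecoder().decode(confValue), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    String stored = encodeForConf("|");
    System.out.println(SEPARATOR_CONF_KEY + " = " + stored);
    System.out.println("decoded separator: " + decodeFromConf(stored)); // prints |
  }
}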
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
deleted file mode 100644
index 581f0d0..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Base64;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-
-/**
- * Writes table content out to map output files.
- */
-@InterfaceAudience.Public
-public class TsvImporterTextMapper
-extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
-{
-
- /** Column separator */
- private String separator;
-
- /** Should skip bad lines */
- private boolean skipBadLines;
- private Counter badLineCount;
- private boolean logBadLines;
-
- private ImportTsv.TsvParser parser;
-
- public boolean getSkipBadLines() {
- return skipBadLines;
- }
-
- public Counter getBadLineCount() {
- return badLineCount;
- }
-
- public void incrementBadLineCount(int count) {
- this.badLineCount.increment(count);
- }
-
- /**
- * Handles initializing this class with objects specific to it (i.e., the parser).
- * Common initialization that might be leveraged by a subclass is done in
- * <code>doSetup</code>. Hence a subclass may choose to override this method
- * and call <code>doSetup</code> as well before handling its own custom params.
- *
- * @param context
- */
- @Override
- protected void setup(Context context) {
- doSetup(context);
-
- Configuration conf = context.getConfiguration();
-
- parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
- if (parser.getRowKeyColumnIndex() == -1) {
- throw new RuntimeException("No row key column specified");
- }
- }
-
- /**
- * Handles common parameter initialization that a subclass might want to leverage.
- * @param context
- */
- protected void doSetup(Context context) {
- Configuration conf = context.getConfiguration();
-
- // If a custom separator has been used,
- // decode it back from Base64 encoding.
- separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
- if (separator == null) {
- separator = ImportTsv.DEFAULT_SEPARATOR;
- } else {
- separator = new String(Base64.decode(separator));
- }
-
- skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
- logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
- badLineCount = context.getCounter("ImportTsv", "Bad Lines");
- }
-
- /**
- * Convert a line of TSV text into an HBase table row.
- */
- @Override
- public void map(LongWritable offset, Text value, Context context) throws IOException {
- try {
- Pair<Integer,Integer> rowKeyOffests = parser.parseRowKey(value.getBytes(), value.getLength());
- ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
- value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond());
- context.write(rowKey, value);
- } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) {
- if (logBadLines) {
- System.err.println(value);
- }
- System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
- if (skipBadLines) {
- incrementBadLineCount(1);
- return;
- }
- throw new IOException(badLine);
- } catch (InterruptedException e) {
- e.printStackTrace();
- Thread.currentThread().interrupt();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java
deleted file mode 100644
index a83a88f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hbase.Tag;
-
-/**
- * Interface to convert visibility expressions into Tags for storing along with Cells in HFiles.
- */
-@InterfaceAudience.Public
-public interface VisibilityExpressionResolver extends Configurable {
-
- /**
- * Gives the implementation a chance to initialize itself.
- */
- void init();
-
- /**
- * Convert visibility expression into tags to be serialized.
- * @param visExpression the label expression
- * @return The list of tags corresponding to the visibility expression. These tags will be stored
- * along with the Cells.
- */
- List<Tag> createVisibilityExpTags(String visExpression) throws IOException;
-}
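
For the interface deleted above, a real resolver (DefaultVisibilityExpressionResolver in the same package) parses the expression and serializes label ordinals looked up from the labels table. The toy sketch below only shows the shape of the contract: it wraps the raw expression bytes in a visibility-typed tag and is not usable for real enforcement.

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.mapreduce.VisibilityExpressionResolver;
import org.apache.hadoop.hbase.util.Bytes;

// Toy implementation showing the contract only: it stores the raw expression
// bytes in a visibility-typed tag instead of resolving label ordinals, so the
// VisibilityController would not accept its output.
public class PassThroughVisibilityResolver implements VisibilityExpressionResolver {
  private Configuration conf;

  @Override
  public void setConf(Configuration conf) { this.conf = conf; }

  @Override
  public Configuration getConf() { return conf; }

  @Override
  public void init() {
    // A real implementation would set up access to the labels table here.
  }

  @Override
  public List<Tag> createVisibilityExpTags(String visExpression) throws IOException {
    return Collections.singletonList(
        new ArrayBackedTag(TagType.VISIBILITY_TAG_TYPE, Bytes.toBytes(visExpression)));
  }
}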
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
deleted file mode 100644
index 8b4e967..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
- */
-@InterfaceAudience.Public
-public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
- private static final Log LOG = LogFactory.getLog(WALInputFormat.class);
-
- public static final String START_TIME_KEY = "wal.start.time";
- public static final String END_TIME_KEY = "wal.end.time";
-
- /**
- * {@link InputSplit} for {@link WAL} files. Each split represents
- * exactly one log file.
- */
- static class WALSplit extends InputSplit implements Writable {
- private String logFileName;
- private long fileSize;
- private long startTime;
- private long endTime;
-
- /** for serialization */
- public WALSplit() {}
-
- /**
- * Represents a WALSplit, i.e. a single WAL file.
- * Start and end time are managed by the split, so that WAL files can be
- * filtered before WALEdits are passed to the mapper(s).
- * @param logFileName
- * @param fileSize
- * @param startTime
- * @param endTime
- */
- public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
- this.logFileName = logFileName;
- this.fileSize = fileSize;
- this.startTime = startTime;
- this.endTime = endTime;
- }
-
- @Override
- public long getLength() throws IOException, InterruptedException {
- return fileSize;
- }
-
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- // TODO: Find the data node with the most blocks for this WAL?
- return new String[] {};
- }
-
- public String getLogFileName() {
- return logFileName;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public long getEndTime() {
- return endTime;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- logFileName = in.readUTF();
- fileSize = in.readLong();
- startTime = in.readLong();
- endTime = in.readLong();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(logFileName);
- out.writeLong(fileSize);
- out.writeLong(startTime);
- out.writeLong(endTime);
- }
-
- @Override
- public String toString() {
- return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
- }
- }
-
- /**
- * {@link RecordReader} for a {@link WAL} file.
- * Implementation shared with deprecated HLogInputFormat.
- */
- static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
- private Reader reader = null;
- // visible until we can remove the deprecated HLogInputFormat
- Entry currentEntry = new Entry();
- private long startTime;
- private long endTime;
- private Configuration conf;
- private Path logFile;
- private long currentPos;
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- WALSplit hsplit = (WALSplit)split;
- logFile = new Path(hsplit.getLogFileName());
- conf = context.getConfiguration();
- LOG.info("Opening reader for "+split);
- openReader(logFile);
- this.startTime = hsplit.getStartTime();
- this.endTime = hsplit.getEndTime();
- }
-
-    private void openReader(Path path) throws IOException {
- closeReader();
- reader = AbstractFSWALProvider.openReader(path, conf);
- seek();
- setCurrentPath(path);
- }
-
- private void setCurrentPath(Path path) {
- this.logFile = path;
- }
-
- private void closeReader() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- }
-
- private void seek() throws IOException {
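-      // Resume from the last recorded read position; openReader() invokes this
-      // after re-opening, e.g. when falling back to the archived log location.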
- if (currentPos != 0) {
- reader.seek(currentPos);
- }
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (reader == null) return false;
- this.currentPos = reader.getPosition();
- Entry temp;
- long i = -1;
- try {
- do {
- // skip older entries
- try {
- temp = reader.next(currentEntry);
- i++;
- } catch (EOFException x) {
- LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
- + " (This is normal when a RegionServer crashed.)");
- return false;
- }
- } while (temp != null && temp.getKey().getWriteTime() < startTime);
-
- if (temp == null) {
- if (i > 0) LOG.info("Skipped " + i + " entries.");
- LOG.info("Reached end of file.");
- return false;
- } else if (i > 0) {
- LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
- }
- boolean res = temp.getKey().getWriteTime() <= endTime;
- if (!res) {
- LOG.info("Reached ts: " + temp.getKey().getWriteTime()
- + " ignoring the rest of the file.");
- }
- return res;
- } catch (IOException e) {
- Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
- if (logFile != archivedLog) {
- openReader(archivedLog);
-          // Retry by recursing, now reading from the archived location
- return nextKeyValue();
- } else {
- throw e;
- }
- }
- }
-
- @Override
- public WALEdit getCurrentValue() throws IOException, InterruptedException {
- return currentEntry.getEdit();
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
-      // N/A: depends on the total number of entries, which is unknown
- return 0;
- }
-
- @Override
- public void close() throws IOException {
- LOG.info("Closing reader");
- if (reader != null) this.reader.close();
- }
- }
-
- /**
-   * Handler for the non-deprecated WALKey version. Fold into WALRecordReader once we no
-   * longer need to support HLogInputFormat.
- */
- static class WALKeyRecordReader extends WALRecordReader<WALKey> {
- @Override
- public WALKey getCurrentKey() throws IOException, InterruptedException {
- return currentEntry.getKey();
- }
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException,
- InterruptedException {
- return getSplits(context, START_TIME_KEY, END_TIME_KEY);
- }
-
- /**
-   * Implementation shared with the deprecated HLogInputFormat.
- */
- List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
- throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
- boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
- Path[] inputPaths = getInputPaths(conf);
- long startTime = conf.getLong(startKey, Long.MIN_VALUE);
- long endTime = conf.getLong(endKey, Long.MAX_VALUE);
-
- List<FileStatus> allFiles = new ArrayList<FileStatus>();
-    for (Path inputPath : inputPaths) {
- FileSystem fs = inputPath.getFileSystem(conf);
- try {
- List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
- allFiles.addAll(files);
- } catch (FileNotFoundException e) {
- if (ignoreMissing) {
- LOG.warn("File "+ inputPath +" is missing. Skipping it.");
- continue;
- }
- throw e;
- }
- }
- List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
- for (FileStatus file : allFiles) {
- splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
- }
- return splits;
- }
-
- private Path[] getInputPaths(Configuration conf) {
- String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
- return StringUtils.stringToPath(
- inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
- }
-
- private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
- throws IOException {
- List<FileStatus> result = new ArrayList<>();
- LOG.debug("Scanning " + dir.toString() + " for WAL files");
-
- RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
- if (!iter.hasNext()) return Collections.emptyList();
- while (iter.hasNext()) {
- LocatedFileStatus file = iter.next();
- if (file.isDirectory()) {
- // recurse into sub directories
- result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
- } else {
- String name = file.getPath().toString();
- int idx = name.lastIndexOf('.');
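-        // WAL file names end with "." followed by the file's creation timestamp;
-        // a file created after endTime cannot contain edits within the range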
- if (idx > 0) {
- try {
-            long fileStartTime = Long.parseLong(name.substring(idx + 1));
- if (fileStartTime <= endTime) {
- LOG.info("Found: " + file);
- result.add(file);
- }
- } catch (NumberFormatException x) {
- idx = 0;
- }
- }
- if (idx == 0) {
- LOG.warn("File " + name + " does not appear to be an WAL file. Skipping...");
- }
- }
- }
- return result;
- }
-
- @Override
- public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- return new WALKeyRecordReader();
- }
-}
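
For reference, wiring WALInputFormat into a job only requires pointing
FileInputFormat.INPUT_DIR at a directory of WAL files and, optionally, setting
the two time-range keys above. A minimal sketch, assuming a hypothetical driver
class and an illustrative /hbase/oldWALs path (neither is part of this patch):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  public class WALScanJob {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // directory holding the WAL files to read (illustrative path)
      conf.set(FileInputFormat.INPUT_DIR, "/hbase/oldWALs");
      // only entries whose write time falls within [start, end] reach the mapper
      conf.setLong(WALInputFormat.START_TIME_KEY, 0L);
      conf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
      Job job = Job.getInstance(conf, "wal-scan");
      job.setJarByClass(WALScanJob.class);
      job.setInputFormatClass(WALInputFormat.class);
      // plug in a Mapper<WALKey, WALEdit, ?, ?> and an output format here
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }
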
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
deleted file mode 100644
index b1e655c..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A tool to replay WAL files as a M/R job.
- * The WAL can be replayed for a set of tables or all tables,
- * and a time range can be provided (in milliseconds).
- * The WAL is filtered to the passed set of tables and the output
- * can optionally be mapped to another set of tables.
- *
- * WAL replay can also generate HFiles for later bulk importing;
- * in that case the WAL is replayed for a single table only.
- */
-@InterfaceAudience.Public
-public class WALPlayer extends Configured implements Tool {
- private static final Log LOG = LogFactory.getLog(WALPlayer.class);
- final static String NAME = "WALPlayer";
- public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
- public final static String TABLES_KEY = "wal.input.tables";
- public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
- public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
- public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
-
- // This relies on Hadoop Configuration to handle warning about deprecated configs and
- // to set the correct non-deprecated configs when an old one shows up.
- static {
- Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY);
- Configuration.addDeprecation("hlog.input.tables", TABLES_KEY);
- Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY);
- }
-
- private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
-  public WALPlayer() {
-  }
-
- protected WALPlayer(final Configuration c) {
- super(c);
- }
-
- /**
- * A mapper that just writes out KeyValues.
-   * This one can be used together with {@link KeyValueSortReducer}.
- */
- static class WALKeyValueMapper
- extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
- private byte[] table;
-
- @Override
-    public void map(WALKey key, WALEdit value, Context context)
-        throws IOException {
- try {
- // skip all other tables
- if (Bytes.equals(table, key.getTablename().getName())) {
- for (Cell cell : value.getCells()) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- if (WALEdit.isMetaEditFamily(kv)) {
- continue;
- }
- context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv);
- }
- }
-      } catch (InterruptedException e) {
-        LOG.error("Interrupted while emitting KeyValues", e);
-        Thread.currentThread().interrupt();
-      }
- }
-
- @Override
- public void setup(Context context) throws IOException {
- // only a single table is supported when HFiles are generated with HFileOutputFormat
- String[] tables = context.getConfiguration().getStrings(TABLES_KEY);
- if (tables == null || tables.length != 1) {
-        // this can only happen when WALKeyValueMapper is used directly by a class other than WALPlayer
- throw new IOException("Exactly one table must be specified for bulk HFile case.");
- }
-      table = Bytes.toBytes(tables[0]);
-    }
-  }
-
- /**
- * A mapper that writes out {@link Mutation} to be directly applied to
- * a running HBase instance.
- */
- protected static class WALMapper
- extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
- private Map<TableName, TableName> tables = new TreeMap<>();
-
- @Override
- public void map(WALKey key, WALEdit value, Context context)
- throws IOException {
- try {
- if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
- TableName targetTable = tables.isEmpty() ?
- key.getTablename() :
- tables.get(key.getTablename());
- ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
- Put put = null;
- Delete del = null;
- Cell lastCell = null;
- for (Cell cell : value.getCells()) {
- // filtering WAL meta entries
- if (WALEdit.isMetaEditFamily(cell)) {
- continue;
- }
-
-          // Allow a subclass to filter out this cell.
- if (filter(context, cell)) {
- // A WALEdit may contain multiple operations (HBASE-3584) and/or
- // multiple rows (HBASE-5229).
- // Aggregate as much as possible into a single Put/Delete
- // operation before writing to the context.
- if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
- || !CellUtil.matchingRow(lastCell, cell)) {
- // row or type changed, write out aggregate KVs.
- if (put != null) {
- context.write(tableOut, put);
- }
- if (del != null) {
- context.write(tableOut, del);
- }
- if (CellUtil.isDelete(cell)) {
- del = new Delete(CellUtil.cloneRow(cell));
- } else {
- put = new Put(CellUtil.cloneRow(cell));
- }
- }
- if (CellUtil.isDelete(cell)) {
- del.add(cell);
- } else {
- put.add(cell);
- }
- }
- lastCell = cell;
- }
- // write residual KVs
- if (put != null) {
- context.write(tableOut, put);
- }
- if (del != null) {
- context.write(tableOut, del);
- }
- }
-      } catch (InterruptedException e) {
-        LOG.error("Interrupted while writing results", e);
-        Thread.currentThread().interrupt();
-      }
- }
-
- protected boolean filter(Context context, final Cell cell) {
- return true;
- }
-
- @Override
-    protected void cleanup(
-        Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
-        throws IOException, InterruptedException {
- super.cleanup(context);
- }
-
- @Override
- public void setup(Context context) throws IOException {
- String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
- String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
- if (tableMap == null) {
- tableMap = tablesToUse;
- }
- if (tablesToUse == null) {
- // Then user wants all tables.
- } else if (tablesToUse.length != tableMap.length) {
- // this can only happen when WALMapper is used directly by a class other than WALPlayer
- throw new IOException("Incorrect table mapping specified .");
- }
- int i = 0;
- if (tablesToUse != null) {
- for (String table : tablesToUse) {
- tables.put(TableName.valueOf(table),
- TableName.valueOf(tableMap[i++]));
- }
- }
- }
- }
-
- void setupTime(Configuration conf, String option) throws IOException {
- String val = conf.get(option);
- if (null == val) {
- return;
- }
- long ms;
- try {
-      // first try to parse in user-friendly form
- ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
- } catch (ParseException pe) {
- try {
- // then see if just a number of ms's was specified
- ms = Long.parseLong(val);
- } catch (NumberFormatException nfe) {
- throw new IOException(option
- + " must be specified either in the form 2001-02-20T16:35:06.99 "
- + "or as number of milliseconds");
- }
- }
- conf.setLong(option, ms);
- }
-
- /**
- * Sets up the actual job.
- *
- * @param args The command line parameters.
- * @return The newly created job.
- * @throws IOException When setting up the job fails.
- */
- public Job createSubmittableJob(String[] args) throws IOException {
- Configuration conf = getConf();
- setupTime(conf, WALInputFormat.START_TIME_KEY);
- setupTime(conf, WALInputFormat.END_TIME_KEY);
- String inputDirs = args[0];
- String[] tables = args[1].split(",");
- String[] tableMap;
- if (args.length > 2) {
- tableMap = args[2].split(",");
- if (tableMap.length != tables.length) {
- throw new IOException("The same number of tables and mapping must be provided.");
- }
- } else {
-      // if no mapping is specified, map each table to itself
- tableMap = tables;
- }
- conf.setStrings(TABLES_KEY, tables);
- conf.setStrings(TABLE_MAP_KEY, tableMap);
- conf.set(FileInputFormat.INPUT_DIR, inputDirs);
-    Job job = Job.getInstance(conf,
-        conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
- job.setJarByClass(WALPlayer.class);
-
- job.setInputFormatClass(WALInputFormat.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-
- String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
- if (hfileOutPath != null) {
- LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
-
- // the bulk HFile case
- if (tables.length != 1) {
- throw new IOException("Exactly one table must be specified for the bulk export option");
- }
- TableName tableName = TableName.valueOf(tables[0]);
- job.setMapperClass(WALKeyValueMapper.class);
- job.setReducerClass(KeyValueSortReducer.class);
- Path outputDir = new Path(hfileOutPath);
- FileOutputFormat.setOutputPath(job, outputDir);
- job.setMapOutputValueClass(KeyValue.class);
- try (Connection conn = ConnectionFactory.createConnection(conf);
- Table table = conn.getTable(tableName);
- RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
- HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
- }
- TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
- org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
- } else {
- // output to live cluster
- job.setMapperClass(WALMapper.class);
- job.setOutputFormatClass(MultiTableOutputFormat.class);
- TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.initCredentials(job);
- // No reducers.
- job.setNumReduceTasks(0);
- }
- String codecCls = WALCellCodec.getWALCellCodecClass(conf);
- try {
-      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
-          Class.forName(codecCls));
- } catch (Exception e) {
- throw new IOException("Cannot determine wal codec class " + codecCls, e);
- }
- return job;
- }
-
- /**
- * Print usage
- * @param errorMsg Error message. Can be null.
- */
- private void usage(final String errorMsg) {
- if (errorMsg != null && errorMsg.length() > 0) {
- System.err.println("ERROR: " + errorMsg);
- }
- System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
- System.err.println("Read all WAL entries for <tables>.");
- System.err.println("If no tables (\"\") are specific, all tables are imported.");
- System.err.println("(Careful, even hbase:meta entries will be imported"+
- " in that case.)");
- System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
- System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
- System.err.println("<tableMapping> is a command separated list of targettables.");
- System.err.println("If specified, each table in <tables> must have a mapping.\n");
- System.err.println("By default " + NAME + " will load data directly into HBase.");
- System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
- System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
- System.err.println(" (Only one table can be specified, and no mapping is allowed!)");
- System.err.println("Other options: (specify time range to WAL edit to consider)");
- System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
- System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
- System.err.println(" -D " + JOB_NAME_CONF_KEY
- + "=jobName - use the specified mapreduce job name for the wal player");
- System.err.println("For performance also consider the following options:\n"
- + " -Dmapreduce.map.speculative=false\n"
- + " -Dmapreduce.reduce.speculative=false");
- }
-
- /**
- * Main entry point.
- *
- * @param args The command line parameters.
- * @throws Exception When running the job fails.
- */
- public static void main(String[] args) throws Exception {
- int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
- System.exit(ret);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- if (args.length < 2) {
- usage("Wrong number of arguments: " + args.length);
-      // return rather than calling System.exit() so callers of run() see the error code
-      return -1;
- }
- Job job = createSubmittableJob(args);
- return job.waitForCompletion(true) ? 0 : 1;
- }
-}
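
As a usage sketch, the tool can be driven through ToolRunner exactly as main()
does; the input directory and the table names below are placeholders, not part
of this patch:

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.WALPlayer;
  import org.apache.hadoop.util.ToolRunner;

  public class ReplayWALs {
    public static void main(String[] args) throws Exception {
      // replay edits for table 'orig' from /hbase/oldWALs into table 'copy';
      // add -Dwal.start.time/-Dwal.end.time (date or ms) to narrow the range,
      // or -Dwal.bulk.output=/some/dir to emit HFiles instead of live mutations
      int ret = ToolRunner.run(HBaseConfiguration.create(), new WALPlayer(),
          new String[] { "/hbase/oldWALs", "orig", "copy" });
      System.exit(ret);
    }
  }
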
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
deleted file mode 100644
index 199e168..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
-Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
-Input/OutputFormats, a table indexing MapReduce job, and utility methods.
-
-<p>See <a href="http://hbase.apache.org/book.html#mapreduce">HBase and MapReduce</a>
-in the HBase Reference Guide for documentation on MapReduce over HBase.
-*/
-package org.apache.hadoop.hbase.mapreduce;