Posted to commits@hbase.apache.org by st...@apache.org on 2014/03/09 02:58:44 UTC
svn commit: r1575645 [2/2] - in /hbase/branches/0.96:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-common/src/test/java/org/apache/hadoop/hbase/
hbase-it/src/test/java/org/apache/h...
Added: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java?rev=1575645&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java (added)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java Sun Mar 9 01:58:43 2014
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+
+/**
+ * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
+ * bypasses HBase servers and reads the underlying files (hfiles, recovered edits, hlogs, etc.)
+ * directly to provide maximum performance. The snapshot does not have to be restored to the
+ * live cluster or cloned. This also allows the mapreduce job to be run against an online or
+ * offline hbase cluster. The snapshot files can be exported to a pure-hdfs cluster with the
+ * {@link ExportSnapshot} tool, and this InputFormat can then run the mapreduce job directly
+ * over the snapshot files. The snapshot should not be deleted while there are jobs reading
+ * from the snapshot files.
+ * <p>
+ * Usage is similar to TableInputFormat, and
+ * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
+ * can be used to configure the job.
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ * scan, MyTableMapper.class, MyMapKeyOutput.class,
+ * MyMapOutputValueWritable.class, job, true);
+ * }
+ * </pre>
+ * <p>
+ * Internally, this input format restores the snapshot into the given tmp directory. As with
+ * {@link TableInputFormat}, an InputSplit is created per region, and each RecordReader opens
+ * the region for reading. An internal RegionScanner is used to execute the {@link Scan}
+ * obtained from the user.
+ * <p>
+ * HBase owns all the data and snapshot files on the filesystem. Only the HBase user can read
+ * from snapshot files and data files. Normally HBase can enforce access control because all
+ * requests are handled by the server layer and users cannot read the data files directly. To
+ * read snapshot files directly from the file system, the user running the MR job must have
+ * sufficient permissions to access the snapshot and reference files. This means that to run
+ * mapreduce over snapshot files, the MR job has to be run as the HBase user or the user must
+ * have group or other privileges in the filesystem (see HBASE-8369). Note that granting other
+ * users read access to snapshot/data files completely circumvents the access control enforced
+ * by HBase.
+ * @see TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
+ // TODO: Snapshot files are owned in fs by the hbase user. There is no
+ // easy way to delegate access.
+
+ private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormat.class);
+
+ /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
+ private static final String LOCALITY_CUTOFF_MULTIPLIER = "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
+ private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
+
+ private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
+ private static final String TABLE_DIR_KEY = "hbase.TableSnapshotInputFormat.table.dir";
+
+ public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
+ private String regionName;
+ private String[] locations;
+
+ // constructor for mapreduce framework / Writable
+ public TableSnapshotRegionSplit() { }
+
+ TableSnapshotRegionSplit(String regionName, List<String> locations) {
+ this.regionName = regionName;
+ if (locations == null || locations.isEmpty()) {
+ this.locations = new String[0];
+ } else {
+ this.locations = locations.toArray(new String[locations.size()]);
+ }
+ }
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ //TODO: We can obtain the file sizes of the snapshot here.
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return locations;
+ }
+
+ // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
+ // doing this wrapping with Writables.
+ @Override
+ public void write(DataOutput out) throws IOException {
+ MapReduceProtos.TableSnapshotRegionSplit.Builder builder =
+ MapReduceProtos.TableSnapshotRegionSplit.newBuilder()
+ .setRegion(RegionSpecifier.newBuilder()
+ .setType(RegionSpecifierType.ENCODED_REGION_NAME)
+ .setValue(ByteString.copyFrom(Bytes.toBytes(regionName))).build());
+
+ for (String location : locations) {
+ builder.addLocations(location);
+ }
+
+ MapReduceProtos.TableSnapshotRegionSplit split = builder.build();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ split.writeTo(baos);
+ baos.close();
+ byte[] buf = baos.toByteArray();
+ out.writeInt(buf.length);
+ out.write(buf);
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int len = in.readInt();
+ byte[] buf = new byte[len];
+ in.readFully(buf);
+ MapReduceProtos.TableSnapshotRegionSplit split = MapReduceProtos.TableSnapshotRegionSplit.PARSER.parseFrom(buf);
+ this.regionName = Bytes.toString(split.getRegion().getValue().toByteArray());
+ List<String> locationsList = split.getLocationsList();
+ this.locations = locationsList.toArray(new String[locationsList.size()]);
+ }
+ }
+
+ @VisibleForTesting
+ class TableSnapshotRegionRecordReader extends RecordReader<ImmutableBytesWritable, Result> {
+ private TableSnapshotRegionSplit split;
+ private Scan scan;
+ private Result result = null;
+ private ImmutableBytesWritable row = null;
+ private ClientSideRegionScanner scanner;
+ private TaskAttemptContext context;
+ private Method getCounter;
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+
+ Configuration conf = context.getConfiguration();
+ this.split = (TableSnapshotRegionSplit) split;
+ String regionName = this.split.regionName;
+ String snapshotName = getSnapshotName(conf);
+ Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ Path tmpRootDir = new Path(conf.get(TABLE_DIR_KEY)); // This is the user-specified root
+ // directory where the snapshot was restored
+
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+
+ //load table descriptor
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
+
+ //load region descriptor
+ Path regionDir = new Path(snapshotDir, regionName);
+ HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+
+ // create scan
+ String scanStr = conf.get(TableInputFormat.SCAN);
+ if (scanStr == null) {
+ throw new IllegalArgumentException("A Scan is not configured for this job");
+ }
+ scan = TableMapReduceUtil.convertStringToScan(scanStr);
+ scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); // region is immutable, this should be fine,
+ // otherwise we have to set the thread read point
+
+ scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
+ if (context != null) {
+ this.context = context;
+ getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ result = scanner.next();
+ if (result == null) {
+ //we are done
+ return false;
+ }
+
+ if (this.row == null) {
+ this.row = new ImmutableBytesWritable();
+ }
+ this.row.set(result.getRow());
+
+ ScanMetrics scanMetrics = scanner.getScanMetrics();
+ if (scanMetrics != null && context != null) {
+ TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context);
+ }
+
+ return true;
+ }
+
+ @Override
+ public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
+ return row;
+ }
+
+ @Override
+ public Result getCurrentValue() throws IOException, InterruptedException {
+ return result;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0; // TODO: use total bytes to estimate
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.scanner != null) {
+ this.scanner.close();
+ }
+ }
+ }
+
+ @Override
+ public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+ InputSplit split, TaskAttemptContext context) throws IOException {
+ return new TableSnapshotRegionRecordReader();
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
+ Configuration conf = job.getConfiguration();
+ String snapshotName = getSnapshotName(conf);
+
+ Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+ SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+
+ Set<String> snapshotRegionNames
+ = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
+ if (snapshotRegionNames == null) {
+ throw new IllegalArgumentException("Snapshot seems empty");
+ }
+
+ // load table descriptor
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs,
+ snapshotDir);
+
+ Scan scan = TableMapReduceUtil.convertStringToScan(conf
+ .get(TableInputFormat.SCAN));
+ Path tableDir = new Path(conf.get(TABLE_DIR_KEY));
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ for (String regionName : snapshotRegionNames) {
+ // load region descriptor
+ Path regionDir = new Path(snapshotDir, regionName);
+ HRegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs,
+ regionDir);
+
+ if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
+ hri.getStartKey(), hri.getEndKey())) {
+ // compute HDFS locations from snapshot files (which will get the locations for
+ // referred hfiles)
+ List<String> hosts = getBestLocations(conf,
+ HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+
+ int len = Math.min(3, hosts.size());
+ hosts = hosts.subList(0, len);
+ splits.add(new TableSnapshotRegionSplit(regionName, hosts));
+ }
+ }
+
+ return splits;
+ }
+
+ /**
+ * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
+ * weights into account, so they treat every location passed from the input split as equal. We
+ * do not want to blindly pass all the locations, since we are creating one split per region, and
+ * the region's blocks are distributed throughout the cluster unless favored node assignment is
+ * used. In the expected stable case, only one location will contain most of the blocks as local.
+ * With favored node assignment, on the other hand, 3 nodes will contain highly local blocks.
+ * Here we apply a simple heuristic: we pass all hosts which have at least 80%
+ * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
+ * host with the best locality. For example, with the default multiplier of 0.8 and top host
+ * weights of 100, 85 and 70, the cutoff is 80, so the first two hosts are passed and the third
+ * is dropped.
+ */
+ @VisibleForTesting
+ List<String> getBestLocations(Configuration conf, HDFSBlocksDistribution blockDistribution) {
+ List<String> locations = new ArrayList<String>(3);
+
+ HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
+
+ if (hostAndWeights.length == 0) {
+ return locations;
+ }
+
+ HostAndWeight topHost = hostAndWeights[0];
+ locations.add(topHost.getHost());
+
+ // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
+ double cutoffMultiplier
+ = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
+
+ double filterWeight = topHost.getWeight() * cutoffMultiplier;
+
+ for (int i = 1; i < hostAndWeights.length; i++) {
+ if (hostAndWeights[i].getWeight() >= filterWeight) {
+ locations.add(hostAndWeights[i].getHost());
+ } else {
+ break;
+ }
+ }
+
+ return locations;
+ }
+
+ /**
+ * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+ * @param job the job to configure
+ * @param snapshotName the name of the snapshot to read from
+ * @param restoreDir a temporary directory to restore the snapshot into. The current user should
+ * have write permissions to this directory, and it should not be a subdirectory of rootdir.
+ * After the job is finished, restoreDir can be deleted.
+ * @throws IOException if an error occurs
+ */
+ public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
+ Configuration conf = job.getConfiguration();
+ conf.set(SNAPSHOT_NAME_KEY, snapshotName);
+
+ Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(conf);
+
+ restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
+
+ // TODO: restore from record readers to parallelize.
+ RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+
+ conf.set(TABLE_DIR_KEY, restoreDir.toString());
+ }
+
+ private static String getSnapshotName(Configuration conf) {
+ String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
+ if (snapshotName == null) {
+ throw new IllegalArgumentException("Snapshot name must be provided");
+ }
+ return snapshotName;
+ }
+}
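
For context, a minimal driver that wires a job to the new input format by hand might look like the following sketch. This is illustration only, not part of the commit: the snapshot name, restore path and no-op mapper are placeholders, and the TableMapReduceUtil#initTableSnapshotMapperJob call shown in the class javadoc above is the intended convenience entry point. The sketch sets the serialized Scan under TableInputFormat.SCAN explicitly because that is where the record reader above looks for it.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SnapshotScanDriver {

  // No-op mapper, only here to make the sketch self-contained.
  static class NoOpMapper extends TableMapper<NullWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) {
      // a real mapper would emit something per row here
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "snapshot-scan");
    job.setJarByClass(SnapshotScanDriver.class);

    // The snapshot record reader reads its Scan from TableInputFormat.SCAN.
    Scan scan = new Scan();
    job.getConfiguration().set(TableInputFormat.SCAN,
        TableMapReduceUtil.convertScanToString(scan));

    // Restore the snapshot under a scratch directory and point the job at it.
    TableSnapshotInputFormat.setInput(job, "mySnapshot", new Path("/tmp/snapshot-restore"));
    job.setInputFormatClass(TableSnapshotInputFormat.class);

    job.setMapperClass(NoOpMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setOutputFormatClass(NullOutputFormat.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
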
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Mar 9 01:58:43 2014
@@ -773,8 +773,23 @@ public class HRegion implements HeapSize
*/
public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
- HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
+ return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
+ }
+
+ /**
+ * This is a helper function to compute HDFS block distribution on demand
+ * @param conf configuration
+ * @param tableDescriptor HTableDescriptor of the table
+ * @param regionInfo HRegionInfo of the region
+ * @param tablePath the table directory
+ * @return The HDFS blocks distribution for the given region.
+ * @throws IOException
+ */
+ public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
+ final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
+ throws IOException {
+ HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
FileSystem fs = tablePath.getFileSystem(conf);
HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
@@ -3969,11 +3984,36 @@ public class HRegion implements HeapSize
final HLog hlog,
final boolean initialize, final boolean ignoreHLog)
throws IOException {
+ Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
+ return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, hlog, initialize, ignoreHLog);
+ }
+
+ /**
+ * Convenience method for creating new HRegions. Used by createTable.
+ * The {@link HLog} for the created region needs to be closed
+ * explicitly, if it is not null.
+ * Use {@link HRegion#getLog()} to get access.
+ *
+ * @param info Info for region to create.
+ * @param rootDir Root directory for HBase instance
+ * @param tableDir table directory
+ * @param conf the configuration to use
+ * @param hTableDescriptor the table descriptor
+ * @param hlog shared HLog
+ * @param initialize - true to initialize the region
+ * @param ignoreHLog - true to skip generating a new hlog if it is null, mostly for createTable
+ * @return new HRegion
+ * @throws IOException
+ */
+ public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
+ final Configuration conf,
+ final HTableDescriptor hTableDescriptor,
+ final HLog hlog,
+ final boolean initialize, final boolean ignoreHLog)
+ throws IOException {
LOG.info("creating HRegion " + info.getTable().getNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
" Table name == " + info.getTable().getNameAsString());
-
- Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
FileSystem fs = FileSystem.get(conf);
HRegionFileSystem rfs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
HLog effectiveHLog = hlog;
@@ -4129,15 +4169,39 @@ public class HRegion implements HeapSize
final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
final RegionServerServices rsServices, final CancelableProgressable reporter)
throws IOException {
+ Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
+ return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
+ }
+
+ /**
+ * Open a Region.
+ * @param conf The Configuration object to use.
+ * @param fs Filesystem to use
+ * @param rootDir Root directory for HBase instance
+ * @param tableDir table directory
+ * @param info Info for region to be opened.
+ * @param htd the table descriptor
+ * @param wal HLog for region to use. This method will call
+ * HLog#setSequenceNumber(long) passing the result of the call to
+ * HRegion#getMinSequenceId() to ensure the log id is properly kept
+ * up. HRegionStore does this every time it opens a new region.
+ * @param rsServices An interface we can request flushes against.
+ * @param reporter An interface we can report progress against.
+ * @return new HRegion
+ * @throws IOException
+ */
+ public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
+ final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
+ final RegionServerServices rsServices, final CancelableProgressable reporter)
+ throws IOException {
if (info == null) throw new NullPointerException("Passed region info is null");
if (LOG.isDebugEnabled()) {
LOG.debug("Opening region: " + info);
}
- Path dir = FSUtils.getTableDir(rootDir, info.getTable());
- HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info, htd, rsServices);
+ HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
return r.openHRegion(reporter);
}
+
/**
* Useful when reopening a closed region (normally for unit tests)
* @param other original object
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java Sun Mar 9 01:58:43 2014
@@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.snapshot;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
@@ -37,23 +37,24 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
-import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
@@ -471,8 +472,9 @@ public class RestoreSnapshotHelper {
}
// create the regions on disk
- ModifyRegionUtils.createRegions(conf, rootDir,
+ ModifyRegionUtils.createRegions(conf, rootDir, tableDir,
tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() {
+ @Override
public void fillRegion(final HRegion region) throws IOException {
cloneRegion(region, snapshotRegions.get(region.getRegionInfo().getEncodedName()));
}
@@ -499,6 +501,7 @@ public class RestoreSnapshotHelper {
final String tableName = tableDesc.getTableName().getNameAsString();
SnapshotReferenceUtil.visitRegionStoreFiles(fs, snapshotRegionDir,
new FSVisitor.StoreFileVisitor() {
+ @Override
public void storeFile (final String region, final String family, final String hfile)
throws IOException {
LOG.info("Adding HFileLink " + hfile + " to table=" + tableName);
@@ -627,10 +630,13 @@ public class RestoreSnapshotHelper {
private void restoreWALs() throws IOException {
final SnapshotLogSplitter logSplitter = new SnapshotLogSplitter(conf, fs, tableDir,
snapshotTable, regionsMap);
+ // TODO: use executors to parallelize splitting
+ // TODO: once split, we do not need to split again for other restores
try {
// Recover.Edits
SnapshotReferenceUtil.visitRecoveredEdits(fs, snapshotDir,
new FSVisitor.RecoveredEditsVisitor() {
+ @Override
public void recoveredEdits (final String region, final String logfile) throws IOException {
Path path = SnapshotReferenceUtil.getRecoveredEdits(snapshotDir, region, logfile);
logSplitter.splitRecoveredEdit(path);
@@ -639,6 +645,7 @@ public class RestoreSnapshotHelper {
// Region Server Logs
SnapshotReferenceUtil.visitLogFiles(fs, snapshotDir, new FSVisitor.LogFileVisitor() {
+ @Override
public void logFile (final String server, final String logfile) throws IOException {
logSplitter.splitLog(server, logfile);
}
@@ -689,4 +696,45 @@ public class RestoreSnapshotHelper {
}
return htd;
}
+
+ /**
+ * Copy the snapshot files for a snapshot scanner; discards meta changes.
+ * @param conf the configuration to use
+ * @param fs the filesystem of the HBase root directory
+ * @param rootDir HBase root directory
+ * @param restoreDir directory to restore the snapshot into
+ * @param snapshotName name of the snapshot to copy
+ * @throws IOException
+ */
+ public static void copySnapshotForScanner(Configuration conf, FileSystem fs, Path rootDir,
+ Path restoreDir, String snapshotName) throws IOException {
+ // ensure that the restore dir is on the same filesystem as the root dir, and not under it
+ if (!restoreDir.getFileSystem(conf).getUri().equals(rootDir.getFileSystem(conf).getUri())) {
+ throw new IllegalArgumentException("Filesystems for restore directory and HBase root directory " +
+ "should be the same");
+ }
+ if (restoreDir.toUri().getPath().startsWith(rootDir.toUri().getPath())) {
+ throw new IllegalArgumentException("Restore directory cannot be a sub directory of HBase " +
+ "root directory. RootDir: " + rootDir + ", restoreDir: " + restoreDir);
+ }
+
+ Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+ SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+
+ //load table descriptor
+ HTableDescriptor htd = FSTableDescriptors.getTableDescriptorFromFs(fs, snapshotDir);
+
+ MonitoredTask status = TaskMonitor.get().createStatus(
+ "Restoring snapshot '" + snapshotName + "' to directory " + restoreDir);
+ ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();
+
+ RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs, snapshotDesc,
+ snapshotDir, htd, restoreDir, monitor, status);
+ helper.restoreHdfsRegions(); // TODO: parallelize.
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Restored table dir:" + restoreDir);
+ FSUtils.logFileSystemState(fs, restoreDir, LOG);
+ }
+ }
}
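
The new copySnapshotForScanner entry point can also be driven on its own when a read-only, on-disk view of a snapshot is needed outside of a MapReduce job. A rough sketch (the snapshot name and scratch directory are placeholders, not part of the commit) that respects the two checks above, i.e. the restore directory lives on the same filesystem as the root directory but not under it:

import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.FSUtils;

public class RestoreSnapshotForScan {
  /** Restores the named snapshot under scratchDir and returns the restored table dir. */
  public static Path restore(String snapshotName, Path scratchDir) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);
    // Must be on the same filesystem as rootDir, but not a subdirectory of it.
    Path restoreDir = new Path(scratchDir, UUID.randomUUID().toString());
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
    return restoreDir; // the caller scans the restored files, then deletes this directory
  }
}
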
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java Sun Mar 9 01:58:43 2014
@@ -54,7 +54,7 @@ public abstract class AbstractHBaseTool
protected Configuration conf = null;
private static final Set<String> requiredOptions = new TreeSet<String>();
-
+
protected String[] cmdLineArgs = null;
/**
@@ -151,6 +151,11 @@ public abstract class AbstractHBaseTool
addOptWithArg(opt, description);
}
+ protected void addRequiredOptWithArg(String shortOpt, String longOpt, String description) {
+ requiredOptions.add(longOpt);
+ addOptWithArg(shortOpt, longOpt, description);
+ }
+
protected void addOptNoArg(String opt, String description) {
options.addOption(opt, false, description);
}
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java Sun Mar 9 01:58:43 2014
@@ -84,6 +84,26 @@ public abstract class ModifyRegionUtils
public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
final RegionFillTask task) throws IOException {
+
+ Path tableDir = FSUtils.getTableDir(rootDir, hTableDescriptor.getTableName());
+ return createRegions(conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
+ }
+
+ /**
+ * Create new set of regions on the specified file-system.
+ * NOTE: you should add the regions to hbase:meta after this operation.
+ *
+ * @param conf {@link Configuration}
+ * @param rootDir Root directory for HBase instance
+ * @param tableDir table directory
+ * @param hTableDescriptor description of the table
+ * @param newRegions {@link HRegionInfo} that describes the regions to create
+ * @param task {@link RegionFillTask} custom code to populate region after creation
+ * @throws IOException
+ */
+ public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
+ final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
+ final RegionFillTask task) throws IOException {
if (newRegions == null) return null;
int regionNumber = newRegions.length;
ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
@@ -93,26 +113,14 @@ public abstract class ModifyRegionUtils
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
for (final HRegionInfo newRegion : newRegions) {
completionService.submit(new Callable<HRegionInfo>() {
+ @Override
public HRegionInfo call() throws IOException {
- // 1. Create HRegion
- HRegion region = HRegion.createHRegion(newRegion,
- rootDir, conf, hTableDescriptor, null,
- false, true);
- try {
- // 2. Custom user code to interact with the created region
- if (task != null) {
- task.fillRegion(region);
- }
- } finally {
- // 3. Close the new region to flush to disk. Close log file too.
- region.close();
- }
- return region.getRegionInfo();
+ return createRegion(conf, rootDir, tableDir, hTableDescriptor, newRegion, task);
}
});
}
try {
- // 4. wait for all regions to finish creation
+ // wait for all regions to finish creation
for (int i = 0; i < regionNumber; i++) {
Future<HRegionInfo> future = completionService.take();
HRegionInfo regionInfo = future.get();
@@ -129,6 +137,35 @@ public abstract class ModifyRegionUtils
return regionInfos;
}
+ /**
+ * Create a new region on the specified file-system.
+ * @param conf {@link Configuration}
+ * @param rootDir Root directory for HBase instance
+ * @param tableDir table directory
+ * @param hTableDescriptor description of the table
+ * @param newRegion {@link HRegionInfo} that describes the region to create
+ * @param task {@link RegionFillTask} custom code to populate region after creation
+ * @throws IOException
+ */
+ public static HRegionInfo createRegion(final Configuration conf, final Path rootDir,
+ final Path tableDir, final HTableDescriptor hTableDescriptor, final HRegionInfo newRegion,
+ final RegionFillTask task) throws IOException {
+ // 1. Create HRegion
+ HRegion region = HRegion.createHRegion(newRegion,
+ rootDir, tableDir, conf, hTableDescriptor, null,
+ false, true);
+ try {
+ // 2. Custom user code to interact with the created region
+ if (task != null) {
+ task.fillRegion(region);
+ }
+ } finally {
+ // 3. Close the new region to flush to disk. Close log file too.
+ region.close();
+ }
+ return region.getRegionInfo();
+ }
+
/*
* used by createRegions() to get the thread pool executor based on the
* "hbase.hregion.open.and.init.threads.max" property.
@@ -142,6 +179,7 @@ public abstract class ModifyRegionUtils
new ThreadFactory() {
private int count = 1;
+ @Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadNamePrefix + "-" + count++);
return t;
Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Sun Mar 9 01:58:43 2014
@@ -163,6 +163,7 @@ public class HBaseTestingUtility extends
* mini dfs.
* @deprecated can be used only with mini dfs
*/
+ @Deprecated
private static final String TEST_DIRECTORY_KEY = "test.build.data";
/** Filesystem URI used for map-reduce mini-cluster setup */
@@ -1612,24 +1613,7 @@ public class HBaseTestingUtility extends
* @throws IOException
*/
public int loadTable(final HTable t, final byte[] f) throws IOException {
- t.setAutoFlush(false, true);
- byte[] k = new byte[3];
- int rowCount = 0;
- for (byte b1 = 'a'; b1 <= 'z'; b1++) {
- for (byte b2 = 'a'; b2 <= 'z'; b2++) {
- for (byte b3 = 'a'; b3 <= 'z'; b3++) {
- k[0] = b1;
- k[1] = b2;
- k[2] = b3;
- Put put = new Put(k);
- put.add(f, null, k);
- t.put(put);
- rowCount++;
- }
- }
- }
- t.flushCommits();
- return rowCount;
+ return loadTable(t, new byte[][] {f});
}
/**
@@ -1640,28 +1624,83 @@ public class HBaseTestingUtility extends
* @throws IOException
*/
public int loadTable(final HTable t, final byte[][] f) throws IOException {
- t.setAutoFlush(false, true);
- byte[] k = new byte[3];
+ return loadTable(t, f, null);
+ }
+
+ /**
+ * Load table of multiple column families with rows from 'aaa' to 'zzz'.
+ * @param t Table
+ * @param f Array of Families to load
+ * @param value the values of the cells. If null is passed, the row key is used as value
+ * @return Count of rows loaded.
+ * @throws IOException
+ */
+ public int loadTable(final HTable t, final byte[][] f, byte[] value) throws IOException {
+ t.setAutoFlush(false);
int rowCount = 0;
- for (byte b1 = 'a'; b1 <= 'z'; b1++) {
- for (byte b2 = 'a'; b2 <= 'z'; b2++) {
- for (byte b3 = 'a'; b3 <= 'z'; b3++) {
- k[0] = b1;
- k[1] = b2;
- k[2] = b3;
- Put put = new Put(k);
- for (int i = 0; i < f.length; i++) {
- put.add(f[i], null, k);
- }
- t.put(put);
- rowCount++;
- }
+ for (byte[] row : HBaseTestingUtility.ROWS) {
+ Put put = new Put(row);
+ for (int i = 0; i < f.length; i++) {
+ put.add(f[i], null, value != null ? value : row);
}
+ t.put(put);
+ rowCount++;
}
t.flushCommits();
return rowCount;
}
+ /** A tracker for validating the table rows
+ * generated with {@link HBaseTestingUtility#loadTable(HTable, byte[])}
+ */
+ public static class SeenRowTracker {
+ int dim = 'z' - 'a' + 1;
+ int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
+ byte[] startRow;
+ byte[] stopRow;
+
+ public SeenRowTracker(byte[] startRow, byte[] stopRow) {
+ this.startRow = startRow;
+ this.stopRow = stopRow;
+ }
+
+ void reset() {
+ for (byte[] row : ROWS) {
+ seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
+ }
+ }
+
+ int i(byte b) {
+ return b - 'a';
+ }
+
+ public void addRow(byte[] row) {
+ seenRows[i(row[0])][i(row[1])][i(row[2])]++;
+ }
+
+ /** Validate that all the rows between startRow and stopRow are seen exactly once, and
+ * that all other rows are not seen at all
+ */
+ public void validate() {
+ for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+ for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+ for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+ int count = seenRows[i(b1)][i(b2)][i(b3)];
+ int expectedCount = 0;
+ if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
+ && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
+ expectedCount = 1;
+ }
+ if (count != expectedCount) {
+ String row = new String(new byte[] {b1,b2,b3});
+ throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
+ }
+ }
+ }
+ }
+ }
+ }
+
public int loadRegion(final HRegion r, final byte[] f) throws IOException {
return loadRegion(r, f, false);
}
@@ -1773,6 +1812,22 @@ public class HBaseTestingUtility extends
return createMultiRegions(getConfiguration(), table, columnFamily);
}
+ /** All the row values for the data loaded by {@link #loadTable(HTable, byte[])} */
+ public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
+ static {
+ int i = 0;
+ for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+ for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+ for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+ ROWS[i][0] = b1;
+ ROWS[i][1] = b2;
+ ROWS[i][2] = b3;
+ i++;
+ }
+ }
+ }
+ }
+
public static final byte[][] KEYS = {
HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
@@ -3206,4 +3261,15 @@ public class HBaseTestingUtility extends
};
}
+ /**
+ * Returns a {@link Predicate} for checking that table is enabled
+ */
+ public Waiter.Predicate<Exception> predicateTableEnabled(final TableName tableName) {
+ return new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return getHBaseAdmin().isTableEnabled(tableName);
+ }
+ };
+ }
}
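
As a usage note (illustration only, not part of the patch), a test that has loaded a table with loadTable could drive the new SeenRowTracker roughly as follows; the table handle and the bbb/yyy scan range are assumptions:

    // Rows 'aaa'..'zzz' were loaded beforehand via util.loadTable(table, FAMILIES).
    byte[] bbb = Bytes.toBytes("bbb");
    byte[] yyy = Bytes.toBytes("yyy");
    HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(bbb, yyy);

    ResultScanner scanner = table.getScanner(new Scan(bbb, yyy));
    for (Result result : scanner) {
      rowTracker.addRow(result.getRow());
    }
    scanner.close();

    // Throws if any row in [bbb, yyy) was missed or seen more than once,
    // or if any row outside that range was seen.
    rowTracker.validate();
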
Added: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java?rev=1575645&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java (added)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java Sun Mar 9 01:58:43 2014
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * A simple performance evaluation tool for single-client scans, MR scans,
+ * and snapshot scans.
+ */
+public class ScanPerformanceEvaluation extends AbstractHBaseTool {
+
+ private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
+
+ private String type;
+ private String file;
+ private String tablename;
+ private String snapshotName;
+ private String restoreDir;
+ private String caching;
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ Path rootDir;
+ try {
+ rootDir = FSUtils.getRootDir(conf);
+ rootDir.getFileSystem(conf);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ protected void addOptions() {
+ this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
+ this.addOptWithArg("f", "file", "the filename to read from");
+ this.addOptWithArg("tn", "table", "the tablename to read from");
+ this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
+ this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
+ this.addOptWithArg("ch", "caching", "scanner caching value");
+ }
+
+ @Override
+ protected void processOptions(CommandLine cmd) {
+ type = cmd.getOptionValue("type");
+ file = cmd.getOptionValue("file");
+ tablename = cmd.getOptionValue("table");
+ snapshotName = cmd.getOptionValue("snapshot");
+ restoreDir = cmd.getOptionValue("restoredir");
+ caching = cmd.getOptionValue("caching");
+ }
+
+ protected void testHdfsStreaming(Path filename) throws IOException {
+ byte[] buf = new byte[1024];
+ FileSystem fs = filename.getFileSystem(getConf());
+
+ // read the file from start to finish
+ Stopwatch fileOpenTimer = new Stopwatch();
+ Stopwatch streamTimer = new Stopwatch();
+
+ fileOpenTimer.start();
+ FSDataInputStream in = fs.open(filename);
+ fileOpenTimer.stop();
+
+ long totalBytes = 0;
+ streamTimer.start();
+ while (true) {
+ int read = in.read(buf);
+ if (read < 0) {
+ break;
+ }
+ totalBytes += read;
+ }
+ streamTimer.stop();
+
+ double throughput = (double)totalBytes / streamTimer.elapsedTime(TimeUnit.SECONDS);
+
+ System.out.println("HDFS streaming: ");
+ System.out.println("total time to open: " + fileOpenTimer.elapsedMillis() + " ms");
+ System.out.println("total time to read: " + streamTimer.elapsedMillis() + " ms");
+ System.out.println("total bytes: " + totalBytes + " bytes ("
+ + StringUtils.humanReadableInt(totalBytes) + ")");
+ System.out.println("throghput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+ }
+
+ private Scan getScan() {
+ Scan scan = new Scan(); // default scan settings
+ scan.setCacheBlocks(false);
+ scan.setMaxVersions(1);
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
+ if (caching != null) {
+ scan.setCaching(Integer.parseInt(caching));
+ }
+
+ return scan;
+ }
+
+ public void testScan() throws IOException {
+ Stopwatch tableOpenTimer = new Stopwatch();
+ Stopwatch scanOpenTimer = new Stopwatch();
+ Stopwatch scanTimer = new Stopwatch();
+
+ tableOpenTimer.start();
+ HTable table = new HTable(getConf(), TableName.valueOf(tablename));
+ tableOpenTimer.stop();
+
+ Scan scan = getScan();
+ scanOpenTimer.start();
+ ResultScanner scanner = table.getScanner(scan);
+ scanOpenTimer.stop();
+
+ long numRows = 0;
+ long numCells = 0;
+ scanTimer.start();
+ while (true) {
+ Result result = scanner.next();
+ if (result == null) {
+ break;
+ }
+ numRows++;
+
+ numCells += result.rawCells().length;
+ }
+ scanTimer.stop();
+ scanner.close();
+ table.close();
+
+ ScanMetrics metrics = ProtobufUtil.toScanMetrics(scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
+ long totalBytes = metrics.countOfBytesInResults.get();
+ double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
+ double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
+ double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
+
+ System.out.println("HBase scan: ");
+ System.out.println("total time to open table: " + tableOpenTimer.elapsedMillis() + " ms");
+ System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
+ System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
+
+ System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
+
+ System.out.println("total bytes: " + totalBytes + " bytes ("
+ + StringUtils.humanReadableInt(totalBytes) + ")");
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+ System.out.println("total rows : " + numRows);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+ System.out.println("total cells : " + numCells);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+ }
+
+
+ public void testSnapshotScan() throws IOException {
+ Stopwatch snapshotRestoreTimer = new Stopwatch();
+ Stopwatch scanOpenTimer = new Stopwatch();
+ Stopwatch scanTimer = new Stopwatch();
+
+ Path restoreDir = new Path(this.restoreDir);
+
+ snapshotRestoreTimer.start();
+ restoreDir.getFileSystem(conf).delete(restoreDir, true);
+ snapshotRestoreTimer.stop();
+
+ Scan scan = getScan();
+ scanOpenTimer.start();
+ TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
+ scanOpenTimer.stop();
+
+ long numRows = 0;
+ long numCells = 0;
+ scanTimer.start();
+ while (true) {
+ Result result = scanner.next();
+ if (result == null) {
+ break;
+ }
+ numRows++;
+
+ numCells += result.rawCells().length;
+ }
+ scanTimer.stop();
+ scanner.close();
+
+ ScanMetrics metrics = scanner.getScanMetrics();
+ long totalBytes = metrics.countOfBytesInResults.get();
+ double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
+ double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
+ double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
+
+ System.out.println("HBase scan snapshot: ");
+ System.out.println("total time to restore snapshot: " + snapshotRestoreTimer.elapsedMillis() + " ms");
+ System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
+ System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
+
+ System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
+
+ System.out.println("total bytes: " + totalBytes + " bytes ("
+ + StringUtils.humanReadableInt(totalBytes) + ")");
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+ System.out.println("total rows : " + numRows);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+ System.out.println("total cells : " + numCells);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+
+ }
+
+ public static enum ScanCounter {
+ NUM_ROWS,
+ NUM_CELLS,
+ }
+
+ public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
+ @Override
+ protected void map(ImmutableBytesWritable key, Result value,
+ Context context) throws IOException,
+ InterruptedException {
+ context.getCounter(ScanCounter.NUM_ROWS).increment(1);
+ context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
+ }
+ }
+
+ public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
+ Stopwatch scanOpenTimer = new Stopwatch();
+ Stopwatch scanTimer = new Stopwatch();
+
+ Scan scan = getScan();
+
+ String jobName = "testScanMapReduce";
+
+ Job job = new Job(conf);
+ job.setJobName(jobName);
+
+ job.setJarByClass(getClass());
+
+ TableMapReduceUtil.initTableMapperJob(
+ this.tablename,
+ scan,
+ MyMapper.class,
+ NullWritable.class,
+ NullWritable.class,
+ job
+ );
+
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ scanTimer.start();
+ job.waitForCompletion(true);
+ scanTimer.stop();
+
+ Counters counters = job.getCounters();
+ long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
+ long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
+
+ long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
+ double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
+ double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
+ double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
+
+ System.out.println("HBase scan mapreduce: ");
+ System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
+ System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
+
+ System.out.println("total bytes: " + totalBytes + " bytes ("
+ + StringUtils.humanReadableInt(totalBytes) + ")");
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+ System.out.println("total rows : " + numRows);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+ System.out.println("total cells : " + numCells);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+ }
+
+ public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
+ Stopwatch scanOpenTimer = new Stopwatch();
+ Stopwatch scanTimer = new Stopwatch();
+
+ Scan scan = getScan();
+
+ String jobName = "testSnapshotScanMapReduce";
+
+ Job job = new Job(conf);
+ job.setJobName(jobName);
+
+ job.setJarByClass(getClass());
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(
+ this.snapshotName,
+ scan,
+ MyMapper.class,
+ NullWritable.class,
+ NullWritable.class,
+ job,
+ true,
+ new Path(restoreDir)
+ );
+
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ scanTimer.start();
+ job.waitForCompletion(true);
+ scanTimer.stop();
+
+ Counters counters = job.getCounters();
+ long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
+ long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
+
+ long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
+ double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
+ double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
+ double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
+
+ System.out.println("HBase scan mapreduce: ");
+ System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
+ System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
+
+ System.out.println("total bytes: " + totalBytes + " bytes ("
+ + StringUtils.humanReadableInt(totalBytes) + ")");
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+ System.out.println("total rows : " + numRows);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+ System.out.println("total cells : " + numCells);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+ }
+
+ @Override
+ protected int doWork() throws Exception {
+ if (type.equals("streaming")) {
+ testHdfsStreaming(new Path(file));
+ } else if (type.equals("scan")){
+ testScan();
+ } else if (type.equals("snapshotscan")) {
+ testSnapshotScan();
+ } else if (type.equals("scanmapreduce")) {
+ testScanMapReduce();
+ } else if (type.equals("snapshotscanmapreduce")) {
+ testSnapshotScanMapReduce();
+ }
+ return 0;
+ }
+
+ public static void main (String[] args) throws Exception {
+ int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
+ System.exit(ret);
+ }
+}
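
For reference, since the tool extends AbstractHBaseTool and registers the short options t/f/tn/sn/rs/ch above, a snapshot-scan run would presumably be invoked along the lines of "hbase org.apache.hadoop.hbase.ScanPerformanceEvaluation -t snapshotscan -sn my_snapshot -rs /tmp/restore -ch 100"; the hbase launcher command and exact option syntax are assumptions, not part of this commit.
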
Added: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java?rev=1575645&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java (added)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java Sun Mar 9 01:58:43 2014
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mapreduce.TestTableSnapshotInputFormat;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestTableSnapshotScanner {
+
+ private static final Log LOG = LogFactory.getLog(TestTableSnapshotScanner.class);
+ private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final int NUM_REGION_SERVERS = 2;
+ private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
+ public static byte[] bbb = Bytes.toBytes("bbb");
+ public static byte[] yyy = Bytes.toBytes("yyy");
+
+ private FileSystem fs;
+ private Path rootDir;
+
+ public void setupCluster() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(NUM_REGION_SERVERS);
+ rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ fs = rootDir.getFileSystem(UTIL.getConfiguration());
+ }
+
+ public void tearDownCluster() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ private static void setupConf(Configuration conf) {
+ // Enable snapshot
+ conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
+ String snapshotName, int numRegions)
+ throws Exception {
+ try {
+ util.deleteTable(tableName);
+ } catch(Exception ex) {
+ // ignore
+ }
+
+ if (numRegions > 1) {
+ util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
+ } else {
+ util.createTable(tableName, FAMILIES);
+ }
+ HBaseAdmin admin = util.getHBaseAdmin();
+
+ // put some stuff in the table
+ HTable table = new HTable(util.getConfiguration(), tableName);
+ util.loadTable(table, FAMILIES);
+
+ Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+ SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+ Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+
+ // load different values
+ byte[] value = Bytes.toBytes("after_snapshot_value");
+ util.loadTable(table, FAMILIES, value);
+
+ // cause flush to create new files in the region
+ admin.flush(tableName.toString());
+ table.close();
+ }
+
+ @Test
+ public void testWithSingleRegion() throws Exception {
+ testScanner(UTIL, "testWithSingleRegion", 1, false);
+ }
+
+ @Test
+ public void testWithMultiRegion() throws Exception {
+ testScanner(UTIL, "testWithMultiRegion", 10, false);
+ }
+
+ @Test
+ public void testWithOfflineHBaseMultiRegion() throws Exception {
+ testScanner(UTIL, "testWithMultiRegion", 20, true);
+ }
+
+ private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions, boolean shutdownCluster)
+ throws Exception {
+ setupCluster();
+ TableName tableName = TableName.valueOf("testScanner");
+ try {
+ createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+ if (shutdownCluster) {
+ util.shutdownMiniHBaseCluster();
+ }
+
+ Path restoreDir = util.getDataTestDirOnTestFS(snapshotName);
+ Scan scan = new Scan(bbb, yyy); // limit the scan
+
+ TableSnapshotScanner scanner = new TableSnapshotScanner(util.getConfiguration(), restoreDir, snapshotName, scan);
+
+ verifyScanner(scanner, bbb, yyy);
+ scanner.close();
+ } finally {
+ if (!shutdownCluster) {
+ util.getHBaseAdmin().deleteSnapshot(snapshotName);
+ util.deleteTable(tableName);
+ tearDownCluster();
+ }
+ }
+ }
+
+ private void verifyScanner(ResultScanner scanner, byte[] startRow, byte[] stopRow)
+ throws IOException, InterruptedException {
+
+ HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+ while (true) {
+ Result result = scanner.next();
+ if (result == null) {
+ break;
+ }
+ verifyRow(result);
+ rowTracker.addRow(result.getRow());
+ }
+
+ // validate all rows are seen
+ rowTracker.validate();
+ }
+
+ private static void verifyRow(Result result) throws IOException {
+ byte[] row = result.getRow();
+ CellScanner scanner = result.cellScanner();
+ while (scanner.advance()) {
+ Cell cell = scanner.current();
+
+ //assert that all Cells in the Result have the same key
+ Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
+ cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+ }
+
+ for (int j = 0; j < FAMILIES.length; j++) {
+ byte[] actual = result.getValue(FAMILIES[j], null);
+ Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ + " ,actual:" + Bytes.toString(actual), row, actual);
+ }
+ }
+
+}
Added: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java?rev=1575645&view=auto
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java (added)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java Sun Mar 9 01:58:43 2014
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(LargeTests.class)
+public class TestTableSnapshotInputFormat {
+
+ private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
+ private final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static final int NUM_REGION_SERVERS = 2;
+ private static final String TABLE_NAME_STR = "TestTableSnapshotInputFormat";
+ private static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")};
+ private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
+ public static byte[] bbb = Bytes.toBytes("bbb");
+ public static byte[] yyy = Bytes.toBytes("yyy");
+
+ private FileSystem fs;
+ private Path rootDir;
+
+ public void setupCluster() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(NUM_REGION_SERVERS);
+ rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ fs = rootDir.getFileSystem(UTIL.getConfiguration());
+ }
+
+ public void tearDownCluster() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ private static void setupConf(Configuration conf) {
+ // Enable snapshot
+ conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testGetBestLocations() throws IOException {
+ TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+ Configuration conf = UTIL.getConfiguration();
+
+ HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
+ Assert.assertEquals(Lists.newArrayList(), tsif.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
+ Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
+ Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
+ Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+ blockDistribution = new HDFSBlocksDistribution();
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
+ Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
+ Assert.assertEquals(Lists.newArrayList("h1", "h2"), tsif.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
+ Assert.assertEquals(Lists.newArrayList("h2", "h1"), tsif.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);
+
+ Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"), tsif.getBestLocations(conf, blockDistribution));
+ }
+
+ public static enum TestTableSnapshotCounters {
+ VALIDATION_ERROR
+ }
+
+ public static class TestTableSnapshotMapper
+ extends TableMapper<ImmutableBytesWritable, NullWritable> {
+ @Override
+ protected void map(ImmutableBytesWritable key, Result value,
+ Context context) throws IOException, InterruptedException {
+ // Validate a single row coming from the snapshot, and emit the row key
+ verifyRowFromMap(key, value);
+ context.write(key, NullWritable.get());
+ }
+ }
+
+ public static class TestTableSnapshotReducer
+ extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
+ HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
+ @Override
+ protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
+ Context context) throws IOException, InterruptedException {
+ rowTracker.addRow(key.get());
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ rowTracker.validate();
+ }
+ }
+
+ public static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
+ String snapshotName, int numRegions)
+ throws Exception {
+ try {
+ util.deleteTable(tableName);
+ } catch(Exception ex) {
+ // ignore
+ }
+
+ if (numRegions > 1) {
+ util.createTable(tableName, FAMILIES, 1, bbb, yyy, numRegions);
+ } else {
+ util.createTable(tableName, FAMILIES);
+ }
+ HBaseAdmin admin = util.getHBaseAdmin();
+
+ // put some stuff in the table
+ HTable table = new HTable(util.getConfiguration(), tableName);
+ util.loadTable(table, FAMILIES);
+
+ Path rootDir = new Path(util.getConfiguration().get(HConstants.HBASE_DIR));
+ FileSystem fs = rootDir.getFileSystem(util.getConfiguration());
+
+ SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
+ Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);
+
+ // load different values
+ byte[] value = Bytes.toBytes("after_snapshot_value");
+ util.loadTable(table, FAMILIES, value);
+
+ // cause flush to create new files in the region
+ admin.flush(tableName.toString());
+ table.close();
+ }
+
+ @Test
+ public void testWithMockedMapReduceSingleRegion() throws Exception {
+ testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1);
+ }
+
+ @Test
+ public void testWithMockedMapReduceMultiRegion() throws Exception {
+ testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8);
+ }
+
+ public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, int numRegions, int expectedNumSplits)
+ throws Exception {
+ setupCluster();
+ TableName tableName = TableName.valueOf("testWithMockedMapReduce");
+ try {
+ createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+ Job job = new Job(util.getConfiguration());
+ Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
+ Scan scan = new Scan(bbb, yyy); // limit the scan
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir);
+
+ verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, bbb, yyy);
+
+ } finally {
+ util.getHBaseAdmin().deleteSnapshot(snapshotName);
+ util.deleteTable(tableName);
+ tearDownCluster();
+ }
+ }
+
+ private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
+ byte[] startRow, byte[] stopRow)
+ throws IOException, InterruptedException {
+ TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+ List<InputSplit> splits = tsif.getSplits(job);
+
+ Assert.assertEquals(expectedNumSplits, splits.size());
+
+ HBaseTestingUtility.SeenRowTracker rowTracker = new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+ for (int i = 0; i < splits.size(); i++) {
+ // validate input split
+ InputSplit split = splits.get(i);
+ Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+
+ // validate record reader
+ TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
+ when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+ RecordReader<ImmutableBytesWritable, Result> rr = tsif.createRecordReader(split, taskAttemptContext);
+ rr.initialize(split, taskAttemptContext);
+
+ // validate we can read all the data back
+ while (rr.nextKeyValue()) {
+ byte[] row = rr.getCurrentKey().get();
+ verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
+ rowTracker.addRow(row);
+ }
+
+ rr.close();
+ }
+
+ // validate all rows are seen
+ rowTracker.validate();
+ }
+
+ public static void verifyRowFromMap(ImmutableBytesWritable key, Result result) throws IOException {
+ byte[] row = key.get();
+ CellScanner scanner = result.cellScanner();
+ while (scanner.advance()) {
+ Cell cell = scanner.current();
+
+ //assert that all Cells in the Result have the same key
+ Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length,
+ cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+ }
+
+ for (int j = 0; j < FAMILIES.length; j++) {
+ byte[] actual = result.getValue(FAMILIES[j], null);
+ Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ + " ,actual:" + Bytes.toString(actual), row, actual);
+ }
+ }
+
+ @Test
+ public void testWithMapReduceSingleRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false);
+ }
+
+ @Test
+ public void testWithMapReduceMultiRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false);
+ }
+
+ // run the MR job while HBase is offline
+ @Test
+ public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception {
+ testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true);
+ }
+
+ private void testWithMapReduce(HBaseTestingUtility util, String snapshotName,
+ int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception {
+ setupCluster();
+ util.startMiniMapReduceCluster();
+ try {
+ Path tableDir = util.getDataTestDirOnTestFS(snapshotName);
+ TableName tableName = TableName.valueOf("testWithMapReduce");
+ doTestWithMapReduce(util, tableName, snapshotName, tableDir, numRegions,
+ expectedNumSplits, shutdownCluster);
+ } finally {
+ util.shutdownMiniMapReduceCluster();
+ tearDownCluster();
+ }
+ }
+
+ // this is also called by the IntegrationTestTableSnapshotInputFormat
+ public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
+ String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, boolean shutdownCluster)
+ throws Exception {
+
+ //create the table and snapshot
+ createTableAndSnapshot(util, tableName, snapshotName, numRegions);
+
+ if (shutdownCluster) {
+ util.shutdownMiniHBaseCluster();
+ }
+
+ try {
+ // create the job
+ Job job = new Job(util.getConfiguration());
+ Scan scan = new Scan(bbb, yyy); // limit the scan
+
+ job.setJarByClass(util.getClass());
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(), TestTableSnapshotInputFormat.class);
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, true, tableDir);
+
+ job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
+ job.setNumReduceTasks(1);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+ } finally {
+ if (!shutdownCluster) {
+ util.getHBaseAdmin().deleteSnapshot(snapshotName);
+ util.deleteTable(tableName);
+ }
+ }
+ }
+}
Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java?rev=1575645&r1=1575644&r2=1575645&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java Sun Mar 9 01:58:43 2014
@@ -664,7 +664,7 @@ public class TestRegionPlacement {
/**
* Create a table with specified table name and region number.
- * @param table
+ * @param tablename
* @param regionNum
* @return
* @throws IOException