You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2014/05/20 11:47:01 UTC
svn commit: r1596168 - in
/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase:
master/snapshot/ regionserver/ snapshot/ util/
Author: mbertozzi
Date: Tue May 20 09:47:01 2014
New Revision: 1596168
URL: http://svn.apache.org/r1596168
Log:
HBASE-11185 Parallelize Snapshot operations
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/DisabledTableSnapshotHandler.java Tue May 20 09:47:01 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hbase.master.Ma
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
@@ -88,8 +92,17 @@ public class DisabledTableSnapshotHandle
+ ClientSnapshotDescriptionUtils.toString(snapshot);
LOG.info(msg);
status.setStatus(msg);
- for (HRegionInfo regionInfo: regions) {
- snapshotDisabledRegion(regionInfo);
+
+ ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "DisabledTableSnapshot");
+ try {
+ ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+ @Override
+ public void editRegion(final HRegionInfo regionInfo) throws IOException {
+ snapshotManifest.addRegion(FSUtils.getTableDir(rootDir, snapshotTable), regionInfo);
+ }
+ });
+ } finally {
+ exec.shutdown();
}
} catch (Exception e) {
// make sure we capture the exception to propagate back to the client later
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/MasterSnapshotVerifier.java Tue May 20 09:47:01 2014
@@ -129,7 +129,7 @@ public final class MasterSnapshotVerifie
/**
* Check that the table descriptor for the snapshot is a valid table descriptor
- * @param snapshotDir snapshot directory to check
+ * @param manifest snapshot manifest to inspect
*/
private void verifyTableInfo(final SnapshotManifest manifest) throws IOException {
HTableDescriptor htd = manifest.getTableDescriptor();
@@ -145,7 +145,7 @@ public final class MasterSnapshotVerifie
/**
* Check that all the regions in the snapshot are valid, and accounted for.
- * @param snapshotDir snapshot directory to check
+ * @param manifest snapshot manifest to inspect
* @throws IOException if we can't reach hbase:meta or read the files from the FS
*/
private void verifyRegions(final SnapshotManifest manifest) throws IOException {
@@ -167,6 +167,7 @@ public final class MasterSnapshotVerifie
LOG.error(errorMsg);
}
+ // Verify HRegionInfo
for (HRegionInfo region : regions) {
SnapshotRegionManifest regionManifest = regionManifests.get(region.getEncodedName());
if (regionManifest == null) {
@@ -177,20 +178,23 @@ public final class MasterSnapshotVerifie
continue;
}
- verifyRegion(fs, manifest.getSnapshotDir(), region, regionManifest);
+ verifyRegionInfo(region, regionManifest);
}
+
if (!errorMsg.isEmpty()) {
throw new CorruptedSnapshotException(errorMsg);
}
+
+ // Verify Snapshot HFiles
+ SnapshotReferenceUtil.verifySnapshot(services.getConfiguration(), fs, manifest);
}
/**
- * Verify that the region (regioninfo, hfiles) are valid
- * @param fs the FileSystem instance
- * @param snapshotDir snapshot directory to check
+ * Verify that the regionInfo is valid
* @param region the region to check
+ * @param manifest snapshot manifest to inspect
*/
- private void verifyRegion(final FileSystem fs, final Path snapshotDir, final HRegionInfo region,
+ private void verifyRegionInfo(final HRegionInfo region,
final SnapshotRegionManifest manifest) throws IOException {
HRegionInfo manifestRegionInfo = HRegionInfo.convert(manifest.getRegionInfo());
if (!region.equals(manifestRegionInfo)) {
@@ -198,16 +202,5 @@ public final class MasterSnapshotVerifie
"doesn't match expected region:" + region;
throw new CorruptedSnapshotException(msg, snapshot);
}
-
- // make sure we have all the expected store files
- SnapshotReferenceUtil.visitRegionStoreFiles(manifest,
- new SnapshotReferenceUtil.StoreFileVisitor() {
- @Override
- public void storeFile(final HRegionInfo regionInfo, final String family,
- final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
- SnapshotReferenceUtil.verifyStoreFile(services.getConfiguration(), fs, snapshotDir,
- snapshot, region, family, storeFile);
- }
- });
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java Tue May 20 09:47:01 2014
@@ -223,6 +223,20 @@ public class StoreFileInfo {
*/
public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs)
throws IOException {
+ FileStatus status = getReferencedFileStatus(fs);
+ if (this.reference != null) {
+ return computeRefFileHDFSBlockDistribution(fs, reference, status);
+ } else {
+ return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+ }
+ }
+
+ /**
+ * Get the {@link FileStatus} of the file referenced by this StoreFileInfo
+ * @param fs The current file system to use.
+ * @return The {@link FileStatus} of the file referenced by this StoreFileInfo
+ */
+ public FileStatus getReferencedFileStatus(final FileSystem fs) throws IOException {
FileStatus status;
if (this.reference != null) {
if (this.link != null) {
@@ -233,7 +247,6 @@ public class StoreFileInfo {
Path referencePath = getReferredToFile(this.getPath());
status = fs.getFileStatus(referencePath);
}
- return computeRefFileHDFSBlockDistribution(fs, reference, status);
} else {
if (this.link != null) {
// HFileLink
@@ -241,8 +254,8 @@ public class StoreFileInfo {
} else {
status = this.fileStatus;
}
- return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
}
+ return status;
}
/** @return The {@link Path} of the file */
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java Tue May 20 09:47:01 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.snapshot;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -30,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -153,6 +155,15 @@ public class RestoreSnapshotHelper {
* @return the set of regions touched by the restore operation
*/
public RestoreMetaChanges restoreHdfsRegions() throws IOException {
+ ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "RestoreSnapshot");
+ try {
+ return restoreHdfsRegions(exec);
+ } finally {
+ exec.shutdown();
+ }
+ }
+
+ private RestoreMetaChanges restoreHdfsRegions(final ThreadPoolExecutor exec) throws IOException {
LOG.debug("starting restore");
Map<String, SnapshotRegionManifest> regionManifests = snapshotManifest.getRegionManifestsMap();
@@ -187,13 +198,13 @@ public class RestoreSnapshotHelper {
// Restore regions using the snapshot data
monitor.rethrowException();
status.setStatus("Restoring table regions...");
- restoreHdfsRegions(regionManifests, metaChanges.getRegionsToRestore());
+ restoreHdfsRegions(exec, regionManifests, metaChanges.getRegionsToRestore());
status.setStatus("Finished restoring all table regions.");
// Remove regions from the current table
monitor.rethrowException();
status.setStatus("Starting to delete excess regions from table");
- removeHdfsRegions(metaChanges.getRegionsToRemove());
+ removeHdfsRegions(exec, metaChanges.getRegionsToRemove());
status.setStatus("Finished deleting excess regions from table.");
}
@@ -210,7 +221,7 @@ public class RestoreSnapshotHelper {
// Create new regions cloning from the snapshot
monitor.rethrowException();
status.setStatus("Cloning regions...");
- HRegionInfo[] clonedRegions = cloneHdfsRegions(regionManifests, regionsToAdd);
+ HRegionInfo[] clonedRegions = cloneHdfsRegions(exec, regionManifests, regionsToAdd);
metaChanges.setNewRegions(clonedRegions);
status.setStatus("Finished cloning regions.");
}
@@ -345,23 +356,30 @@ public class RestoreSnapshotHelper {
/**
* Remove specified regions from the file-system, using the archiver.
*/
- private void removeHdfsRegions(final List<HRegionInfo> regions) throws IOException {
- if (regions != null && regions.size() > 0) {
- for (HRegionInfo hri: regions) {
+ private void removeHdfsRegions(final ThreadPoolExecutor exec, final List<HRegionInfo> regions)
+ throws IOException {
+ if (regions == null || regions.size() == 0) return;
+ ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+ @Override
+ public void editRegion(final HRegionInfo hri) throws IOException {
HFileArchiver.archiveRegion(conf, fs, hri);
}
- }
+ });
}
/**
* Restore specified regions by restoring content to the snapshot state.
*/
- private void restoreHdfsRegions(final Map<String, SnapshotRegionManifest> regionManifests,
+ private void restoreHdfsRegions(final ThreadPoolExecutor exec,
+ final Map<String, SnapshotRegionManifest> regionManifests,
final List<HRegionInfo> regions) throws IOException {
if (regions == null || regions.size() == 0) return;
- for (HRegionInfo hri: regions) {
- restoreRegion(hri, regionManifests.get(hri.getEncodedName()));
- }
+ ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
+ @Override
+ public void editRegion(final HRegionInfo hri) throws IOException {
+ restoreRegion(hri, regionManifests.get(hri.getEncodedName()));
+ }
+ });
}
private Map<String, List<SnapshotRegionManifest.StoreFile>> getRegionHFileReferences(
@@ -465,7 +483,8 @@ public class RestoreSnapshotHelper {
* Clone specified regions. For each region create a new region
* and create a HFileLink for each hfile.
*/
- private HRegionInfo[] cloneHdfsRegions(final Map<String, SnapshotRegionManifest> regionManifests,
+ private HRegionInfo[] cloneHdfsRegions(final ThreadPoolExecutor exec,
+ final Map<String, SnapshotRegionManifest> regionManifests,
final List<HRegionInfo> regions) throws IOException {
if (regions == null || regions.size() == 0) return null;
@@ -490,7 +509,7 @@ public class RestoreSnapshotHelper {
}
// create the regions on disk
- ModifyRegionUtils.createRegions(conf, rootDir, tableDir,
+ ModifyRegionUtils.createRegions(exec, conf, rootDir, tableDir,
tableDesc, clonedRegionsInfo, new ModifyRegionUtils.RegionFillTask() {
@Override
public void fillRegion(final HRegion region) throws IOException {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotInfo.java Tue May 20 09:47:01 2014
@@ -25,6 +25,8 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -99,14 +101,14 @@ public final class SnapshotInfo extends
}
}
- private int hfileArchiveCount = 0;
- private int hfilesMissing = 0;
- private int hfilesCount = 0;
- private int logsMissing = 0;
- private int logsCount = 0;
- private long hfileArchiveSize = 0;
- private long hfileSize = 0;
- private long logSize = 0;
+ private AtomicInteger hfileArchiveCount = new AtomicInteger();
+ private AtomicInteger hfilesMissing = new AtomicInteger();
+ private AtomicInteger hfilesCount = new AtomicInteger();
+ private AtomicInteger logsMissing = new AtomicInteger();
+ private AtomicInteger logsCount = new AtomicInteger();
+ private AtomicLong hfileArchiveSize = new AtomicLong();
+ private AtomicLong hfileSize = new AtomicLong();
+ private AtomicLong logSize = new AtomicLong();
private final SnapshotDescription snapshot;
private final TableName snapshotTable;
@@ -128,57 +130,57 @@ public final class SnapshotInfo extends
/** @return true if the snapshot is corrupted */
public boolean isSnapshotCorrupted() {
- return hfilesMissing > 0 || logsMissing > 0;
+ return hfilesMissing.get() > 0 || logsMissing.get() > 0;
}
/** @return the number of available store files */
public int getStoreFilesCount() {
- return hfilesCount + hfileArchiveCount;
+ return hfilesCount.get() + hfileArchiveCount.get();
}
/** @return the number of available store files in the archive */
public int getArchivedStoreFilesCount() {
- return hfileArchiveCount;
+ return hfileArchiveCount.get();
}
/** @return the number of available log files */
public int getLogsCount() {
- return logsCount;
+ return logsCount.get();
}
/** @return the number of missing store files */
public int getMissingStoreFilesCount() {
- return hfilesMissing;
+ return hfilesMissing.get();
}
/** @return the number of missing log files */
public int getMissingLogsCount() {
- return logsMissing;
+ return logsMissing.get();
}
/** @return the total size of the store files referenced by the snapshot */
public long getStoreFilesSize() {
- return hfileSize + hfileArchiveSize;
+ return hfileSize.get() + hfileArchiveSize.get();
}
/** @return the total size of the store files shared */
public long getSharedStoreFilesSize() {
- return hfileSize;
+ return hfileSize.get();
}
/** @return the total size of the store files in the archive */
public long getArchivedStoreFileSize() {
- return hfileArchiveSize;
+ return hfileArchiveSize.get();
}
/** @return the percentage of the shared store files */
public float getSharedStoreFilePercentage() {
- return ((float)hfileSize / (hfileSize + hfileArchiveSize)) * 100;
+ return ((float)hfileSize.get() / (hfileSize.get() + hfileArchiveSize.get())) * 100;
}
/** @return the total log size */
public long getLogsSize() {
- return logSize;
+ return logSize.get();
}
/**
@@ -197,15 +199,15 @@ public final class SnapshotInfo extends
try {
if ((inArchive = fs.exists(link.getArchivePath()))) {
size = fs.getFileStatus(link.getArchivePath()).getLen();
- hfileArchiveSize += size;
- hfileArchiveCount++;
+ hfileArchiveSize.addAndGet(size);
+ hfileArchiveCount.incrementAndGet();
} else {
size = link.getFileStatus(fs).getLen();
- hfileSize += size;
- hfilesCount++;
+ hfileSize.addAndGet(size);
+ hfilesCount.incrementAndGet();
}
} catch (FileNotFoundException e) {
- hfilesMissing++;
+ hfilesMissing.incrementAndGet();
}
return new FileInfo(inArchive, size);
}
@@ -221,10 +223,10 @@ public final class SnapshotInfo extends
long size = -1;
try {
size = logLink.getFileStatus(fs).getLen();
- logSize += size;
- logsCount++;
+ logSize.addAndGet(size);
+ logsCount.incrementAndGet();
} catch (FileNotFoundException e) {
- logsMissing++;
+ logsMissing.incrementAndGet();
}
return new FileInfo(false, size);
}
@@ -368,8 +370,8 @@ public final class SnapshotInfo extends
final SnapshotDescription snapshotDesc = snapshotManifest.getSnapshotDescription();
final String table = snapshotDesc.getTable();
final SnapshotStats stats = new SnapshotStats(this.getConf(), this.fs, snapshotDesc);
- SnapshotReferenceUtil.visitReferencedFiles(getConf(), fs,
- snapshotManifest.getSnapshotDir(), snapshotDesc, new SnapshotReferenceUtil.SnapshotVisitor() {
+ SnapshotReferenceUtil.concurrentVisitReferencedFiles(getConf(), fs, snapshotManifest,
+ new SnapshotReferenceUtil.SnapshotVisitor() {
@Override
public void storeFile(final HRegionInfo regionInfo, final String family,
final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
@@ -448,8 +450,9 @@ public final class SnapshotInfo extends
Path rootDir = FSUtils.getRootDir(conf);
FileSystem fs = FileSystem.get(rootDir.toUri(), conf);
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
+ SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshot);
final SnapshotStats stats = new SnapshotStats(conf, fs, snapshot);
- SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshot,
+ SnapshotReferenceUtil.concurrentVisitReferencedFiles(conf, fs, manifest,
new SnapshotReferenceUtil.SnapshotVisitor() {
@Override
public void storeFile(final HRegionInfo regionInfo, final String family,
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java Tue May 20 09:47:01 2014
@@ -442,8 +442,8 @@ public class SnapshotManifest {
return createExecutor(conf, name);
}
- static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
- int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 4);
+ public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
+ int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
Threads.getNamedThreadFactory(name));
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifestV2.java Tue May 20 09:47:01 2014
@@ -103,14 +103,15 @@ public class SnapshotManifestV2 {
}
public void storeFile(final SnapshotRegionManifest.Builder region,
- final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile) {
+ final SnapshotRegionManifest.FamilyFiles.Builder family, final StoreFileInfo storeFile)
+ throws IOException {
SnapshotRegionManifest.StoreFile.Builder sfManifest =
SnapshotRegionManifest.StoreFile.newBuilder();
sfManifest.setName(storeFile.getPath().getName());
if (storeFile.isReference()) {
sfManifest.setReference(storeFile.getReference().convert());
}
- sfManifest.setFileSize(storeFile.getFileStatus().getLen());
+ sfManifest.setFileSize(storeFile.getReferencedFileStatus(fs).getLen());
family.addStoreFiles(sfManifest.build());
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotReferenceUtil.java Tue May 20 09:47:01 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.snapshot;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
@@ -32,6 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -107,7 +109,7 @@ public final class SnapshotReferenceUtil
visitLogFiles(fs, snapshotDir, visitor);
}
- /**
+ /**©
* Iterate over the snapshot store files
*
* @param conf The current {@link Configuration} instance.
@@ -117,7 +119,7 @@ public final class SnapshotReferenceUtil
* @param visitor callback object to get the store files
* @throws IOException if an error occurred while scanning the directory
*/
- public static void visitTableStoreFiles(final Configuration conf, final FileSystem fs,
+ static void visitTableStoreFiles(final Configuration conf, final FileSystem fs,
final Path snapshotDir, final SnapshotDescription desc, final StoreFileVisitor visitor)
throws IOException {
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, desc);
@@ -139,7 +141,7 @@ public final class SnapshotReferenceUtil
* @param visitor callback object to get the store files
* @throws IOException if an error occurred while scanning the directory
*/
- public static void visitRegionStoreFiles(final SnapshotRegionManifest manifest,
+ static void visitRegionStoreFiles(final SnapshotRegionManifest manifest,
final StoreFileVisitor visitor) throws IOException {
HRegionInfo regionInfo = HRegionInfo.convert(manifest.getRegionInfo());
for (SnapshotRegionManifest.FamilyFiles familyFiles: manifest.getFamilyFilesList()) {
@@ -192,6 +194,19 @@ public final class SnapshotReferenceUtil
final SnapshotManifest manifest) throws IOException {
final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
final Path snapshotDir = manifest.getSnapshotDir();
+ concurrentVisitReferencedFiles(conf, fs, manifest, new StoreFileVisitor() {
+ @Override
+ public void storeFile(final HRegionInfo regionInfo, final String family,
+ final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+ verifyStoreFile(conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile);
+ }
+ });
+ }
+
+ public static void concurrentVisitReferencedFiles(final Configuration conf, final FileSystem fs,
+ final SnapshotManifest manifest, final StoreFileVisitor visitor) throws IOException {
+ final SnapshotDescription snapshotDesc = manifest.getSnapshotDescription();
+ final Path snapshotDir = manifest.getSnapshotDir();
List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
if (regionManifests == null || regionManifests.size() == 0) {
@@ -207,13 +222,7 @@ public final class SnapshotReferenceUtil
completionService.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
- visitRegionStoreFiles(regionManifest, new StoreFileVisitor() {
- @Override
- public void storeFile(final HRegionInfo regionInfo, final String family,
- final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
- verifyStoreFile(conf, fs, snapshotDir, snapshotDesc, regionInfo, family, storeFile);
- }
- });
+ visitRegionStoreFiles(regionManifest, visitor);
return null;
}
});
@@ -251,19 +260,28 @@ public final class SnapshotReferenceUtil
* @throws CorruptedSnapshotException if the snapshot is corrupted
* @throws IOException if an error occurred while scanning the directory
*/
- public static void verifyStoreFile(final Configuration conf, final FileSystem fs,
+ private static void verifyStoreFile(final Configuration conf, final FileSystem fs,
final Path snapshotDir, final SnapshotDescription snapshot, final HRegionInfo regionInfo,
final String family, final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+ TableName table = TableName.valueOf(snapshot.getTable());
String fileName = storeFile.getName();
Path refPath = null;
if (StoreFileInfo.isReference(fileName)) {
// If is a reference file check if the parent file is present in the snapshot
- Path snapshotHFilePath = new Path(new Path(
- new Path(snapshotDir, regionInfo.getEncodedName()), family), fileName);
- refPath = StoreFileInfo.getReferredToFile(snapshotHFilePath);
- if (!fs.exists(refPath)) {
- throw new CorruptedSnapshotException("Missing parent hfile for: " + fileName, snapshot);
+ refPath = new Path(new Path(regionInfo.getEncodedName(), family), fileName);
+ refPath = StoreFileInfo.getReferredToFile(refPath);
+ String refRegion = refPath.getParent().getParent().getName();
+ refPath = HFileLink.createPath(table, refRegion, family, refPath.getName());
+ if (!new HFileLink(conf, refPath).exists(fs)) {
+ throw new CorruptedSnapshotException("Missing parent hfile for: " + fileName +
+ " path=" + refPath, snapshot);
+ }
+
+ if (storeFile.hasReference()) {
+ // We don't really need to look for the file on-disk
+ // we already have the Reference information embedded here.
+ return;
}
}
@@ -274,15 +292,25 @@ public final class SnapshotReferenceUtil
linkPath = new Path(family, fileName);
} else {
linkPath = new Path(family, HFileLink.createHFileLinkName(
- TableName.valueOf(snapshot.getTable()), regionInfo.getEncodedName(), fileName));
+ table, regionInfo.getEncodedName(), fileName));
}
// check if the linked file exists (in the archive, or in the table dir)
HFileLink link = new HFileLink(conf, linkPath);
- if (!link.exists(fs)) {
- throw new CorruptedSnapshotException("Can't find hfile: " + fileName
- + " in the real (" + link.getOriginPath() + ") or archive (" + link.getArchivePath()
- + ") directory for the primary table.", snapshot);
+ try {
+ FileStatus fstat = link.getFileStatus(fs);
+ if (storeFile.hasFileSize() && storeFile.getFileSize() != fstat.getLen()) {
+ String msg = "hfile: " + fileName + " size does not match with the expected one. " +
+ " found=" + fstat.getLen() + " expected=" + storeFile.getFileSize();
+ LOG.error(msg);
+ throw new CorruptedSnapshotException(msg, snapshot);
+ }
+ } catch (FileNotFoundException e) {
+ String msg = "Can't find hfile: " + fileName + " in the real (" +
+ link.getOriginPath() + ") or archive (" + link.getArchivePath()
+ + ") directory for the primary table.";
+ LOG.error(msg);
+ throw new CorruptedSnapshotException(msg, snapshot);
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java?rev=1596168&r1=1596167&r2=1596168&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java Tue May 20 09:47:01 2014
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
@@ -56,6 +57,10 @@ public abstract class ModifyRegionUtils
void fillRegion(final HRegion region) throws IOException;
}
+ public interface RegionEditTask {
+ void editRegion(final HRegionInfo region) throws IOException;
+ }
+
/**
* Create new set of regions on the specified file-system.
* NOTE: that you should add the regions to hbase:meta after this operation.
@@ -107,10 +112,36 @@ public abstract class ModifyRegionUtils
final RegionFillTask task) throws IOException {
if (newRegions == null) return null;
int regionNumber = newRegions.length;
- ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
+ ThreadPoolExecutor exec = getRegionOpenAndInitThreadPool(conf,
"RegionOpenAndInitThread-" + hTableDescriptor.getTableName(), regionNumber);
- CompletionService<HRegionInfo> completionService = new ExecutorCompletionService<HRegionInfo>(
- regionOpenAndInitThreadPool);
+ try {
+ return createRegions(exec, conf, rootDir, tableDir, hTableDescriptor, newRegions, task);
+ } finally {
+ exec.shutdownNow();
+ }
+ }
+
+ /**
+ * Create new set of regions on the specified file-system.
+ * NOTE: that you should add the regions to hbase:meta after this operation.
+ *
+ * @param exec Thread Pool Executor
+ * @param conf {@link Configuration}
+ * @param rootDir Root directory for HBase instance
+ * @param tableDir table directory
+ * @param hTableDescriptor description of the table
+ * @param newRegions {@link HRegionInfo} that describes the regions to create
+ * @param task {@link RegionFillTask} custom code to populate region after creation
+ * @throws IOException
+ */
+ public static List<HRegionInfo> createRegions(final ThreadPoolExecutor exec,
+ final Configuration conf, final Path rootDir, final Path tableDir,
+ final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
+ final RegionFillTask task) throws IOException {
+ if (newRegions == null) return null;
+ int regionNumber = newRegions.length;
+ CompletionService<HRegionInfo> completionService =
+ new ExecutorCompletionService<HRegionInfo>(exec);
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
for (final HRegionInfo newRegion : newRegions) {
completionService.submit(new Callable<HRegionInfo>() {
@@ -132,8 +163,6 @@ public abstract class ModifyRegionUtils
throw new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
throw new IOException(e);
- } finally {
- regionOpenAndInitThreadPool.shutdownNow();
}
return regionInfos;
}
@@ -167,6 +196,41 @@ public abstract class ModifyRegionUtils
return region.getRegionInfo();
}
+ /**
+ * Execute the task on the specified set of regions.
+ *
+ * @param exec Thread Pool Executor
+ * @param regions {@link HRegionInfo} that describes the regions to edit
+ * @param task {@link RegionFillTask} custom code to edit the region
+ * @throws IOException
+ */
+ public static void editRegions(final ThreadPoolExecutor exec,
+ final Collection<HRegionInfo> regions, final RegionEditTask task) throws IOException {
+ final ExecutorCompletionService<Void> completionService =
+ new ExecutorCompletionService<Void>(exec);
+ for (final HRegionInfo hri: regions) {
+ completionService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ task.editRegion(hri);
+ return null;
+ }
+ });
+ }
+
+ try {
+ for (HRegionInfo hri: regions) {
+ completionService.take().get();
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.getMessage());
+ } catch (ExecutionException e) {
+ IOException ex = new IOException();
+ ex.initCause(e.getCause());
+ throw ex;
+ }
+ }
+
/*
* used by createRegions() to get the thread pool executor based on the
* "hbase.hregion.open.and.init.threads.max" property.