You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/07 08:04:52 UTC
[10/16] hbase git commit: HBASE-15172 Support setting storage policy in bulkload
HBASE-15172 Support setting storage policy in bulkload
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/629b04f4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/629b04f4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/629b04f4
Branch: refs/heads/hbase-12439
Commit: 629b04f44f19b9589c9bcfb84da0cf5e0d4d1f18
Parents: e02ae77
Author: Yu Li <li...@apache.org>
Authored: Fri Jan 6 18:35:38 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Fri Jan 6 18:35:38 2017 +0800
----------------------------------------------------------------------
.../hbase/mapreduce/HFileOutputFormat2.java | 31 ++++++++-
.../hbase/mapreduce/TestHFileOutputFormat2.java | 70 ++++++++++++++++++++
2 files changed, 100 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/629b04f4/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 22a73c9..6987bf7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -125,6 +126,9 @@ public class HFileOutputFormat2
private static final String OUTPUT_TABLE_NAME_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.table.name";
+ public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
+ public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
+
@Override
public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -230,7 +234,9 @@ public class HFileOutputFormat2
// If this is a new column family, verify that the directory exists
if (wl == null) {
- fs.mkdirs(new Path(outputDir, Bytes.toString(family)));
+ Path cfPath = new Path(outputDir, Bytes.toString(family));
+ fs.mkdirs(cfPath);
+ configureStoragePolicy(conf, fs, family, cfPath);
}
// If any of the HFiles for the column families has reached
@@ -382,6 +388,29 @@ public class HFileOutputFormat2
}
}
+ /**
+ * Configure block storage policy for CF after the directory is created.
+ */
+ static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
+ byte[] family, Path cfPath) {
+ if (null == conf || null == fs || null == family || null == cfPath) {
+ return;
+ }
+
+ String policy =
+ conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
+ conf.get(STORAGE_POLICY_PROPERTY));
+ if (null != policy && !policy.trim().isEmpty()) {
+ try {
+ if (fs instanceof DistributedFileSystem) {
+ ((DistributedFileSystem) fs).setStoragePolicy(cfPath, policy.trim());
+ }
+ } catch (Throwable e) {
+ LOG.warn("failed to set block storage policy of [" + cfPath + "] to [" + policy + "]", e);
+ }
+ }
+ }
+
/*
* Data structure to hold a Writer and amount of data written on it.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/629b04f4/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 486c961..21a39d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -92,6 +93,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
@@ -1292,5 +1297,70 @@ public class TestHFileOutputFormat2 {
}
}
+ @Test
+ public void testBlockStoragePolicy() throws Exception {
+ util = new HBaseTestingUtility();
+ Configuration conf = util.getConfiguration();
+ conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
+ conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(FAMILIES[0]),
+ "ONE_SSD");
+ Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
+ Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
+ util.startMiniDFSCluster(3);
+ FileSystem fs = util.getDFSCluster().getFileSystem();
+ try {
+ fs.mkdirs(cf1Dir);
+ fs.mkdirs(cf2Dir);
+
+ // the original block storage policy would be NULL
+ String spA = getStoragePolicyName(fs, cf1Dir);
+ String spB = getStoragePolicyName(fs, cf2Dir);
+ LOG.debug("Storage policy of cf 0: [" + spA + "].");
+ LOG.debug("Storage policy of cf 1: [" + spB + "].");
+ assertNull(spA);
+ assertNull(spB);
+
+ // alter table cf schema to change storage policies
+ HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir);
+ HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[1], cf2Dir);
+ spA = getStoragePolicyName(fs, cf1Dir);
+ spB = getStoragePolicyName(fs, cf2Dir);
+ LOG.debug("Storage policy of cf 0: [" + spA + "].");
+ LOG.debug("Storage policy of cf 1: [" + spB + "].");
+ assertNotNull(spA);
+ assertEquals("ONE_SSD", spA);
+ assertNotNull(spB);
+ assertEquals("ALL_SSD", spB);
+ } finally {
+ fs.delete(cf1Dir, true);
+ fs.delete(cf2Dir, true);
+ util.shutdownMiniDFSCluster();
+ }
+ }
+
+ private String getStoragePolicyName(FileSystem fs, Path path) {
+ try {
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
+ if (null != status) {
+ byte storagePolicyId = status.getStoragePolicy();
+ if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) {
+ BlockStoragePolicy[] policies = dfs.getStoragePolicies();
+ for (BlockStoragePolicy policy : policies) {
+ if (policy.getId() == storagePolicyId) {
+ return policy.getName();
+ }
+ }
+ }
+ }
+ }
+ } catch (Throwable e) {
+ LOG.warn("failed to get block storage policy of [" + path + "]", e);
+ }
+
+ return null;
+ }
+
}