Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/07 08:04:52 UTC

[10/16] hbase git commit: HBASE-15172 Support setting storage policy in bulkload

HBASE-15172 Support setting storage policy in bulkload


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/629b04f4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/629b04f4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/629b04f4

Branch: refs/heads/hbase-12439
Commit: 629b04f44f19b9589c9bcfb84da0cf5e0d4d1f18
Parents: e02ae77
Author: Yu Li <li...@apache.org>
Authored: Fri Jan 6 18:35:38 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Fri Jan 6 18:35:38 2017 +0800

----------------------------------------------------------------------
 .../hbase/mapreduce/HFileOutputFormat2.java     | 31 ++++++++-
 .../hbase/mapreduce/TestHFileOutputFormat2.java | 70 ++++++++++++++++++++
 2 files changed, 100 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
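
For readers skimming the diff: the patch lets HFileOutputFormat2 apply an
HDFS block storage policy to each column-family directory it creates during
bulkload. A minimal sketch of how a job would opt in, using the two property
names introduced below (the family name "cf1" is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;

    Job job = Job.getInstance(new Configuration(), "bulkload");
    Configuration conf = job.getConfiguration();
    // Global default: applies to every column family the job writes.
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
    // Per-family override: key is "hbase.hstore.storagepolicy.<family>".
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "cf1",
        "ONE_SSD");

Per-family keys take precedence over the global key, and the policy is only
applied when the output FileSystem is HDFS; any other filesystem is skipped
without error.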


http://git-wip-us.apache.org/repos/asf/hbase/blob/629b04f4/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 22a73c9..6987bf7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -125,6 +126,9 @@ public class HFileOutputFormat2
   private static final String OUTPUT_TABLE_NAME_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.table.name";
 
+  public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
+  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
+
   @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -230,7 +234,9 @@ public class HFileOutputFormat2
 
       // If this is a new column family, verify that the directory exists
       if (wl == null) {
-        fs.mkdirs(new Path(outputDir, Bytes.toString(family)));
+        Path cfPath = new Path(outputDir, Bytes.toString(family));
+        fs.mkdirs(cfPath);
+        configureStoragePolicy(conf, fs, family, cfPath);
       }
 
       // If any of the HFiles for the column families has reached
@@ -382,6 +388,29 @@ public class HFileOutputFormat2
     }
   }
 
+  /**
+   * Configure block storage policy for CF after the directory is created.
+   */
+  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
+      byte[] family, Path cfPath) {
+    if (null == conf || null == fs || null == family || null == cfPath) {
+      return;
+    }
+
+    String policy =
+        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
+          conf.get(STORAGE_POLICY_PROPERTY));
+    if (null != policy && !policy.trim().isEmpty()) {
+      try {
+        if (fs instanceof DistributedFileSystem) {
+          ((DistributedFileSystem) fs).setStoragePolicy(cfPath, policy.trim());
+        }
+      } catch (Throwable e) {
+        LOG.warn("failed to set block storage policy of [" + cfPath + "] to [" + policy + "]", e);
+      }
+    }
+  }
+
   /*
    * Data structure to hold a Writer and amount of data written on it.
    */
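
A note on configureStoragePolicy above: the per-CF key is consulted first,
with the bare STORAGE_POLICY_PROPERTY as the fallback, and the policy is
applied only when the target FileSystem is a DistributedFileSystem, so local
and other filesystems fall through the instanceof check untouched. The method
is package-private, so the direct-invocation sketch below mirrors what the
in-package test (next file) does; the path and family name are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.Bytes;

    Configuration conf = new Configuration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
    FileSystem fs = FileSystem.get(conf);        // must resolve to HDFS
    Path cfDir = new Path("/bulkload/out/cf1");  // illustrative output dir
    fs.mkdirs(cfDir);
    // Callable like this only from within org.apache.hadoop.hbase.mapreduce.
    HFileOutputFormat2.configureStoragePolicy(conf, fs, Bytes.toBytes("cf1"),
        cfDir);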

http://git-wip-us.apache.org/repos/asf/hbase/blob/629b04f4/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 486c961..21a39d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -92,6 +93,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -1292,5 +1297,70 @@ public class TestHFileOutputFormat2  {
     }
   }
 
+  @Test
+  public void testBlockStoragePolicy() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
+    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(FAMILIES[0]),
+      "ONE_SSD");
+    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
+    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
+    util.startMiniDFSCluster(3);
+    FileSystem fs = util.getDFSCluster().getFileSystem();
+    try {
+      fs.mkdirs(cf1Dir);
+      fs.mkdirs(cf2Dir);
+
+      // before configuration no storage policy is set, so the lookup returns null
+      String spA = getStoragePolicyName(fs, cf1Dir);
+      String spB = getStoragePolicyName(fs, cf2Dir);
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertNull(spA);
+      assertNull(spB);
+
+      // apply the configured storage policies to the CF directories
+      HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[0], cf1Dir);
+      HFileOutputFormat2.configureStoragePolicy(conf, fs, FAMILIES[1], cf2Dir);
+      spA = getStoragePolicyName(fs, cf1Dir);
+      spB = getStoragePolicyName(fs, cf2Dir);
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertNotNull(spA);
+      assertEquals("ONE_SSD", spA);
+      assertNotNull(spB);
+      assertEquals("ALL_SSD", spB);
+    } finally {
+      fs.delete(cf1Dir, true);
+      fs.delete(cf2Dir, true);
+      util.shutdownMiniDFSCluster();
+    }
+  }
+
+  private String getStoragePolicyName(FileSystem fs, Path path) {
+    try {
+      if (fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem) fs;
+        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
+        if (null != status) {
+          byte storagePolicyId = status.getStoragePolicy();
+          if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) {
+            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
+            for (BlockStoragePolicy policy : policies) {
+              if (policy.getId() == storagePolicyId) {
+                return policy.getName();
+              }
+            }
+          }
+        }
+      }
+    } catch (Throwable e) {
+      LOG.warn("failed to get block storage policy of [" + path + "]", e);
+    }
+
+    return null;
+  }
+
 }
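
A closing note on the test's getStoragePolicyName helper: it reaches into the
private DFSClient because HBase at this point still supported Hadoop releases
that lack a public read-back API. On Hadoop 2.8+ the same check can be written
against public API (a hedged sketch, since FileSystem#getStoragePolicy(Path)
only exists from that release line onward):

    import org.apache.hadoop.fs.BlockStoragePolicySpi;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hadoop 2.8+ only: read the effective policy back via public API.
    BlockStoragePolicySpi sp = fs.getStoragePolicy(new Path("/bulkload/out/cf1"));
    System.out.println("policy: " + (sp == null ? "unspecified" : sp.getName()));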