Posted to commits@hbase.apache.org by ap...@apache.org on 2018/02/02 01:10:46 UTC

hbase git commit: HBASE-19858 Backport HBASE-14061 (Support CF-level Storage Policy) to branch-1

Repository: hbase
Updated Branches:
  refs/heads/branch-1 1a930cff6 -> f35bcd2fe


HBASE-19858 Backport HBASE-14061 (Support CF-level Storage Policy) to branch-1

HBASE-14061 Support CF-level Storage Policy
HBASE-14061 Support CF-level Storage Policy (addendum)
HBASE-14061 Support CF-level Storage Policy (addendum2)
HBASE-15172 Support setting storage policy in bulkload
HBASE-17538 HDFS.setStoragePolicy() logs errors on local fs
HBASE-18015 Storage class aware block placement for procedure v2 WALs
HBASE-18017 Reduce frequency of setStoragePolicy failure warnings
Default storage policy if not configured cannot be "NONE"
HBASE-19016 Coordinate storage policy property name for table schema and bulkload
Fix checkstyle warnings
Addressed additional review feedback on backport


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f35bcd2f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f35bcd2f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f35bcd2f

Branch: refs/heads/branch-1
Commit: f35bcd2fe3e27d2ff211c731a7300d80c658bcb3
Parents: 1a930cf
Author: Andrew Purtell <ap...@apache.org>
Authored: Thu Feb 1 16:49:02 2018 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Thu Feb 1 17:10:39 2018 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HColumnDescriptor.java  |  22 +++
 .../org/apache/hadoop/hbase/HConstants.java     |  10 +-
 .../hadoop/hbase/util/ReflectionUtils.java      |  34 +++++
 .../org/apache/hadoop/hbase/fs/HFileSystem.java |  77 +++++++++-
 .../hbase/mapreduce/HFileOutputFormat2.java     |  23 ++-
 .../org/apache/hadoop/hbase/master/HMaster.java |  15 +-
 .../hbase/regionserver/HRegionFileSystem.java   |  30 ++++
 .../hadoop/hbase/regionserver/HStore.java       |  13 +-
 .../hadoop/hbase/regionserver/StoreFile.java    |  10 ++
 .../org/apache/hadoop/hbase/util/FSUtils.java   | 153 +++++++++++++------
 .../hbase/regionserver/TestCompaction.java      |   7 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  |   2 +-
 .../regionserver/TestHRegionFileSystem.java     | 130 ++++++++++++++++
 hbase-shell/src/main/ruby/hbase/admin.rb        |   4 +
 14 files changed, 470 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 560b74b..4a6550b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -127,6 +127,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
   public static final String DFS_REPLICATION = "DFS_REPLICATION";
   public static final short DEFAULT_DFS_REPLICATION = 0;
 
+  public static final String STORAGE_POLICY = "STORAGE_POLICY";
+
   /**
    * Default compression type.
    */
@@ -1567,4 +1569,24 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
     setValue(DFS_REPLICATION, Short.toString(replication));
     return this;
   }
+
+  /**
+   * Return the storage policy in use by this family
+   * <p/>
+   * Not using {@code enum} here because HDFS is not using {@code enum} for storage policy, see
+   * org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite for more details
+   */
+  public String getStoragePolicy() {
+    return getValue(STORAGE_POLICY);
+  }
+
+  /**
+   * Set the storage policy for use with this family
+   * @param policy the policy to set; valid settings include: <i>"LAZY_PERSIST"</i>,
+   *          <i>"ALL_SSD"</i>, <i>"ONE_SSD"</i>, <i>"HOT"</i>, <i>"WARM"</i>, <i>"COLD"</i>
+   */
+  public HColumnDescriptor setStoragePolicy(String policy) {
+    setValue(STORAGE_POLICY, policy);
+    return this;
+  }
 }
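
For illustration only (not part of this commit), a minimal client-side sketch of the new HColumnDescriptor API; the table and family names are made up:

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    public class StoragePolicyExample {
      public static void main(String[] args) {
        // Pin all future store files of family 'cf' to one SSD replica.
        HColumnDescriptor family = new HColumnDescriptor("cf");
        family.setStoragePolicy("ONE_SSD");
        HTableDescriptor table = new HTableDescriptor(TableName.valueOf("t1"));
        table.addFamily(family);
        // Hand 'table' to Admin#createTable for the policy to take effect.
      }
    }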

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index c9f9ded..63dade1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1086,14 +1086,10 @@ public final class HConstants {
     "hbase.regionserver.wal.enablecompression";
 
   /** Configuration name of WAL storage policy
-   * Valid values are:
-   *  NONE: no preference in destination of block replicas
-   *  ONE_SSD: place only one block replica in SSD and the remaining in default storage
-   *  and ALL_SSD: place all block replicas on SSD
-   *
-   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html*/
+   * Valid values are: HOT, COLD, WARM, ALL_SSD, ONE_SSD, LAZY_PERSIST
+   * See http://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html */
   public static final String WAL_STORAGE_POLICY = "hbase.wal.storage.policy";
-  public static final String DEFAULT_WAL_STORAGE_POLICY = "NONE";
+  public static final String DEFAULT_WAL_STORAGE_POLICY = "HOT";
 
   /** Region in Transition metrics threshold time */
   public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
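
For illustration only (not part of this commit), a sketch of setting the WAL policy key programmatically; with the new default, leaving it unset now yields "HOT" rather than "NONE":

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class WalPolicyExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // One replica of each WAL block on SSD, the rest on default storage.
        conf.set(HConstants.WAL_STORAGE_POLICY, "ONE_SSD");
        System.out.println(conf.get(HConstants.WAL_STORAGE_POLICY,
            HConstants.DEFAULT_WAL_STORAGE_POLICY));
      }
    }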

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index 15b3930..4b50a2f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -26,6 +26,7 @@ import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.charset.Charset;
 
 import org.apache.commons.logging.Log;
@@ -188,4 +189,37 @@ public class ReflectionUtils {
     return id + " (" + name + ")";
   }
 
+  /**
+   * Get and invoke the target method from the given object with the given parameters
+   * @param obj the object to get and invoke method from
+   * @param methodName the name of the method to invoke
+   * @param params the parameters for the method to invoke
+   * @return the return value of the method invocation
+   */
+  public static Object invokeMethod(Object obj, String methodName, Object... params) {
+    Method m;
+    try {
+      m = obj.getClass().getMethod(methodName, getParameterTypes(params));
+      m.setAccessible(true);
+      return m.invoke(obj, params);
+    } catch (NoSuchMethodException e) {
+      throw new UnsupportedOperationException("Cannot find specified method " + methodName, e);
+    } catch (IllegalAccessException e) {
+      throw new UnsupportedOperationException("Unable to access specified method " + methodName, e);
+    } catch (IllegalArgumentException e) {
+      throw new UnsupportedOperationException("Illegal arguments supplied for method " + methodName,
+        e);
+    } catch (InvocationTargetException e) {
+      throw new UnsupportedOperationException("Method threw an exception for " + methodName, e);
+    }
+  }
+
+  private static Class<?>[] getParameterTypes(Object[] params) {
+    Class<?>[] parameterTypes = new Class<?>[params.length];
+    for (int i = 0; i < params.length; i++) {
+      parameterTypes[i] = params[i].getClass();
+    }
+    return parameterTypes;
+  }
+
 }
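
For illustration only (not part of this commit), a trivial use of the new helper:

    import org.apache.hadoop.hbase.util.ReflectionUtils;

    public class InvokeMethodExample {
      public static void main(String[] args) {
        // Reflectively invokes "some-policy".length(); prints 11.
        Object len = ReflectionUtils.invokeMethod("some-policy", "length");
        System.out.println(len);
      }
    }

Note that getParameterTypes derives the lookup types from each argument's runtime class, so overloads declared with primitive or supertype parameters will not be found; the patch only needs exact matches such as getStoragePolicy(Path).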

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
index 754ea65..e55916f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
@@ -41,16 +41,20 @@ import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
 
@@ -67,6 +71,7 @@ public class HFileSystem extends FilterFileSystem {
 
   private final FileSystem noChecksumFs;   // read hfile data from storage
   private final boolean useHBaseChecksum;
+  private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;
 
   /**
    * Create a FileSystem object for HBase regionservers.
@@ -180,7 +185,7 @@ public class HFileSystem extends FilterFileSystem {
     Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
     if (clazz != null) {
       // This will be true for Hadoop 1.0, or 0.20.
-      fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
+      fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
       fs.initialize(uri, conf);
     } else {
       // For Hadoop 2.0, we have to go through FileSystem for the filesystem
@@ -421,4 +426,72 @@ public class HFileSystem extends FilterFileSystem {
     return fs.createNonRecursive(f, overwrite, bufferSize, replication,
                                  blockSize, progress);
   }
+
+  /**
+   * Set the source path (directory/file) to the specified storage policy.
+   * @param path The source path (directory/file).
+   * @param policyName The name of the storage policy: 'HOT', 'COLD', etc.
+   * See Hadoop 2.6+ org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
+   * 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+   */
+  public void setStoragePolicy(Path path, String policyName) {
+    FSUtils.setStoragePolicy(this.fs, path, policyName);
+  }
+
+  /**
+   * Get the storage policy of the source path (directory/file).
+   * @param path The source path (directory/file).
+   * @return Storage policy name, or {@code null} if not using {@link DistributedFileSystem} or
+   *         exception thrown when trying to get policy
+   */
+  public String getStoragePolicyName(Path path) {
+    try {
+      Object blockStoragePolicySpi =
+          ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
+      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
+    } catch (Exception e) {
+      // Maybe fail because of using old HDFS version, try the old way
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Failed to get policy directly", e);
+      }
+      return getStoragePolicyForOldHDFSVersion(path);
+    }
+  }
+
+  /**
+   * Before Hadoop 2.8.0 there is no getStoragePolicy method in the FileSystem interface, so we
+   * need to stay compatible with those versions. See HADOOP-12161 for more details.
+   * @param path Path to get storage policy against
+   * @return the storage policy name
+   */
+  private String getStoragePolicyForOldHDFSVersion(Path path) {
+    try {
+      if (this.fs instanceof DistributedFileSystem) {
+        DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
+        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
+        if (null != status) {
+          if (unspecifiedStoragePolicyId < 0) {
+            // Get the unspecified id field through reflection to avoid compilation error.
+            // In later version BlockStoragePolicySuite#ID_UNSPECIFIED is moved to
+            // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
+            Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
+            unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
+          }
+          byte storagePolicyId = status.getStoragePolicy();
+          if (storagePolicyId != unspecifiedStoragePolicyId) {
+            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
+            for (BlockStoragePolicy policy : policies) {
+              if (policy.getId() == storagePolicyId) {
+                return policy.getName();
+              }
+            }
+          }
+        }
+      }
+    } catch (Throwable e) {
+      LOG.warn("failed to get block storage policy of [" + path + "]", e);
+    }
+
+    return null;
+  }
 }
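
For illustration only (not part of this commit), querying a policy back through the new accessor; the store directory path here is hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.fs.HFileSystem;

    public class GetPolicyExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HFileSystem hfs = new HFileSystem(conf, true); // useHBaseChecksum=true
        // Returns e.g. "ONE_SSD", or null when the backing filesystem is not
        // HDFS or the lookup fails.
        String policy = hfs.getStoragePolicyName(
            new Path("/hbase/data/default/t1/region/cf")); // hypothetical path
        System.out.println(policy);
      }
    }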

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 492b5d1..31e7e5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -124,6 +125,9 @@ public class HFileOutputFormat2
   private static final String OUTPUT_TABLE_NAME_CONF_KEY =
       "hbase.mapreduce.hfileoutputformat.table.name";
 
+  public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
+  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
+
   @Override
   public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
       final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -190,7 +194,9 @@ public class HFileOutputFormat2
 
         // If this is a new column family, verify that the directory exists
         if (wl == null) {
-          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+          Path cfPath = new Path(outputdir, Bytes.toString(family));
+          fs.mkdirs(cfPath);
+          configureStoragePolicy(conf, fs, family, cfPath);
         }
 
         // If any of the HFiles for the column families has reached
@@ -351,6 +357,21 @@ public class HFileOutputFormat2
     };
   }
 
+  /**
+   * Configure block storage policy for CF after the directory is created.
+   */
+  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
+      byte[] family, Path cfPath) {
+    if (null == conf || null == fs || null == family || null == cfPath) {
+      return;
+    }
+    String policy =
+        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
+          conf.get(STORAGE_POLICY_PROPERTY));
+
+    FSUtils.setStoragePolicy(fs, cfPath, policy);
+  }
+
   /*
    * Data structure to hold a Writer and amount of data written on it.
    */
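
For illustration only (not part of this commit), how a bulkload job could use these keys; the family name 'cf1' is made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

    public class BulkloadPolicyExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Default policy for every family written by the bulkload job...
        conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
        // ...overridden for family 'cf1' via the per-CF prefixed key.
        conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + "cf1",
            "ALL_SSD");
      }
    }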

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index d7fc2e5..f8bbc65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -48,6 +48,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.CoordinatedStateException;
@@ -1304,7 +1305,19 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
 
-    procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir,
+    final FileSystem walFs = walDir.getFileSystem(conf);
+
+    // Create the log directory for the procedure store
+    if (!walFs.exists(walDir)) {
+      if (!walFs.mkdirs(walDir)) {
+        throw new IOException("Unable to mkdir " + walDir);
+      }
+    }
+    // Now that it exists, set the log policy
+    FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
+      HConstants.DEFAULT_WAL_STORAGE_POLICY);
+
+    procedureStore = new WALProcedureStore(conf, walFs, walDir,
         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 884485c..7c36511 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -179,6 +179,36 @@ public class HRegionFileSystem {
   }
 
   /**
+   * Set storage policy for a given column family.
+   * <p>
+   * If we're running on a version of HDFS that doesn't support the given storage policy
+   * (or storage policies at all), then we'll issue a log message and continue.
+   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+   * for the possible list, e.g. 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+   *
+   * @param familyName The name of column family.
+   * @param policyName The name of the storage policy
+   */
+  public void setStoragePolicy(String familyName, String policyName) {
+    FSUtils.setStoragePolicy(this.fs, getStoreDir(familyName), policyName);
+  }
+
+  /**
+   * Get the storage policy of the CF directory.
+   * @param familyName The name of column family.
+   * @return Storage policy name, or {@code null} if not using {@link HFileSystem} or exception
+   *         thrown when trying to get policy
+   */
+  public String getStoragePolicyName(String familyName) {
+    if (this.fs instanceof HFileSystem) {
+      Path storeDir = getStoreDir(familyName);
+      return ((HFileSystem) this.fs).getStoragePolicyName(storeDir);
+    }
+
+    return null;
+  }
+
+  /**
    * Returns the store files available for the family.
    * This methods performs the filtering based on the valid store files.
    * @param familyName Column Family Name

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 730deea..e0f4042 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -124,6 +124,9 @@ public class HStore implements Store {
   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
       "hbase.server.compactchecker.interval.multiplier";
   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
+  public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
+  // keep in accordance with HDFS default storage policy
+  public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT";
   public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
   public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
 
@@ -237,6 +240,13 @@ public class HStore implements Store {
       .addWritableMap(family.getValues());
     this.blocksize = family.getBlocksize();
 
+    // set block storage policy for store directory
+    String policyName = family.getStoragePolicy();
+    if (null == policyName) {
+      policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
+    }
+    this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());
+
     this.dataBlockEncoder =
         new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
 
@@ -1078,11 +1088,12 @@ public class HStore implements Store {
       favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
           region.getRegionInfo().getEncodedName());
     }
+    Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
       cryptoContext);
     StoreFile.WriterBuilder builder = new StoreFile.WriterBuilder(conf, writerCacheConf,
         this.getFileSystem())
-            .withFilePath(fs.createTempName())
+            .withOutputDir(familyTempDir)
             .withComparator(comparator)
             .withBloomType(family.getBloomFilterType())
             .withMaxKeyCount(maxKeyCount)

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 803bfb3..1e74911 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.KeyValue;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.WritableUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -726,6 +728,14 @@ public class StoreFile {
         HRegionFileSystem.mkdirs(fs, conf, dir);
       }
 
+      // set block storage policy for temp path
+      String policyName = this.conf.get(HColumnDescriptor.STORAGE_POLICY);
+      if (null == policyName) {
+        policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY,
+          HStore.DEFAULT_BLOCK_STORAGE_POLICY);
+      }
+      FSUtils.setStoragePolicy(this.fs, dir, policyName);
+
       if (filePath == null) {
         filePath = getUniqueFile(fs, dir);
         if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 5d850b0..f3e2c63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -141,59 +141,122 @@ public abstract class FSUtils {
   public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
       final Path path, final String policyKey, final String defaultPolicy) {
     String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
-    if (storagePolicy.equals(defaultPolicy)) {
+    setStoragePolicy(fs, path, storagePolicy);
+  }
+
+  private static final Map<FileSystem, Boolean> warningMap =
+      new ConcurrentHashMap<FileSystem, Boolean>();
+
+  /**
+   * Sets storage policy for given path.
+   * <p>
+   * If the passed path is a directory, we'll set the storage policy for all files
+   * created in the future in said directory. Note that this change in storage
+   * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
+   * If we're running on a version of HDFS that doesn't support the given storage policy
+   * (or storage policies at all), then we'll issue a log message and continue.
+   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+   * for the possible list, e.g. 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+   *
+   * @param fs We only do anything if an instance of DistributedFileSystem
+   * @param path the Path whose storage policy is to be set
+   * @param storagePolicy Policy to set on <code>path</code>
+   */
+  public static void setStoragePolicy(final FileSystem fs, final Path path,
+      final String storagePolicy) {
+    if (storagePolicy == null) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
+        LOG.trace("We were passed a null storagePolicy, exiting early.");
       }
       return;
     }
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem)fs;
-      // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
-      Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
-      Method m = null;
-      try {
-        m = dfsClass.getDeclaredMethod("setStoragePolicy",
-            new Class<?>[] { Path.class, String.class });
-        m.setAccessible(true);
-      } catch (NoSuchMethodException e) {
-        LOG.info("FileSystem doesn't support"
-            + " setStoragePolicy; --HDFS-6584 not available");
-      } catch (SecurityException e) {
-        LOG.info("Doesn't have access to setStoragePolicy on "
-            + "FileSystems --HDFS-6584 not available", e);
-        m = null; // could happen on setAccessible()
+    final String trimmedStoragePolicy = storagePolicy.trim();
+    if (trimmedStoragePolicy.isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("We were passed an empty storagePolicy, exiting early.");
       }
-      if (m != null) {
-        try {
-          m.invoke(dfs, path, storagePolicy);
-          LOG.info("set " + storagePolicy + " for " + path);
-        } catch (Exception e) {
-          // check for lack of HDFS-7228
-          boolean probablyBadPolicy = false;
-          if (e instanceof InvocationTargetException) {
-            final Throwable exception = e.getCause();
-            if (exception instanceof RemoteException &&
-                HadoopIllegalArgumentException.class.getName().equals(
-                    ((RemoteException)exception).getClassName())) {
-              LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
-                  "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
-                  "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
-                  "more information see the 'ArchivalStorage' docs for your Hadoop release.");
-              LOG.debug("More information about the invalid storage policy.", exception);
-              probablyBadPolicy = true;
+      return;
+    }
+    boolean distributed = false;
+    try {
+      distributed = isDistributedFileSystem(fs);
+    } catch (IOException ioe) {
+      if (!warningMap.containsKey(fs)) {
+        warningMap.put(fs, true);
+        LOG.warn("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
+            + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
+            + " on path=" + path);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
+            + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
+            + " on path=" + path);
+      }
+      return;
+    }
+    if (distributed) {
+      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
+    }
+  }
+
+  /*
+   * All args have been checked and are good. Run the setStoragePolicy invocation.
+   */
+  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
+      final String storagePolicy) {
+    Method m = null;
+    try {
+      m = fs.getClass().getDeclaredMethod("setStoragePolicy",
+        new Class<?>[] { Path.class, String.class });
+      m.setAccessible(true);
+    } catch (NoSuchMethodException e) {
+      final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584 not available";
+      if (!warningMap.containsKey(fs)) {
+        warningMap.put(fs, true);
+        LOG.warn(msg, e);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug(msg, e);
+      }
+      m = null;
+    } catch (SecurityException e) {
+      final String msg = "No access to setStoragePolicy on FileSystem; HDFS-6584 not available";
+      if (!warningMap.containsKey(fs)) {
+        warningMap.put(fs, true);
+        LOG.warn(msg, e);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug(msg, e);
+      }
+      m = null; // could happen on setAccessible()
+    }
+    if (m != null) {
+      try {
+        m.invoke(fs, path, storagePolicy);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
+        }
+      } catch (Exception e) {
+        // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
+        // misuse than a runtime problem with HDFS.
+        if (!warningMap.containsKey(fs)) {
+          warningMap.put(fs, true);
+          LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
+        }
+        // check for lack of HDFS-7228
+        if (e instanceof InvocationTargetException) {
+          final Throwable exception = e.getCause();
+          if (exception instanceof RemoteException &&
+              HadoopIllegalArgumentException.class.getName().equals(
+                ((RemoteException)exception).getClassName())) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
+                "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
+                "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
+                "more information see the 'ArchivalStorage' docs for your Hadoop release.");
             }
           }
-          if (!probablyBadPolicy) {
-            // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
-            // misuse than a runtime problem with HDFS.
-            LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
-          }
         }
       }
-    } else {
-      LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
-          "support setStoragePolicy.");
     }
   }
 
@@ -210,7 +273,7 @@ public abstract class FSUtils {
     }
     return fileSystem instanceof DistributedFileSystem;
   }
- 
+
   /**
    * Compare of path component. Does not consider schema; i.e. if schemas
    * different but <code>path</code> starts with <code>rootPath</code>,

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index b1f94fb..b9d1dd3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -186,10 +186,13 @@ public class TestCompaction {
       Store s = r.stores.get(COLUMN_FAMILY);
       assertEquals(compactionThreshold, s.getStorefilesCount());
       assertTrue(s.getStorefilesSize() > 15*1000);
-      // and no new store files persisted past compactStores()
+      // only one empty dir exists in temp dir
       FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
+      assertEquals(1, ls.length);
+      Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
+      assertTrue(r.getFilesystem().exists(storeTempDir));
+      ls = r.getFilesystem().listStatus(storeTempDir);
       assertEquals(0, ls.length);
-
     } finally {
       // don't mess up future tests
       r.writestate.writesEnabled = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 09d0b4f..e93fcf0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -912,7 +912,7 @@ public class TestHRegion {
       assertEquals(3, region.getStore(family).getStorefilesCount());
 
       // now find the compacted file, and manually add it to the recovered edits
-      Path tmpDir = region.getRegionFileSystem().getTempDir();
+      Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
       FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
       String errorMsg = "Expected to find 1 file in the region temp directory "
           + "from the compaction, could not find any";

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
index dfb20da..dfef63f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,9 +40,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.util.Progressable;
 
@@ -50,6 +60,126 @@ import org.junit.experimental.categories.Category;
 public class TestHRegionFileSystem {
   private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final Log LOG = LogFactory.getLog(TestHRegionFileSystem.class);
+  private static final byte[][] FAMILIES = {
+      Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
+      Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
+  private static final TableName TABLE_NAME = TableName.valueOf("TestTable");
+
+  @Test
+  public void testBlockStoragePolicy() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniCluster();
+    HTable table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
+    assertEquals("Should start with empty table", 0, TEST_UTIL.countRows(table));
+    HRegionFileSystem regionFs = getHRegionFS(table, conf);
+    // the original block storage policy would be HOT
+    String spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+    String spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+    LOG.debug("Storage policy of cf 0: [" + spA + "].");
+    LOG.debug("Storage policy of cf 1: [" + spB + "].");
+    assertEquals("HOT", spA);
+    assertEquals("HOT", spB);
+
+    // Recreate table and make sure storage policy could be set through configuration
+    TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.getConfiguration().set(HStore.BLOCK_STORAGE_POLICY_KEY, "WARM");
+    TEST_UTIL.startMiniCluster();
+    table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
+    regionFs = getHRegionFS(table, conf);
+
+    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
+      spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+      spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertEquals("WARM", spA);
+      assertEquals("WARM", spB);
+
+      // alter table cf schema to change storage policies
+      // and make sure it could override settings in conf
+      HColumnDescriptor hcdA = new HColumnDescriptor(Bytes.toString(FAMILIES[0]));
+      // alter through setting HStore#BLOCK_STORAGE_POLICY_KEY in HColumnDescriptor
+      hcdA.setValue(HStore.BLOCK_STORAGE_POLICY_KEY, "ONE_SSD");
+      admin.modifyColumn(TABLE_NAME, hcdA);
+      while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+          .isRegionsInTransition()) {
+        Thread.sleep(200);
+        LOG.debug("Waiting on table to finish schema altering");
+      }
+      // alter through HColumnDescriptor#setStoragePolicy
+      HColumnDescriptor hcdB = new HColumnDescriptor(Bytes.toString(FAMILIES[1]));
+      hcdB.setStoragePolicy("ALL_SSD");
+      admin.modifyColumn(TABLE_NAME, hcdB);
+      while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+          .isRegionsInTransition()) {
+        Thread.sleep(200);
+        LOG.debug("Waiting on table to finish schema altering");
+      }
+      spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+      spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertNotNull(spA);
+      assertEquals("ONE_SSD", spA);
+      assertNotNull(spB);
+      assertEquals("ALL_SSD", spB);
+
+      // flush memstore snapshot into 3 files
+      for (long i = 0; i < 3; i++) {
+        Put put = new Put(Bytes.toBytes(i));
+        put.addColumn(FAMILIES[0], Bytes.toBytes(i), Bytes.toBytes(i));
+        table.put(put);
+        admin.flush(TABLE_NAME);
+      }
+      // there should be 3 files in store dir
+      FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+      Path storePath = regionFs.getStoreDir(Bytes.toString(FAMILIES[0]));
+      FileStatus[] storeFiles = FSUtils.listStatus(fs, storePath);
+      assertNotNull(storeFiles);
+      assertEquals(3, storeFiles.length);
+      // store temp dir still exists but empty
+      Path storeTempDir = new Path(regionFs.getTempDir(), Bytes.toString(FAMILIES[0]));
+      assertTrue(fs.exists(storeTempDir));
+      FileStatus[] tempFiles = FSUtils.listStatus(fs, storeTempDir);
+      assertNull(tempFiles);
+      // storage policy of cf temp dir and 3 store files should be ONE_SSD
+      assertEquals("ONE_SSD",
+        ((HFileSystem) regionFs.getFileSystem()).getStoragePolicyName(storeTempDir));
+      for (FileStatus status : storeFiles) {
+        assertEquals("ONE_SSD",
+          ((HFileSystem) regionFs.getFileSystem()).getStoragePolicyName(status.getPath()));
+      }
+
+      // change storage policies by calling raw api directly
+      regionFs.setStoragePolicy(Bytes.toString(FAMILIES[0]), "ALL_SSD");
+      regionFs.setStoragePolicy(Bytes.toString(FAMILIES[1]), "ONE_SSD");
+      spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+      spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+      LOG.debug("Storage policy of cf 0: [" + spA + "].");
+      LOG.debug("Storage policy of cf 1: [" + spB + "].");
+      assertNotNull(spA);
+      assertEquals("ALL_SSD", spA);
+      assertNotNull(spB);
+      assertEquals("ONE_SSD", spB);
+    } finally {
+      table.close();
+      TEST_UTIL.deleteTable(TABLE_NAME);
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private HRegionFileSystem getHRegionFS(HTable table, Configuration conf) throws IOException {
+    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    Path tableDir = FSUtils.getTableDir(TEST_UTIL.getDefaultRootDirPath(), table.getName());
+    List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
+    assertEquals(1, regionDirs.size());
+    List<Path> familyDirs = FSUtils.getFamilyDirs(fs, regionDirs.get(0));
+    assertEquals(2, familyDirs.size());
+    HRegionInfo hri = table.getRegionLocator().getAllRegionLocations().get(0).getRegionInfo();
+    HRegionFileSystem regionFs = new HRegionFileSystem(conf, new HFileSystem(fs), tableDir, hri);
+    return regionFs;
+  }
 
   @Test
   public void testOnDiskRegionCreation() throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 60ae155..2dd69f8 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -859,6 +859,10 @@ module Hbase
             algorithm))
         end
       end
+      if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY)
+          storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
+          family.setStoragePolicy(storage_policy)
+      end
 
       set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
       set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
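
With the hunk above, the family-schema parsing used by the shell's create/alter commands recognizes STORAGE_POLICY, so a (hypothetical) invocation such as alter 't1', {NAME => 'cf', STORAGE_POLICY => 'one_ssd'} would be routed to HColumnDescriptor#setStoragePolicy after upcasing the value.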