You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2018/02/02 01:10:46 UTC
hbase git commit: HBASE-19858 Backport HBASE-14061 (Support CF-level
Storage Policy) to branch-1
Repository: hbase
Updated Branches:
refs/heads/branch-1 1a930cff6 -> f35bcd2fe
HBASE-19858 Backport HBASE-14061 (Support CF-level Storage Policy) to branch-1
HBASE-14061 Support CF-level Storage Policy
HBASE-14061 Support CF-level Storage Policy (addendum)
HBASE-14061 Support CF-level Storage Policy (addendum2)
HBASE-15172 Support setting storage policy in bulkload
HBASE-17538 HDFS.setStoragePolicy() logs errors on local fs
HBASE-18015 Storage class aware block placement for procedure v2 WALs
HBASE-18017 Reduce frequency of setStoragePolicy failure warnings
Default storage policy if not configured cannot be "NONE"
HBASE-19016 Coordinate storage policy property name for table schema and bulkload
Fix checkstyle warnings
Addressed additional review feedback on backport
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f35bcd2f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f35bcd2f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f35bcd2f
Branch: refs/heads/branch-1
Commit: f35bcd2fe3e27d2ff211c731a7300d80c658bcb3
Parents: 1a930cf
Author: Andrew Purtell <ap...@apache.org>
Authored: Thu Feb 1 16:49:02 2018 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Thu Feb 1 17:10:39 2018 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/HColumnDescriptor.java | 22 +++
.../org/apache/hadoop/hbase/HConstants.java | 10 +-
.../hadoop/hbase/util/ReflectionUtils.java | 34 +++++
.../org/apache/hadoop/hbase/fs/HFileSystem.java | 77 +++++++++-
.../hbase/mapreduce/HFileOutputFormat2.java | 23 ++-
.../org/apache/hadoop/hbase/master/HMaster.java | 15 +-
.../hbase/regionserver/HRegionFileSystem.java | 30 ++++
.../hadoop/hbase/regionserver/HStore.java | 13 +-
.../hadoop/hbase/regionserver/StoreFile.java | 10 ++
.../org/apache/hadoop/hbase/util/FSUtils.java | 153 +++++++++++++------
.../hbase/regionserver/TestCompaction.java | 7 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 2 +-
.../regionserver/TestHRegionFileSystem.java | 130 ++++++++++++++++
hbase-shell/src/main/ruby/hbase/admin.rb | 4 +
14 files changed, 470 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 560b74b..4a6550b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -127,6 +127,8 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
public static final String DFS_REPLICATION = "DFS_REPLICATION";
public static final short DEFAULT_DFS_REPLICATION = 0;
+ public static final String STORAGE_POLICY = "STORAGE_POLICY";
+
/**
* Default compression type.
*/
@@ -1567,4 +1569,24 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
setValue(DFS_REPLICATION, Short.toString(replication));
return this;
}
+
+ /**
+ * Return the storage policy in use by this family
+ * <p/>
+ * Not using {@code enum} here because HDFS is not using {@code enum} for storage policy, see
+ * org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite for more details
+ */
+ public String getStoragePolicy() {
+ return getValue(STORAGE_POLICY);
+ }
+
+ /**
+ * Set the storage policy for use with this family
+ * @param policy the policy to set, valid settings include: <i>"LAZY_PERSIST"</i>,
+ * <i>"ALL_SSD"</i>, <i>"ONE_SSD"</i>, <i>"HOT"</i>, <i>"WARM"</i>, <i>"COLD"</i>
+ */
+ public HColumnDescriptor setStoragePolicy(String policy) {
+ setValue(STORAGE_POLICY, policy);
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index c9f9ded..63dade1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1086,14 +1086,10 @@ public final class HConstants {
"hbase.regionserver.wal.enablecompression";
/** Configuration name of WAL storage policy
- * Valid values are:
- * NONE: no preference in destination of block replicas
- * ONE_SSD: place only one block replica in SSD and the remaining in default storage
- * and ALL_SSD: place all block replicas on SSD
- *
- * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html*/
+ * Valid values are: HOT, COLD, WARM, ALL_SSD, ONE_SSD, LAZY_PERSIST
+ * See http://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html*/
public static final String WAL_STORAGE_POLICY = "hbase.wal.storage.policy";
- public static final String DEFAULT_WAL_STORAGE_POLICY = "NONE";
+ public static final String DEFAULT_WAL_STORAGE_POLICY = "HOT";
/** Region in Transition metrics threshold time */
public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
index 15b3930..4b50a2f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReflectionUtils.java
@@ -26,6 +26,7 @@ import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.charset.Charset;
import org.apache.commons.logging.Log;
@@ -188,4 +189,37 @@ public class ReflectionUtils {
return id + " (" + name + ")";
}
+ /**
+ * Get and invoke the target method from the given object with given parameters
+ * @param obj the object to get and invoke method from
+ * @param methodName the name of the method to invoke
+ * @param params the parameters for the method to invoke
+ * @return the return value of the method invocation
+ */
+ public static Object invokeMethod(Object obj, String methodName, Object... params) {
+ Method m;
+ try {
+ m = obj.getClass().getMethod(methodName, getParameterTypes(params));
+ m.setAccessible(true);
+ return m.invoke(obj, params);
+ } catch (NoSuchMethodException e) {
+ throw new UnsupportedOperationException("Cannot find specified method " + methodName, e);
+ } catch (IllegalAccessException e) {
+ throw new UnsupportedOperationException("Unable to access specified method " + methodName, e);
+ } catch (IllegalArgumentException e) {
+ throw new UnsupportedOperationException("Illegal arguments supplied for method " + methodName,
+ e);
+ } catch (InvocationTargetException e) {
+ throw new UnsupportedOperationException("Method threw an exception for " + methodName, e);
+ }
+ }
+
+ private static Class<?>[] getParameterTypes(Object[] params) {
+ Class<?>[] parameterTypes = new Class<?>[params.length];
+ for (int i = 0; i < params.length; i++) {
+ parameterTypes[i] = params[i].getClass();
+ }
+ return parameterTypes;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
index 754ea65..e55916f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
@@ -41,16 +41,20 @@ import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
@@ -67,6 +71,7 @@ public class HFileSystem extends FilterFileSystem {
private final FileSystem noChecksumFs; // read hfile data from storage
private final boolean useHBaseChecksum;
+ private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;
/**
* Create a FileSystem object for HBase regionservers.
@@ -180,7 +185,7 @@ public class HFileSystem extends FilterFileSystem {
Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
if (clazz != null) {
// This will be true for Hadoop 1.0, or 0.20.
- fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
+ fs = (FileSystem) org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
} else {
// For Hadoop 2.0, we have to go through FileSystem for the filesystem
@@ -421,4 +426,72 @@ public class HFileSystem extends FilterFileSystem {
return fs.createNonRecursive(f, overwrite, bufferSize, replication,
blockSize, progress);
}
+
+ /**
+ * Set the source path (directory/file) to the specified storage policy.
+ * @param path The source path (directory/file).
+ * @param policyName The name of the storage policy: 'HOT', 'COLD', etc.
+ * See hadoop 2.6+ org.apache.hadoop.hdfs.protocol.HdfsConstants for the possible list, e.g.
+ * 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+ */
+ public void setStoragePolicy(Path path, String policyName) {
+ FSUtils.setStoragePolicy(this.fs, path, policyName);
+ }
+
+ /**
+ * Get the storage policy of the source path (directory/file).
+ * @param path The source path (directory/file).
+ * @return Storage policy name, or {@code null} if not using {@link DistributedFileSystem} or
+ * exception thrown when trying to get policy
+ */
+ public String getStoragePolicyName(Path path) {
+ try {
+ Object blockStoragePolicySpi =
+ ReflectionUtils.invokeMethod(this.fs, "getStoragePolicy", path);
+ return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
+ } catch (Exception e) {
+ // Maybe fail because of using old HDFS version, try the old way
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Failed to get policy directly", e);
+ }
+ return getStoragePolicyForOldHDFSVersion(path);
+ }
+ }
+
+ /**
+ * Before Hadoop 2.8.0, there's no getStoragePolicy method on the FileSystem interface, and we
+ * need to stay compatible with those versions. See HADOOP-12161 for more details.
+ * @param path Path to get storage policy against
+ * @return the storage policy name
+ */
+ private String getStoragePolicyForOldHDFSVersion(Path path) {
+ try {
+ if (this.fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
+ HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
+ if (null != status) {
+ if (unspecifiedStoragePolicyId < 0) {
+ // Get the unspecified id field through reflection to avoid compilation error.
+ // In later version BlockStoragePolicySuite#ID_UNSPECIFIED is moved to
+ // HdfsConstants#BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
+ Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
+ unspecifiedStoragePolicyId = idUnspecified.getByte(BlockStoragePolicySuite.class);
+ }
+ byte storagePolicyId = status.getStoragePolicy();
+ if (storagePolicyId != unspecifiedStoragePolicyId) {
+ BlockStoragePolicy[] policies = dfs.getStoragePolicies();
+ for (BlockStoragePolicy policy : policies) {
+ if (policy.getId() == storagePolicyId) {
+ return policy.getName();
+ }
+ }
+ }
+ }
+ }
+ } catch (Throwable e) {
+ LOG.warn("failed to get block storage policy of [" + path + "]", e);
+ }
+
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 492b5d1..31e7e5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -124,6 +125,9 @@ public class HFileOutputFormat2
private static final String OUTPUT_TABLE_NAME_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.table.name";
+ public static final String STORAGE_POLICY_PROPERTY = HStore.BLOCK_STORAGE_POLICY_KEY;
+ public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
+
@Override
public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
final TaskAttemptContext context) throws IOException, InterruptedException {
@@ -190,7 +194,9 @@ public class HFileOutputFormat2
// If this is a new column family, verify that the directory exists
if (wl == null) {
- fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+ Path cfPath = new Path(outputdir, Bytes.toString(family));
+ fs.mkdirs(cfPath);
+ configureStoragePolicy(conf, fs, family, cfPath);
}
// If any of the HFiles for the column families has reached
@@ -351,6 +357,21 @@ public class HFileOutputFormat2
};
}
+ /**
+ * Configure block storage policy for CF after the directory is created.
+ */
+ static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
+ byte[] family, Path cfPath) {
+ if (null == conf || null == fs || null == family || null == cfPath) {
+ return;
+ }
+ String policy =
+ conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
+ conf.get(STORAGE_POLICY_PROPERTY));
+
+ FSUtils.setStoragePolicy(fs, cfPath, policy);
+ }
+
/*
* Data structure to hold a Writer and amount of data written on it.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index d7fc2e5..f8bbc65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -48,6 +48,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.CoordinatedStateException;
@@ -1304,7 +1305,19 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
- procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir,
+ final FileSystem walFs = walDir.getFileSystem(conf);
+
+ // Create the log directory for the procedure store
+ if (!walFs.exists(walDir)) {
+ if (!walFs.mkdirs(walDir)) {
+ throw new IOException("Unable to mkdir " + walDir);
+ }
+ }
+ // Now that it exists, set the log policy
+ FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
+ HConstants.DEFAULT_WAL_STORAGE_POLICY);
+
+ procedureStore = new WALProcedureStore(conf, walFs, walDir,
new MasterProcedureEnv.WALStoreLeaseRecovery(this));
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 884485c..7c36511 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -179,6 +179,36 @@ public class HRegionFileSystem {
}
/**
+ * Set storage policy for a given column family.
+ * <p>
+ * If we're running on a version of HDFS that doesn't support the given storage policy
+ * (or storage policies at all), then we'll issue a log message and continue.
+ * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+ * for the list of possible policies, e.g. 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+ *
+ * @param familyName The name of column family.
+ * @param policyName The name of the storage policy
+ */
+ public void setStoragePolicy(String familyName, String policyName) {
+ FSUtils.setStoragePolicy(this.fs, getStoreDir(familyName), policyName);
+ }
+
+ /**
+ * Get the storage policy of the directory of CF.
+ * @param familyName The name of column family.
+ * @return Storage policy name, or {@code null} if not using {@link HFileSystem} or exception
+ * thrown when trying to get policy
+ */
+ public String getStoragePolicyName(String familyName) {
+ if (this.fs instanceof HFileSystem) {
+ Path storeDir = getStoreDir(familyName);
+ return ((HFileSystem) this.fs).getStoragePolicyName(storeDir);
+ }
+
+ return null;
+ }
+
+ /**
* Returns the store files available for the family.
* This methods performs the filtering based on the valid store files.
* @param familyName Column Family Name
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 730deea..e0f4042 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -124,6 +124,9 @@ public class HStore implements Store {
public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
"hbase.server.compactchecker.interval.multiplier";
public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
+ public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
+ // keep in accordance with HDFS default storage policy
+ public static final String DEFAULT_BLOCK_STORAGE_POLICY = "HOT";
public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
@@ -237,6 +240,13 @@ public class HStore implements Store {
.addWritableMap(family.getValues());
this.blocksize = family.getBlocksize();
+ // set block storage policy for store directory
+ String policyName = family.getStoragePolicy();
+ if (null == policyName) {
+ policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
+ }
+ this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());
+
this.dataBlockEncoder =
new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
@@ -1078,11 +1088,12 @@ public class HStore implements Store {
favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
region.getRegionInfo().getEncodedName());
}
+ Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
cryptoContext);
StoreFile.WriterBuilder builder = new StoreFile.WriterBuilder(conf, writerCacheConf,
this.getFileSystem())
- .withFilePath(fs.createTempName())
+ .withOutputDir(familyTempDir)
.withComparator(comparator)
.withBloomType(family.getBloomFilterType())
.withMaxKeyCount(maxKeyCount)
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 803bfb3..1e74911 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.WritableUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -726,6 +728,14 @@ public class StoreFile {
HRegionFileSystem.mkdirs(fs, conf, dir);
}
+ // set block storage policy for temp path
+ String policyName = this.conf.get(HColumnDescriptor.STORAGE_POLICY);
+ if (null == policyName) {
+ policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY,
+ HStore.DEFAULT_BLOCK_STORAGE_POLICY);
+ }
+ FSUtils.setStoragePolicy(this.fs, dir, policyName);
+
if (filePath == null) {
filePath = getUniqueFile(fs, dir);
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 5d850b0..f3e2c63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -141,59 +141,122 @@ public abstract class FSUtils {
public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
final Path path, final String policyKey, final String defaultPolicy) {
String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
- if (storagePolicy.equals(defaultPolicy)) {
+ setStoragePolicy(fs, path, storagePolicy);
+ }
+
+ private static final Map<FileSystem, Boolean> warningMap =
+ new ConcurrentHashMap<FileSystem, Boolean>();
+
+ /**
+ * Sets storage policy for given path.
+ * <p>
+ * If the passed path is a directory, we'll set the storage policy for all files
+ * created in the future in said directory. Note that this change in storage
+ * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
+ * If we're running on a version of HDFS that doesn't support the given storage policy
+ * (or storage policies at all), then we'll issue a log message and continue.
+ * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+ * for the list of possible policies, e.g. 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+ *
+ * @param fs We only do anything if an instance of DistributedFileSystem
+ * @param path the Path whose storage policy is to be set
+ * @param storagePolicy Policy to set on <code>path</code>
+ */
+ public static void setStoragePolicy(final FileSystem fs, final Path path,
+ final String storagePolicy) {
+ if (storagePolicy == null) {
if (LOG.isTraceEnabled()) {
- LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
+ LOG.trace("We were passed a null storagePolicy, exiting early.");
}
return;
}
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
- // Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
- Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
- Method m = null;
- try {
- m = dfsClass.getDeclaredMethod("setStoragePolicy",
- new Class<?>[] { Path.class, String.class });
- m.setAccessible(true);
- } catch (NoSuchMethodException e) {
- LOG.info("FileSystem doesn't support"
- + " setStoragePolicy; --HDFS-6584 not available");
- } catch (SecurityException e) {
- LOG.info("Doesn't have access to setStoragePolicy on "
- + "FileSystems --HDFS-6584 not available", e);
- m = null; // could happen on setAccessible()
+ final String trimmedStoragePolicy = storagePolicy.trim();
+ if (trimmedStoragePolicy.isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("We were passed an empty storagePolicy, exiting early.");
}
- if (m != null) {
- try {
- m.invoke(dfs, path, storagePolicy);
- LOG.info("set " + storagePolicy + " for " + path);
- } catch (Exception e) {
- // check for lack of HDFS-7228
- boolean probablyBadPolicy = false;
- if (e instanceof InvocationTargetException) {
- final Throwable exception = e.getCause();
- if (exception instanceof RemoteException &&
- HadoopIllegalArgumentException.class.getName().equals(
- ((RemoteException)exception).getClassName())) {
- LOG.warn("Given storage policy, '" + storagePolicy + "', was rejected and probably " +
- "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
- "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
- "more information see the 'ArchivalStorage' docs for your Hadoop release.");
- LOG.debug("More information about the invalid storage policy.", exception);
- probablyBadPolicy = true;
+ return;
+ }
+ boolean distributed = false;
+ try {
+ distributed = isDistributedFileSystem(fs);
+ } catch (IOException ioe) {
+ if (!warningMap.containsKey(fs)) {
+ warningMap.put(fs, true);
+ LOG.warn("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
+ + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
+ + " on path=" + path);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
+ + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
+ + " on path=" + path);
+ }
+ return;
+ }
+ if (distributed) {
+ invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
+ }
+ }
+
+ /*
+ * All args have been checked and are good. Run the setStoragePolicy invocation.
+ */
+ private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
+ final String storagePolicy) {
+ Method m = null;
+ try {
+ m = fs.getClass().getDeclaredMethod("setStoragePolicy",
+ new Class<?>[] { Path.class, String.class });
+ m.setAccessible(true);
+ } catch (NoSuchMethodException e) {
+ final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584 not available";
+ if (!warningMap.containsKey(fs)) {
+ warningMap.put(fs, true);
+ LOG.warn(msg, e);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(msg, e);
+ }
+ m = null;
+ } catch (SecurityException e) {
+ final String msg = "No access to setStoragePolicy on FileSystem; HDFS-6584 not available";
+ if (!warningMap.containsKey(fs)) {
+ warningMap.put(fs, true);
+ LOG.warn(msg, e);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(msg, e);
+ }
+ m = null; // could happen on setAccessible()
+ }
+ if (m != null) {
+ try {
+ m.invoke(fs, path, storagePolicy);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
+ }
+ } catch (Exception e) {
+ // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
+ // misuse than a runtime problem with HDFS.
+ if (!warningMap.containsKey(fs)) {
+ warningMap.put(fs, true);
+ LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
+ }
+ // check for lack of HDFS-7228
+ if (e instanceof InvocationTargetException) {
+ final Throwable exception = e.getCause();
+ if (exception instanceof RemoteException &&
+ HadoopIllegalArgumentException.class.getName().equals(
+ ((RemoteException)exception).getClassName())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
+ "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
+ "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
+ "more information see the 'ArchivalStorage' docs for your Hadoop release.");
}
}
- if (!probablyBadPolicy) {
- // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
- // misuse than a runtime problem with HDFS.
- LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
- }
}
}
- } else {
- LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
- "support setStoragePolicy.");
}
}
@@ -210,7 +273,7 @@ public abstract class FSUtils {
}
return fileSystem instanceof DistributedFileSystem;
}
-
+
/**
* Compare of path component. Does not consider schema; i.e. if schemas
* different but <code>path</code> starts with <code>rootPath</code>,
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index b1f94fb..b9d1dd3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -186,10 +186,13 @@ public class TestCompaction {
Store s = r.stores.get(COLUMN_FAMILY);
assertEquals(compactionThreshold, s.getStorefilesCount());
assertTrue(s.getStorefilesSize() > 15*1000);
- // and no new store files persisted past compactStores()
+ // only one empty dir exists in temp dir
FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
+ assertEquals(1, ls.length);
+ Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
+ assertTrue(r.getFilesystem().exists(storeTempDir));
+ ls = r.getFilesystem().listStatus(storeTempDir);
assertEquals(0, ls.length);
-
} finally {
// don't mess up future tests
r.writestate.writesEnabled = true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 09d0b4f..e93fcf0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -912,7 +912,7 @@ public class TestHRegion {
assertEquals(3, region.getStore(family).getStorefilesCount());
// now find the compacted file, and manually add it to the recovered edits
- Path tmpDir = region.getRegionFileSystem().getTempDir();
+ Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
String errorMsg = "Expected to find 1 file in the region temp directory "
+ "from the compaction, could not find any";
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
index dfb20da..dfef63f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,9 +40,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.Progressable;
@@ -50,6 +60,126 @@ import org.junit.experimental.categories.Category;
public class TestHRegionFileSystem {
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Log LOG = LogFactory.getLog(TestHRegionFileSystem.class);
+ // Two column family names, each built by Bytes.add from PerformanceEvaluation.FAMILY_NAME
+ // and the suffix "-A" or "-B".
+ private static final byte[][] FAMILIES = {
+ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
+ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
+ // Name of the table used by the storage policy test in this class.
+ private static final TableName TABLE_NAME = TableName.valueOf("TestTable");
+
+ /**
+  * Checks where the storage policy of a column family directory comes from. The test expects
+  * "HOT" with no configuration, the value of HStore.BLOCK_STORAGE_POLICY_KEY once it is set in
+  * the cluster configuration, the column family schema value once the family is altered, and
+  * finally whatever is set directly through HRegionFileSystem#setStoragePolicy.
+  */
+ @Test
+ public void testBlockStoragePolicy() throws Exception {
+   TEST_UTIL = new HBaseTestingUtility();
+   Configuration conf = TEST_UTIL.getConfiguration();
+   TEST_UTIL.startMiniCluster();
+   HTable table = null;
+   // Everything after the first cluster start runs under try/finally so that a failed
+   // assertion cannot leave a mini cluster running behind the shared static TEST_UTIL.
+   try {
+     table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
+     assertEquals("Should start with empty table", 0, TEST_UTIL.countRows(table));
+     HRegionFileSystem regionFs = getHRegionFS(table, conf);
+     // the original block storage policy would be HOT
+     String spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+     String spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+     LOG.debug("Storage policy of cf 0: [" + spA + "].");
+     LOG.debug("Storage policy of cf 1: [" + spB + "].");
+     assertEquals("HOT", spA);
+     assertEquals("HOT", spB);
+
+     // Recreate table and make sure storage policy could be set through configuration.
+     // Close the handle that belongs to the first cluster before that cluster goes away.
+     table.close();
+     table = null;
+     TEST_UTIL.shutdownMiniCluster();
+     TEST_UTIL.getConfiguration().set(HStore.BLOCK_STORAGE_POLICY_KEY, "WARM");
+     TEST_UTIL.startMiniCluster();
+     table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
+     regionFs = getHRegionFS(table, conf);
+
+     try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
+       spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+       spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+       LOG.debug("Storage policy of cf 0: [" + spA + "].");
+       LOG.debug("Storage policy of cf 1: [" + spB + "].");
+       assertEquals("WARM", spA);
+       assertEquals("WARM", spB);
+
+       // alter table cf schema to change storage policies
+       // and make sure it could override settings in conf
+       HColumnDescriptor hcdA = new HColumnDescriptor(Bytes.toString(FAMILIES[0]));
+       // alter through setting HStore#BLOCK_STORAGE_POLICY_KEY in HColumnDescriptor
+       hcdA.setValue(HStore.BLOCK_STORAGE_POLICY_KEY, "ONE_SSD");
+       admin.modifyColumn(TABLE_NAME, hcdA);
+       waitForSchemaAlterToFinish();
+       // alter through HColumnDescriptor#setStoragePolicy
+       HColumnDescriptor hcdB = new HColumnDescriptor(Bytes.toString(FAMILIES[1]));
+       hcdB.setStoragePolicy("ALL_SSD");
+       admin.modifyColumn(TABLE_NAME, hcdB);
+       waitForSchemaAlterToFinish();
+       spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+       spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+       LOG.debug("Storage policy of cf 0: [" + spA + "].");
+       LOG.debug("Storage policy of cf 1: [" + spB + "].");
+       assertNotNull(spA);
+       assertEquals("ONE_SSD", spA);
+       assertNotNull(spB);
+       assertEquals("ALL_SSD", spB);
+
+       // flush memstore snapshot into 3 files
+       for (long i = 0; i < 3; i++) {
+         Put put = new Put(Bytes.toBytes(i));
+         put.addColumn(FAMILIES[0], Bytes.toBytes(i), Bytes.toBytes(i));
+         table.put(put);
+         admin.flush(TABLE_NAME);
+       }
+       // there should be 3 files in store dir
+       FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+       Path storePath = regionFs.getStoreDir(Bytes.toString(FAMILIES[0]));
+       FileStatus[] storeFiles = FSUtils.listStatus(fs, storePath);
+       assertNotNull(storeFiles);
+       assertEquals(3, storeFiles.length);
+       // store temp dir still exists but empty
+       Path storeTempDir = new Path(regionFs.getTempDir(), Bytes.toString(FAMILIES[0]));
+       assertTrue(fs.exists(storeTempDir));
+       FileStatus[] tempFiles = FSUtils.listStatus(fs, storeTempDir);
+       // the test expects FSUtils.listStatus to report an empty directory as null
+       assertNull(tempFiles);
+       // storage policy of cf temp dir and 3 store files should be ONE_SSD
+       HFileSystem hfs = (HFileSystem) regionFs.getFileSystem();
+       assertEquals("ONE_SSD", hfs.getStoragePolicyName(storeTempDir));
+       for (FileStatus status : storeFiles) {
+         assertEquals("ONE_SSD", hfs.getStoragePolicyName(status.getPath()));
+       }
+
+       // change storage policies by calling raw api directly
+       regionFs.setStoragePolicy(Bytes.toString(FAMILIES[0]), "ALL_SSD");
+       regionFs.setStoragePolicy(Bytes.toString(FAMILIES[1]), "ONE_SSD");
+       spA = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[0]));
+       spB = regionFs.getStoragePolicyName(Bytes.toString(FAMILIES[1]));
+       LOG.debug("Storage policy of cf 0: [" + spA + "].");
+       LOG.debug("Storage policy of cf 1: [" + spB + "].");
+       assertNotNull(spA);
+       assertEquals("ALL_SSD", spA);
+       assertNotNull(spB);
+       assertEquals("ONE_SSD", spB);
+     }
+   } finally {
+     try {
+       if (table != null) {
+         table.close();
+         TEST_UTIL.deleteTable(TABLE_NAME);
+       }
+     } finally {
+       // Always stop the cluster, even when closing or deleting the table fails.
+       TEST_UTIL.shutdownMiniCluster();
+     }
+   }
+ }
+
+ /**
+  * Sleeps in 200 ms steps for as long as the master's region states report regions in
+  * transition, i.e. until the preceding schema change is no longer moving regions.
+  */
+ private void waitForSchemaAlterToFinish() throws InterruptedException {
+   while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+       .isRegionsInTransition()) {
+     Thread.sleep(200);
+     LOG.debug("Waiting on table to finish schema altering");
+   }
+ }
+
+ /**
+  * Builds an HRegionFileSystem for the first region of the given table, on top of the mini
+  * DFS cluster's file system. Asserts along the way that the table directory holds exactly
+  * one region directory and that this region holds exactly two family directories.
+  *
+  * @param table table whose region file system is wanted
+  * @param conf configuration handed to the HRegionFileSystem
+  * @return a new HRegionFileSystem rooted at the table directory
+  * @throws IOException if a file system or region location lookup fails
+  */
+ private HRegionFileSystem getHRegionFS(HTable table, Configuration conf) throws IOException {
+   FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+   Path tableDir = FSUtils.getTableDir(TEST_UTIL.getDefaultRootDirPath(), table.getName());
+   List<Path> regions = FSUtils.getRegionDirs(dfs, tableDir);
+   assertEquals(1, regions.size());
+   assertEquals(2, FSUtils.getFamilyDirs(dfs, regions.get(0)).size());
+   HRegionInfo regionInfo =
+       table.getRegionLocator().getAllRegionLocations().get(0).getRegionInfo();
+   return new HRegionFileSystem(conf, new HFileSystem(dfs), tableDir, regionInfo);
+ }
@Test
public void testOnDiskRegionCreation() throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f35bcd2f/hbase-shell/src/main/ruby/hbase/admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 60ae155..2dd69f8 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -859,6 +859,10 @@ module Hbase
algorithm))
end
end
+ if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY)
+ storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
+ family.setStoragePolicy(storage_policy)
+ end
set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]