You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by da...@apache.org on 2015/09/22 07:03:35 UTC
[04/50] [abbrv] hive git commit: HIVE-11300 HBase metastore: Support
token and master key methods (gates)
http://git-wip-us.apache.org/repos/asf/hive/blob/a310524c/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
index 332e30a..ae73feb 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java
@@ -80,6 +80,8 @@ class HBaseReadWrite {
@VisibleForTesting final static String PART_TABLE = "HBMS_PARTITIONS";
@VisibleForTesting final static String ROLE_TABLE = "HBMS_ROLES";
@VisibleForTesting final static String SD_TABLE = "HBMS_SDS";
+ @VisibleForTesting final static String SECURITY_TABLE = "HBMS_SECURITY";
+ @VisibleForTesting final static String SEQUENCES_TABLE = "HBMS_SEQUENCES";
@VisibleForTesting final static String TABLE_TABLE = "HBMS_TBLS";
@VisibleForTesting final static String USER_TO_ROLE_TABLE = "HBMS_USER_TO_ROLE";
@VisibleForTesting final static byte[] CATALOG_CF = "c".getBytes(HBaseUtils.ENCODING);
@@ -90,7 +92,7 @@ class HBaseReadWrite {
*/
final static String[] tableNames = { AGGR_STATS_TABLE, DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE,
PART_TABLE, USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE,
- TABLE_TABLE };
+ SECURITY_TABLE, SEQUENCES_TABLE, TABLE_TABLE};
final static Map<String, List<byte[]>> columnFamilies =
new HashMap<String, List<byte[]>> (tableNames.length);
@@ -103,6 +105,8 @@ class HBaseReadWrite {
columnFamilies.put(USER_TO_ROLE_TABLE, Arrays.asList(CATALOG_CF));
columnFamilies.put(ROLE_TABLE, Arrays.asList(CATALOG_CF));
columnFamilies.put(SD_TABLE, Arrays.asList(CATALOG_CF));
+ columnFamilies.put(SECURITY_TABLE, Arrays.asList(CATALOG_CF));
+ columnFamilies.put(SEQUENCES_TABLE, Arrays.asList(CATALOG_CF));
columnFamilies.put(TABLE_TABLE, Arrays.asList(CATALOG_CF, STATS_CF));
}
@@ -110,12 +114,16 @@ class HBaseReadWrite {
* Stores the bloom filter for the aggregated stats, to determine what partitions are in this
* aggregate.
*/
final static byte[] AGGR_STATS_BLOOM_COL = "b".getBytes(HBaseUtils.ENCODING);
+ // Name of the sequence used to number master keys; it is passed to getNextSequence, which uses
+ // it as the column name within the sequences table.
+ final static byte[] MASTER_KEY_SEQUENCE = "mk".getBytes(HBaseUtils.ENCODING);
private final static byte[] CATALOG_COL = "c".getBytes(HBaseUtils.ENCODING);
private final static byte[] ROLES_COL = "roles".getBytes(HBaseUtils.ENCODING);
private final static byte[] REF_COUNT_COL = "ref".getBytes(HBaseUtils.ENCODING);
+ private final static byte[] DELEGATION_TOKEN_COL = "dt".getBytes(HBaseUtils.ENCODING);
+ private final static byte[] MASTER_KEY_COL = "mk".getBytes(HBaseUtils.ENCODING);
private final static byte[] AGGR_STATS_STATS_COL = "s".getBytes(HBaseUtils.ENCODING);
private final static byte[] GLOBAL_PRIVS_KEY = "gp".getBytes(HBaseUtils.ENCODING);
+ private final static byte[] SEQUENCES_KEY = "seq".getBytes(HBaseUtils.ENCODING);
private final static int TABLES_TO_CACHE = 10;
// False positives are very bad here because they cause us to invalidate entries we shouldn't.
// Space used and # of hash functions grows in proportion to ln of num bits so a 10x increase
@@ -226,7 +234,7 @@ class HBaseReadWrite {
sdHits = new Counter("storage descriptor cache hits");
sdMisses = new Counter("storage descriptor cache misses");
sdOverflows = new Counter("storage descriptor cache overflows");
- counters = new ArrayList<Counter>();
+ counters = new ArrayList<>();
counters.add(tableHits);
counters.add(tableMisses);
counters.add(tableOverflows);
@@ -241,18 +249,16 @@ class HBaseReadWrite {
// (storage descriptors are shared, so 99% should be the same for a given table)
int sdsCacheSize = totalCatalogObjectsToCache / 100;
if (conf.getBoolean(NO_CACHE_CONF, false)) {
- tableCache = new BogusObjectCache<ObjectPair<String, String>, Table>();
- sdCache = new BogusObjectCache<ByteArrayWrapper, StorageDescriptor>();
+ tableCache = new BogusObjectCache<>();
+ sdCache = new BogusObjectCache<>();
partCache = new BogusPartitionCache();
} else {
- tableCache = new ObjectCache<ObjectPair<String, String>, Table>(TABLES_TO_CACHE, tableHits,
- tableMisses, tableOverflows);
- sdCache = new ObjectCache<ByteArrayWrapper, StorageDescriptor>(sdsCacheSize, sdHits,
- sdMisses, sdOverflows);
+ tableCache = new ObjectCache<>(TABLES_TO_CACHE, tableHits, tableMisses, tableOverflows);
+ sdCache = new ObjectCache<>(sdsCacheSize, sdHits, sdMisses, sdOverflows);
partCache = new PartitionCache(totalCatalogObjectsToCache, partHits, partMisses, partOverflows);
}
statsCache = StatsCache.getInstance(conf);
- roleCache = new HashMap<String, HbaseMetastoreProto.RoleGrantInfoList>();
+ roleCache = new HashMap<>();
entireRoleTableInCache = false;
}
@@ -338,7 +344,7 @@ class HBaseReadWrite {
}
Iterator<Result> iter =
scan(DB_TABLE, CATALOG_CF, CATALOG_COL, filter);
- List<Database> databases = new ArrayList<Database>();
+ List<Database> databases = new ArrayList<>();
while (iter.hasNext()) {
Result result = iter.next();
databases.add(HBaseUtils.deserializeDatabase(result.getRow(),
@@ -404,7 +410,7 @@ class HBaseReadWrite {
}
Iterator<Result> iter =
scan(FUNC_TABLE, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), CATALOG_CF, CATALOG_COL, filter);
- List<Function> functions = new ArrayList<Function>();
+ List<Function> functions = new ArrayList<>();
while (iter.hasNext()) {
Result result = iter.next();
functions.add(HBaseUtils.deserializeFunction(result.getRow(),
@@ -489,8 +495,8 @@ class HBaseReadWrite {
*/
List<Partition> getPartitions(String dbName, String tableName, List<List<String>> partValLists)
throws IOException {
- List<Partition> parts = new ArrayList<Partition>(partValLists.size());
- List<Get> gets = new ArrayList<Get>(partValLists.size());
+ List<Partition> parts = new ArrayList<>(partValLists.size());
+ List<Get> gets = new ArrayList<>(partValLists.size());
for (List<String> partVals : partValLists) {
byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals);
Get get = new Get(key);
@@ -556,7 +562,7 @@ class HBaseReadWrite {
* @throws IOException
*/
void putPartitions(List<Partition> partitions) throws IOException {
- List<Put> puts = new ArrayList<Put>(partitions.size());
+ List<Put> puts = new ArrayList<>(partitions.size());
for (Partition partition : partitions) {
byte[] hash = putStorageDescriptor(partition.getSd());
byte[][] serialized = HBaseUtils.serializePartition(partition, hash);
@@ -615,8 +621,8 @@ class HBaseReadWrite {
Collection<Partition> cached = partCache.getAllForTable(dbName, tableName);
if (cached != null) {
return maxPartitions < cached.size()
- ? new ArrayList<Partition>(cached).subList(0, maxPartitions)
- : new ArrayList<Partition>(cached);
+ ? new ArrayList<>(cached).subList(0, maxPartitions)
+ : new ArrayList<>(cached);
}
byte[] keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(dbName, tableName);
List<Partition> parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), -1, null);
@@ -645,7 +651,7 @@ class HBaseReadWrite {
List<Partition> scanPartitions(String dbName, String tableName, List<String> partVals,
int maxPartitions) throws IOException, NoSuchObjectException {
// First, build as much of the key as we can so that we make the scan as tight as possible.
- List<String> keyElements = new ArrayList<String>();
+ List<String> keyElements = new ArrayList<>();
keyElements.add(dbName);
keyElements.add(tableName);
@@ -712,7 +718,7 @@ class HBaseReadWrite {
List<Partition> scanPartitions(String dbName, String tableName, byte[] keyStart, byte[] keyEnd,
Filter filter, int maxPartitions) throws IOException, NoSuchObjectException {
- List<String> keyElements = new ArrayList<String>();
+ List<String> keyElements = new ArrayList<>();
keyElements.add(dbName);
keyElements.add(tableName);
@@ -780,7 +786,7 @@ class HBaseReadWrite {
throws IOException {
Iterator<Result> iter =
scan(PART_TABLE, startRow, endRow, CATALOG_CF, CATALOG_COL, filter);
- List<Partition> parts = new ArrayList<Partition>();
+ List<Partition> parts = new ArrayList<>();
int numToFetch = maxResults < 0 ? Integer.MAX_VALUE : maxResults;
for (int i = 0; i < numToFetch && iter.hasNext(); i++) {
Result result = iter.next();
@@ -821,7 +827,7 @@ class HBaseReadWrite {
throws IOException {
buildRoleCache();
- Set<String> rolesFound = new HashSet<String>();
+ Set<String> rolesFound = new HashSet<>();
for (Map.Entry<String, HbaseMetastoreProto.RoleGrantInfoList> e : roleCache.entrySet()) {
for (HbaseMetastoreProto.RoleGrantInfo giw : e.getValue().getGrantInfoList()) {
if (HBaseUtils.convertPrincipalTypes(giw.getPrincipalType()) == type &&
@@ -831,8 +837,8 @@ class HBaseReadWrite {
}
}
}
- List<Role> directRoles = new ArrayList<Role>(rolesFound.size());
- List<Get> gets = new ArrayList<Get>();
+ List<Role> directRoles = new ArrayList<>(rolesFound.size());
+ List<Get> gets = new ArrayList<>();
HTableInterface htab = conn.getHBaseTable(ROLE_TABLE);
for (String roleFound : rolesFound) {
byte[] key = HBaseUtils.buildKey(roleFound);
@@ -880,7 +886,7 @@ class HBaseReadWrite {
*/
Set<String> findAllUsersInRole(String roleName) throws IOException {
// Walk the userToRole table and collect every user that matches this role.
- Set<String> users = new HashSet<String>();
+ Set<String> users = new HashSet<>();
Iterator<Result> iter = scan(USER_TO_ROLE_TABLE, CATALOG_CF, CATALOG_COL);
while (iter.hasNext()) {
Result result = iter.next();
@@ -907,8 +913,7 @@ class HBaseReadWrite {
void addPrincipalToRole(String roleName, HbaseMetastoreProto.RoleGrantInfo grantInfo)
throws IOException, NoSuchObjectException {
HbaseMetastoreProto.RoleGrantInfoList proto = getRolePrincipals(roleName);
- List<HbaseMetastoreProto.RoleGrantInfo> rolePrincipals =
- new ArrayList<HbaseMetastoreProto.RoleGrantInfo>();
+ List<HbaseMetastoreProto.RoleGrantInfo> rolePrincipals = new ArrayList<>();
if (proto != null) {
rolePrincipals.addAll(proto.getGrantInfoList());
}
@@ -937,8 +942,7 @@ class HBaseReadWrite {
throws NoSuchObjectException, IOException {
HbaseMetastoreProto.RoleGrantInfoList proto = getRolePrincipals(roleName);
if (proto == null) return;
- List<HbaseMetastoreProto.RoleGrantInfo> rolePrincipals =
- new ArrayList<HbaseMetastoreProto.RoleGrantInfo>();
+ List<HbaseMetastoreProto.RoleGrantInfo> rolePrincipals = new ArrayList<>();
rolePrincipals.addAll(proto.getGrantInfoList());
for (int i = 0; i < rolePrincipals.size(); i++) {
@@ -976,8 +980,8 @@ class HBaseReadWrite {
LOG.debug("Building role map for " + userName);
// Second, find every role the user participates in directly.
- Set<String> rolesToAdd = new HashSet<String>();
- Set<String> rolesToCheckNext = new HashSet<String>();
+ Set<String> rolesToAdd = new HashSet<>();
+ Set<String> rolesToCheckNext = new HashSet<>();
for (Map.Entry<String, HbaseMetastoreProto.RoleGrantInfoList> e : roleCache.entrySet()) {
for (HbaseMetastoreProto.RoleGrantInfo grantInfo : e.getValue().getGrantInfoList()) {
if (HBaseUtils.convertPrincipalTypes(grantInfo.getPrincipalType()) == PrincipalType.USER &&
@@ -993,7 +997,7 @@ class HBaseReadWrite {
// Third, find every role the user participates in indirectly (that is, they have been
// granted into role X and role Y has been granted into role X).
while (rolesToCheckNext.size() > 0) {
- Set<String> tmpRolesToCheckNext = new HashSet<String>();
+ Set<String> tmpRolesToCheckNext = new HashSet<>();
for (String roleName : rolesToCheckNext) {
HbaseMetastoreProto.RoleGrantInfoList grantInfos = roleCache.get(roleName);
if (grantInfos == null) continue; // happens when a role contains no grants
@@ -1010,7 +1014,7 @@ class HBaseReadWrite {
}
byte[] key = HBaseUtils.buildKey(userName);
- byte[] serialized = HBaseUtils.serializeRoleList(new ArrayList<String>(rolesToAdd));
+ byte[] serialized = HBaseUtils.serializeRoleList(new ArrayList<>(rolesToAdd));
store(USER_TO_ROLE_TABLE, key, CATALOG_CF, CATALOG_COL, serialized);
}
@@ -1022,12 +1026,11 @@ class HBaseReadWrite {
void removeRoleGrants(String roleName) throws IOException {
buildRoleCache();
- List<Put> puts = new ArrayList<Put>();
+ List<Put> puts = new ArrayList<>();
// First, walk the role table and remove any references to this role
for (Map.Entry<String, HbaseMetastoreProto.RoleGrantInfoList> e : roleCache.entrySet()) {
boolean madeAChange = false;
- List<HbaseMetastoreProto.RoleGrantInfo> rgil =
- new ArrayList<HbaseMetastoreProto.RoleGrantInfo>();
+ List<HbaseMetastoreProto.RoleGrantInfo> rgil = new ArrayList<>();
rgil.addAll(e.getValue().getGrantInfoList());
for (int i = 0; i < rgil.size(); i++) {
if (HBaseUtils.convertPrincipalTypes(rgil.get(i).getPrincipalType()) == PrincipalType.ROLE &&
@@ -1066,7 +1069,7 @@ class HBaseReadWrite {
// Now, walk the db table
puts.clear();
List<Database> dbs = scanDatabases(null);
- if (dbs == null) dbs = new ArrayList<Database>(); // rare, but can happen
+ if (dbs == null) dbs = new ArrayList<>(); // rare, but can happen
for (Database db : dbs) {
if (db.getPrivileges() != null &&
db.getPrivileges().getRolePrivileges() != null &&
@@ -1130,7 +1133,7 @@ class HBaseReadWrite {
*/
List<Role> scanRoles() throws IOException {
Iterator<Result> iter = scan(ROLE_TABLE, CATALOG_CF, CATALOG_COL);
- List<Role> roles = new ArrayList<Role>();
+ List<Role> roles = new ArrayList<>();
while (iter.hasNext()) {
Result result = iter.next();
roles.add(HBaseUtils.deserializeRole(result.getRow(),
@@ -1199,11 +1202,11 @@ class HBaseReadWrite {
List<Table> getTables(String dbName, List<String> tableNames) throws IOException {
// I could implement getTable in terms of this method. But it is such a core function
// that I don't want to slow it down for the much less common fetching of multiple tables.
- List<Table> results = new ArrayList<Table>(tableNames.size());
+ List<Table> results = new ArrayList<>(tableNames.size());
ObjectPair<String, String>[] hashKeys = new ObjectPair[tableNames.size()];
boolean atLeastOneMissing = false;
for (int i = 0; i < tableNames.size(); i++) {
- hashKeys[i] = new ObjectPair<String, String>(dbName, tableNames.get(i));
+ hashKeys[i] = new ObjectPair<>(dbName, tableNames.get(i));
// The result may be null, but we still want to add it so that we have a slot in the list
// for it.
results.add(tableCache.get(hashKeys[i]));
@@ -1212,7 +1215,7 @@ class HBaseReadWrite {
if (!atLeastOneMissing) return results;
// Now build a single get that will fetch the remaining tables
- List<Get> gets = new ArrayList<Get>();
+ List<Get> gets = new ArrayList<>();
HTableInterface htab = conn.getHBaseTable(TABLE_TABLE);
for (int i = 0; i < tableNames.size(); i++) {
if (results.get(i) != null) continue;
@@ -1261,7 +1264,7 @@ class HBaseReadWrite {
Iterator<Result> iter =
scan(TABLE_TABLE, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix),
CATALOG_CF, CATALOG_COL, filter);
- List<Table> tables = new ArrayList<Table>();
+ List<Table> tables = new ArrayList<>();
while (iter.hasNext()) {
Result result = iter.next();
HBaseUtils.StorageDescriptorParts sdParts =
@@ -1284,7 +1287,7 @@ class HBaseReadWrite {
byte[] hash = putStorageDescriptor(table.getSd());
byte[][] serialized = HBaseUtils.serializeTable(table, hash);
store(TABLE_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]);
- tableCache.put(new ObjectPair<String, String>(table.getDbName(), table.getTableName()), table);
+ tableCache.put(new ObjectPair<>(table.getDbName(), table.getTableName()), table);
}
/**
@@ -1323,7 +1326,7 @@ class HBaseReadWrite {
private void deleteTable(String dbName, String tableName, boolean decrementRefCnt)
throws IOException {
- tableCache.remove(new ObjectPair<String, String>(dbName, tableName));
+ tableCache.remove(new ObjectPair<>(dbName, tableName));
if (decrementRefCnt) {
// Find the table so I can get the storage descriptor and drop it
Table t = getTable(dbName, tableName, false);
@@ -1335,7 +1338,7 @@ class HBaseReadWrite {
private Table getTable(String dbName, String tableName, boolean populateCache)
throws IOException {
- ObjectPair<String, String> hashKey = new ObjectPair<String, String>(dbName, tableName);
+ ObjectPair<String, String> hashKey = new ObjectPair<>(dbName, tableName);
Table cached = tableCache.get(hashKey);
if (cached != null) return cached;
byte[] key = HBaseUtils.buildKey(dbName, tableName);
@@ -1623,6 +1626,7 @@ class HBaseReadWrite {
byte[] serialized = read(AGGR_STATS_TABLE, key, CATALOG_CF, AGGR_STATS_STATS_COL);
if (serialized == null) return null;
return HBaseUtils.deserializeAggrStats(serialized);
+
}
/**
@@ -1696,6 +1700,134 @@ class HBaseReadWrite {
}
/**********************************************************************************************
+ * Security related methods
+ *********************************************************************************************/
+
+ /**
+  * Look up a single delegation token in the security table.
+  * @param tokId identifier of the token to look up
+  * @return the stored delegation token, or null when nothing is stored for that identifier
+  * @throws IOException
+  */
+ String getDelegationToken(String tokId) throws IOException {
+   byte[] tokenBytes =
+       read(SECURITY_TABLE, HBaseUtils.buildKey(tokId), CATALOG_CF, DELEGATION_TOKEN_COL);
+   return tokenBytes == null ? null : HBaseUtils.deserializeDelegationToken(tokenBytes);
+ }
+
+ /**
+  * List the identifiers of every delegation token held in the security table.
+  * @return the row keys, decoded as strings, of all rows that carry a delegation token value
+  * @throws IOException
+  */
+ List<String> scanDelegationTokenIdentifiers() throws IOException {
+   List<String> tokenIds = new ArrayList<>();
+   Iterator<Result> rows = scan(SECURITY_TABLE, CATALOG_CF, DELEGATION_TOKEN_COL);
+   while (rows.hasNext()) {
+     Result row = rows.next();
+     // The value itself is never deserialized, only the row key is wanted. The null check is
+     // what tells a delegation token record apart from a master key record.
+     if (row.getValue(CATALOG_CF, DELEGATION_TOKEN_COL) == null) continue;
+     tokenIds.add(new String(row.getRow(), HBaseUtils.ENCODING));
+   }
+   return tokenIds;
+ }
+
+ /**
+  * Write a delegation token to the security table.
+  * @param tokId identifier of the token
+  * @param token the delegation token to write
+  * @throws IOException
+  */
+ void putDelegationToken(String tokId, String token) throws IOException {
+   // The serializer hands back a pair: element 0 is used as the row key, element 1 as the value.
+   byte[][] keyAndValue = HBaseUtils.serializeDelegationToken(tokId, token);
+   byte[] rowKey = keyAndValue[0];
+   byte[] tokenBytes = keyAndValue[1];
+   store(SECURITY_TABLE, rowKey, CATALOG_CF, DELEGATION_TOKEN_COL, tokenBytes);
+ }
+
+ /**
+  * Remove a delegation token from the security table.
+  * @param tokId identifier of the token to remove
+  * @throws IOException
+  */
+ void deleteDelegationToken(String tokId) throws IOException {
+   delete(SECURITY_TABLE, HBaseUtils.buildKey(tokId), CATALOG_CF, DELEGATION_TOKEN_COL);
+ }
+
+ /**
+  * Look up a single master key in the security table.
+  * @param seqNo sequence number identifying the master key
+  * @return the stored master key, or null when nothing is stored for that sequence number
+  * @throws IOException
+  */
+ String getMasterKey(Integer seqNo) throws IOException {
+   byte[] keyBytes =
+       read(SECURITY_TABLE, HBaseUtils.buildKey(seqNo.toString()), CATALOG_CF, MASTER_KEY_COL);
+   return keyBytes == null ? null : HBaseUtils.deserializeMasterKey(keyBytes);
+ }
+
+ /**
+  * Collect every master key held in the security table.
+  * @return the deserialized master keys of all rows that carry a master key value
+  * @throws IOException
+  */
+ List<String> scanMasterKeys() throws IOException {
+   List<String> masterKeys = new ArrayList<>();
+   Iterator<Result> rows = scan(SECURITY_TABLE, CATALOG_CF, MASTER_KEY_COL);
+   while (rows.hasNext()) {
+     byte[] keyBytes = rows.next().getValue(CATALOG_CF, MASTER_KEY_COL);
+     // Rows with no value in the master key column are skipped.
+     if (keyBytes == null) continue;
+     masterKeys.add(HBaseUtils.deserializeMasterKey(keyBytes));
+   }
+   return masterKeys;
+ }
+
+ /**
+  * Write a master key to the security table.
+  * @param seqNo sequence number of the master key
+  * @param key the master key to write
+  * @throws IOException
+  */
+ void putMasterKey(Integer seqNo, String key) throws IOException {
+   // The serializer hands back a pair: element 0 is used as the row key, element 1 as the value.
+   byte[][] keyAndValue = HBaseUtils.serializeMasterKey(seqNo, key);
+   byte[] rowKey = keyAndValue[0];
+   byte[] masterKeyBytes = keyAndValue[1];
+   store(SECURITY_TABLE, rowKey, CATALOG_CF, MASTER_KEY_COL, masterKeyBytes);
+ }
+
+ /**
+  * Remove a master key from the security table.
+  * @param seqNo sequence number of the master key to remove
+  * @throws IOException
+  */
+ void deleteMasterKey(Integer seqNo) throws IOException {
+   delete(SECURITY_TABLE, HBaseUtils.buildKey(seqNo.toString()), CATALOG_CF, MASTER_KEY_COL);
+ }
+
+ /**********************************************************************************************
+ * Sequence methods
+ *********************************************************************************************/
+
+ /**
+  * Read the current value of a named sequence and advance the stored value by one.
+  * The value is kept as a decimal string in the sequences table, in the single row
+  * SEQUENCES_KEY, with the sequence name used as the column.
+  * NOTE(review): the read and the store are two separate calls and nothing in this method makes
+  * the pair atomic, so concurrent callers could presumably be handed the same value. Confirm
+  * how callers serialize access.
+  * @param sequence name of the sequence, used as the column to read and write
+  * @return the value of the sequence before the increment; 0 if nothing has been stored yet
+  * @throws IOException
+  */
+ long getNextSequence(byte[] sequence) throws IOException {
+   byte[] serialized = read(SEQUENCES_TABLE, SEQUENCES_KEY, CATALOG_CF, sequence);
+   // A sequence that has never been stored starts at 0.
+   long val = 0;
+   if (serialized != null) {
+     // parseLong yields a primitive directly, avoiding the box/unbox round trip of valueOf.
+     val = Long.parseLong(new String(serialized, HBaseUtils.ENCODING));
+   }
+   byte[] incrSerialized = Long.toString(val + 1).getBytes(HBaseUtils.ENCODING);
+   store(SEQUENCES_TABLE, SEQUENCES_KEY, CATALOG_CF, sequence, incrSerialized);
+   return val;
+ }
+
+ /**********************************************************************************************
* Cache methods
*********************************************************************************************/
@@ -1772,8 +1904,7 @@ class HBaseReadWrite {
htab.delete(d);
}
- private Iterator<Result> scan(String table, byte[] colFam,
- byte[] colName) throws IOException {
+ private Iterator<Result> scan(String table, byte[] colFam, byte[] colName) throws IOException { // convenience overload: delegates to the six-argument scan, passing null for its other three arguments
return scan(table, null, null, colFam, colName, null);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a310524c/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
index 9782859..744070d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java
@@ -1613,43 +1613,128 @@ public class HBaseStore implements RawStore {
@Override
public boolean addToken(String tokenIdentifier, String delegationToken) {
- throw new UnsupportedOperationException();
+ boolean commit = false; // becomes true only after the put below has returned without throwing
+ openTransaction();
+ try {
+ getHBase().putDelegationToken(tokenIdentifier, delegationToken);
+ commit = true;
+ return commit; // See HIVE-11302, for now always returning true
+ } catch (IOException e) {
+ throw new RuntimeException(e); // this method declares no checked exceptions, so the IOException is wrapped with its cause kept
+ } finally {
+ commitOrRoleBack(commit); // runs on both the return path and the exception path; presumably commits when true and rolls back otherwise - confirm against its definition
+ }
}
@Override
public boolean removeToken(String tokenIdentifier) {
- throw new UnsupportedOperationException();
+ boolean commit = false; // becomes true only after the delete below has returned without throwing
+ openTransaction();
+ try {
+ getHBase().deleteDelegationToken(tokenIdentifier);
+ commit = true;
+ return commit; // See HIVE-11302, for now always returning true
+ } catch (IOException e) {
+ throw new RuntimeException(e); // this method declares no checked exceptions, so the IOException is wrapped with its cause kept
+ } finally {
+ commitOrRoleBack(commit); // runs on both the return path and the exception path, receiving false if the delete threw
+ }
}
@Override
public String getToken(String tokenIdentifier) {
- throw new UnsupportedOperationException();
+ boolean commit = false; // becomes true only after the read below has returned without throwing
+ openTransaction();
+ try {
+ String token = getHBase().getDelegationToken(tokenIdentifier); // null when no token is stored under this identifier
+ commit = true;
+ return token;
+ } catch (IOException e) {
+ throw new RuntimeException(e); // this method declares no checked exceptions, so the IOException is wrapped with its cause kept
+ } finally {
+ commitOrRoleBack(commit); // runs on both the return path and the exception path, receiving false if the read threw
+ }
}
@Override
public List<String> getAllTokenIdentifiers() {
- throw new UnsupportedOperationException();
+ boolean commit = false; // becomes true only after the scan below has returned without throwing
+ openTransaction();
+ try {
+ List<String> ids = getHBase().scanDelegationTokenIdentifiers();
+ commit = true;
+ return ids;
+ } catch (IOException e) {
+ throw new RuntimeException(e); // this method declares no checked exceptions, so the IOException is wrapped with its cause kept
+ } finally {
+ commitOrRoleBack(commit); // runs on both the return path and the exception path, receiving false if the scan threw
+ }
}
@Override
public int addMasterKey(String key) throws MetaException {
- throw new UnsupportedOperationException();
+ boolean commit = false; // becomes true only after both the sequence fetch and the put have succeeded
+ openTransaction();
+ try {
+ long seq = getHBase().getNextSequence(HBaseReadWrite.MASTER_KEY_SEQUENCE); // sequence value that will identify the new key
+ getHBase().putMasterKey((int) seq, key); // the long sequence value is narrowed to int here
+ commit = true;
+ return (int)seq;
+ } catch (IOException e) {
+ LOG.error("Unable to add master key", e);
+ throw new MetaException("Failed adding master key, " + e.getMessage()); // only the message is carried into the MetaException; the full exception is logged on the line above
+ } finally {
+ commitOrRoleBack(commit); // runs on both the return path and the exception path, receiving false if either HBase call threw
+ }
}
@Override
public void updateMasterKey(Integer seqNo, String key) throws NoSuchObjectException,
MetaException {
- throw new UnsupportedOperationException();
+ boolean commit = false; // becomes true only after the put below has returned without throwing
+ openTransaction();
+ try {
+ if (getHBase().getMasterKey(seqNo) == null) { // only a key that already exists may be overwritten
+ throw new NoSuchObjectException("No key found with keyId: " + seqNo); // not an IOException, so it propagates to the caller; commit is still false when the finally block runs
+ }
+ getHBase().putMasterKey(seqNo, key);
+ commit = true;
+ } catch (IOException e) {
+ LOG.error("Unable to update master key", e);
+ throw new MetaException("Failed updating master key, " + e.getMessage()); // only the message is carried into the MetaException; the full exception is logged on the line above
+ } finally {
+ commitOrRoleBack(commit); // runs on every exit path, receiving false unless the put succeeded
+ }
}
@Override
public boolean removeMasterKey(Integer keySeq) {
- throw new UnsupportedOperationException();
+ boolean commit = false; // becomes true only after the delete below has returned without throwing
+ openTransaction();
+ try {
+ getHBase().deleteMasterKey(keySeq);
+ commit = true;
+ return true; // See HIVE-11302, for now always returning true
+ } catch (IOException e) {
+ throw new RuntimeException(e); // this method declares no checked exceptions, so the IOException is wrapped with its cause kept
+ } finally {
+ commitOrRoleBack(commit); // runs on both the return path and the exception path, receiving false if the delete threw
+ }
}
@Override
public String[] getMasterKeys() {
- throw new UnsupportedOperationException();
+ boolean commit = false; // becomes true only after the scan below has returned without throwing
+ openTransaction();
+ try {
+ List<String> keys = getHBase().scanMasterKeys();
+ commit = true;
+ return keys.toArray(new String[keys.size()]); // copies the list into an array sized to fit exactly
+ } catch (IOException e) {
+ throw new RuntimeException(e); // this method declares no checked exceptions, so the IOException is wrapped with its cause kept
+ } finally {
+ commitOrRoleBack(commit); // runs on both the return path and the exception path, receiving false if the scan threw
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/a310524c/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
index 4d57af2..62bb4de 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java
@@ -104,8 +104,7 @@ class HBaseUtils {
}
private static HbaseMetastoreProto.Parameters buildParameters(Map<String, String> params) {
- List<HbaseMetastoreProto.ParameterEntry> entries =
- new ArrayList<HbaseMetastoreProto.ParameterEntry>();
+ List<HbaseMetastoreProto.ParameterEntry> entries = new ArrayList<>();
for (Map.Entry<String, String> e : params.entrySet()) {
entries.add(
HbaseMetastoreProto.ParameterEntry.newBuilder()
@@ -119,7 +118,7 @@ class HBaseUtils {
}
private static Map<String, String> buildParameters(HbaseMetastoreProto.Parameters protoParams) {
- Map<String, String> params = new HashMap<String, String>();
+ Map<String, String> params = new HashMap<>();
for (HbaseMetastoreProto.ParameterEntry pe : protoParams.getParameterList()) {
params.put(pe.getKey(), pe.getValue());
}
@@ -129,8 +128,7 @@ class HBaseUtils {
private static List<HbaseMetastoreProto.PrincipalPrivilegeSetEntry>
buildPrincipalPrivilegeSetEntry(Map<String, List<PrivilegeGrantInfo>> entries) {
- List<HbaseMetastoreProto.PrincipalPrivilegeSetEntry> results =
- new ArrayList<HbaseMetastoreProto.PrincipalPrivilegeSetEntry>();
+ List<HbaseMetastoreProto.PrincipalPrivilegeSetEntry> results = new ArrayList<>();
for (Map.Entry<String, List<PrivilegeGrantInfo>> entry : entries.entrySet()) {
results.add(HbaseMetastoreProto.PrincipalPrivilegeSetEntry.newBuilder()
.setPrincipalName(entry.getKey())
@@ -142,8 +140,7 @@ class HBaseUtils {
private static List<HbaseMetastoreProto.PrivilegeGrantInfo> buildPrivilegeGrantInfo(
List<PrivilegeGrantInfo> privileges) {
- List<HbaseMetastoreProto.PrivilegeGrantInfo> results =
- new ArrayList<HbaseMetastoreProto.PrivilegeGrantInfo>();
+ List<HbaseMetastoreProto.PrivilegeGrantInfo> results = new ArrayList<>();
for (PrivilegeGrantInfo privilege : privileges) {
HbaseMetastoreProto.PrivilegeGrantInfo.Builder builder =
HbaseMetastoreProto.PrivilegeGrantInfo.newBuilder();
@@ -187,8 +184,7 @@ class HBaseUtils {
private static Map<String, List<PrivilegeGrantInfo>> convertPrincipalPrivilegeSetEntries(
List<HbaseMetastoreProto.PrincipalPrivilegeSetEntry> entries) {
- Map<String, List<PrivilegeGrantInfo>> map =
- new HashMap<String, List<PrivilegeGrantInfo>>();
+ Map<String, List<PrivilegeGrantInfo>> map = new HashMap<>();
for (HbaseMetastoreProto.PrincipalPrivilegeSetEntry entry : entries) {
map.put(entry.getPrincipalName(), convertPrivilegeGrantInfos(entry.getPrivilegesList()));
}
@@ -197,7 +193,7 @@ class HBaseUtils {
private static List<PrivilegeGrantInfo> convertPrivilegeGrantInfos(
List<HbaseMetastoreProto.PrivilegeGrantInfo> privileges) {
- List<PrivilegeGrantInfo> results = new ArrayList<PrivilegeGrantInfo>();
+ List<PrivilegeGrantInfo> results = new ArrayList<>();
for (HbaseMetastoreProto.PrivilegeGrantInfo proto : privileges) {
PrivilegeGrantInfo pgi = new PrivilegeGrantInfo();
if (proto.hasPrivilege()) pgi.setPrivilege(proto.getPrivilege());
@@ -316,7 +312,7 @@ class HBaseUtils {
static List<String> deserializeRoleList(byte[] value) throws InvalidProtocolBufferException {
HbaseMetastoreProto.RoleList proto = HbaseMetastoreProto.RoleList.parseFrom(value);
- return new ArrayList<String>(proto.getRoleList());
+ return new ArrayList<>(proto.getRoleList()); // copies the parsed role names into a new ArrayList rather than returning the proto's own list
}
/**
@@ -491,7 +487,7 @@ class HBaseUtils {
private static List<FieldSchema>
convertFieldSchemaListFromProto(List<HbaseMetastoreProto.FieldSchema> protoList) {
- List<FieldSchema> schemas = new ArrayList<FieldSchema>(protoList.size());
+ List<FieldSchema> schemas = new ArrayList<>(protoList.size());
for (HbaseMetastoreProto.FieldSchema proto : protoList) {
schemas.add(new FieldSchema(proto.getName(), proto.getType(),
proto.hasComment() ? proto.getComment() : null));
@@ -501,8 +497,7 @@ class HBaseUtils {
private static List<HbaseMetastoreProto.FieldSchema>
convertFieldSchemaListToProto(List<FieldSchema> schemas) {
- List<HbaseMetastoreProto.FieldSchema> protoList =
- new ArrayList<HbaseMetastoreProto.FieldSchema>(schemas.size());
+ List<HbaseMetastoreProto.FieldSchema> protoList = new ArrayList<>(schemas.size());
for (FieldSchema fs : schemas) {
HbaseMetastoreProto.FieldSchema.Builder builder =
HbaseMetastoreProto.FieldSchema.newBuilder();
@@ -552,8 +547,7 @@ class HBaseUtils {
}
if (sd.getSortCols() != null) {
List<Order> orders = sd.getSortCols();
- List<HbaseMetastoreProto.StorageDescriptor.Order> protoList =
- new ArrayList<HbaseMetastoreProto.StorageDescriptor.Order>(orders.size());
+ List<HbaseMetastoreProto.StorageDescriptor.Order> protoList = new ArrayList<>(orders.size());
for (Order order : orders) {
protoList.add(HbaseMetastoreProto.StorageDescriptor.Order.newBuilder()
.setColumnName(order.getCol())
@@ -625,7 +619,7 @@ class HBaseUtils {
md.update(serde.getSerializationLib().getBytes(ENCODING));
}
if (serde.getParameters() != null) {
- SortedMap<String, String> params = new TreeMap<String, String>(serde.getParameters());
+ SortedMap<String, String> params = new TreeMap<>(serde.getParameters());
for (Map.Entry<String, String> param : params.entrySet()) {
md.update(param.getKey().getBytes(ENCODING));
md.update(param.getValue().getBytes(ENCODING));
@@ -633,11 +627,11 @@ class HBaseUtils {
}
}
if (sd.getBucketCols() != null) {
- SortedSet<String> bucketCols = new TreeSet<String>(sd.getBucketCols());
+ SortedSet<String> bucketCols = new TreeSet<>(sd.getBucketCols());
for (String bucket : bucketCols) md.update(bucket.getBytes(ENCODING));
}
if (sd.getSortCols() != null) {
- SortedSet<Order> orders = new TreeSet<Order>(sd.getSortCols());
+ SortedSet<Order> orders = new TreeSet<>(sd.getSortCols());
for (Order order : orders) {
md.update(order.getCol().getBytes(ENCODING));
md.update(Integer.toString(order.getOrder()).getBytes(ENCODING));
@@ -646,21 +640,21 @@ class HBaseUtils {
if (sd.getSkewedInfo() != null) {
SkewedInfo skewed = sd.getSkewedInfo();
if (skewed.getSkewedColNames() != null) {
- SortedSet<String> colnames = new TreeSet<String>(skewed.getSkewedColNames());
+ SortedSet<String> colnames = new TreeSet<>(skewed.getSkewedColNames());
for (String colname : colnames) md.update(colname.getBytes(ENCODING));
}
if (skewed.getSkewedColValues() != null) {
- SortedSet<String> sortedOuterList = new TreeSet<String>();
+ SortedSet<String> sortedOuterList = new TreeSet<>();
for (List<String> innerList : skewed.getSkewedColValues()) {
- SortedSet<String> sortedInnerList = new TreeSet<String>(innerList);
+ SortedSet<String> sortedInnerList = new TreeSet<>(innerList);
sortedOuterList.add(StringUtils.join(sortedInnerList, "."));
}
for (String colval : sortedOuterList) md.update(colval.getBytes(ENCODING));
}
if (skewed.getSkewedColValueLocationMaps() != null) {
- SortedMap<String, String> sortedMap = new TreeMap<String, String>();
+ SortedMap<String, String> sortedMap = new TreeMap<>();
for (Map.Entry<List<String>, String> smap : skewed.getSkewedColValueLocationMaps().entrySet()) {
- SortedSet<String> sortedKey = new TreeSet<String>(smap.getKey());
+ SortedSet<String> sortedKey = new TreeSet<>(smap.getKey());
sortedMap.put(StringUtils.join(sortedKey, "."), smap.getValue());
}
for (Map.Entry<String, String> e : sortedMap.entrySet()) {
@@ -690,8 +684,8 @@ class HBaseUtils {
serde.setParameters(buildParameters(proto.getSerdeInfo().getParameters()));
sd.setSerdeInfo(serde);
}
- sd.setBucketCols(new ArrayList<String>(proto.getBucketColsList()));
- List<Order> sortCols = new ArrayList<Order>();
+ sd.setBucketCols(new ArrayList<>(proto.getBucketColsList()));
+ List<Order> sortCols = new ArrayList<>();
for (HbaseMetastoreProto.StorageDescriptor.Order protoOrder : proto.getSortColsList()) {
sortCols.add(new Order(protoOrder.getColumnName(), protoOrder.getOrder()));
}
@@ -699,15 +693,15 @@ class HBaseUtils {
if (proto.hasSkewedInfo()) {
SkewedInfo skewed = new SkewedInfo();
skewed
- .setSkewedColNames(new ArrayList<String>(proto.getSkewedInfo().getSkewedColNamesList()));
+ .setSkewedColNames(new ArrayList<>(proto.getSkewedInfo().getSkewedColNamesList()));
for (HbaseMetastoreProto.StorageDescriptor.SkewedInfo.SkewedColValueList innerList :
proto.getSkewedInfo().getSkewedColValuesList()) {
- skewed.addToSkewedColValues(new ArrayList<String>(innerList.getSkewedColValueList()));
+ skewed.addToSkewedColValues(new ArrayList<>(innerList.getSkewedColValueList()));
}
- Map<List<String>, String> colMaps = new HashMap<List<String>, String>();
+ Map<List<String>, String> colMaps = new HashMap<>();
for (HbaseMetastoreProto.StorageDescriptor.SkewedInfo.SkewedColValueLocationMap map :
proto.getSkewedInfo().getSkewedColValueLocationMapsList()) {
- colMaps.put(new ArrayList<String>(map.getKeyList()), map.getValue());
+ colMaps.put(new ArrayList<>(map.getKeyList()), map.getValue());
}
skewed.setSkewedColValueLocationMaps(colMaps);
sd.setSkewedInfo(skewed);
@@ -742,7 +736,7 @@ class HBaseUtils {
}
static byte[] buildPartitionKey(String dbName, String tableName, List<String> partVals) {
- Deque<String> keyParts = new ArrayDeque<String>(partVals);
+ Deque<String> keyParts = new ArrayDeque<>(partVals);
keyParts.addFirst(tableName);
keyParts.addFirst(dbName);
return buildKey(keyParts.toArray(new String[keyParts.size()]));
@@ -1135,6 +1129,61 @@ class HBaseUtils {
}
/**
+ * Serialize a delegation token into a key/value pair of byte arrays.
+ * @param tokenIdentifier identifier of the token; passed to buildKey to produce the key
+ * @param delegationToken token string stored in the DelegationToken protobuf message
+ * @return two byte arrays, first contains the key, the second the serialized value.
+ */
+ static byte[][] serializeDelegationToken(String tokenIdentifier, String delegationToken) {
+ byte[][] result = new byte[2][];
+ result[0] = buildKey(tokenIdentifier);
+ result[1] = HbaseMetastoreProto.DelegationToken.newBuilder()
+ .setTokenStr(delegationToken)
+ .build()
+ .toByteArray();
+ return result;
+ }
+
+ /**
+ * Deserialize a delegation token from its DelegationToken protobuf encoding.
+ * @param value value fetched from hbase
+ * @return the token string carried by the parsed message
+ * @throws InvalidProtocolBufferException if value cannot be parsed as a DelegationToken message
+ */
+ static String deserializeDelegationToken(byte[] value) throws InvalidProtocolBufferException {
+ HbaseMetastoreProto.DelegationToken protoToken =
+ HbaseMetastoreProto.DelegationToken.parseFrom(value);
+ return protoToken.getTokenStr();
+ }
+
+ /**
+ * Serialize a master key into a key/value pair of byte arrays.
+ * @param seqNo sequence number, must not be null; its string form is passed to buildKey
+ * @param key master key string stored in the MasterKey protobuf message
+ * @return two byte arrays, first contains the key, the second the serialized value.
+ */
+ static byte[][] serializeMasterKey(Integer seqNo, String key) {
+ byte[][] result = new byte[2][];
+ result[0] = buildKey(seqNo.toString());
+ result[1] = HbaseMetastoreProto.MasterKey.newBuilder()
+ .setMasterKey(key)
+ .build()
+ .toByteArray();
+ return result;
+ }
+
+ /**
+ * Deserialize a master key from its MasterKey protobuf encoding.
+ * @param value value fetched from hbase
+ * @return the master key string carried by the parsed message
+ * @throws InvalidProtocolBufferException if value cannot be parsed as a MasterKey message
+ */
+ static String deserializeMasterKey(byte[] value) throws InvalidProtocolBufferException {
+ HbaseMetastoreProto.MasterKey protoKey = HbaseMetastoreProto.MasterKey.parseFrom(value);
+ return protoKey.getMasterKey();
+ }
+
+ /**
* @param keyStart byte array representing the start prefix
* @return byte array corresponding to the next possible prefix
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/a310524c/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
----------------------------------------------------------------------
diff --git a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
index 3cd8867..cba3671 100644
--- a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
+++ b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto
@@ -104,6 +104,10 @@ message Database {
optional PrincipalType owner_type = 6;
}
+message DelegationToken { // Serialized value written by HBaseUtils.serializeDelegationToken.
+ required string token_str = 1; // The delegation token string.
+}
+
message FieldSchema {
required string name = 1;
required string type = 2;
@@ -133,6 +137,10 @@ message Function {
repeated ResourceUri resource_uris = 6;
}
+message MasterKey { // Serialized value written by HBaseUtils.serializeMasterKey.
+ required string master_key = 1; // The master key string.
+}
+
message ParameterEntry {
required string key = 1;
required string value = 2;
@@ -247,8 +255,3 @@ message Table {
optional PrincipalPrivilegeSet privileges = 13;
optional bool is_temporary = 14;
}
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/hive/blob/a310524c/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
index 9878499..fac7dcc 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -1246,56 +1245,6 @@ public class TestHBaseStore {
Assert.assertEquals(decimalData.getNumDVs(), decimalDataFromDB.getNumDVs());
}
- // TODO: Activate this test, when we are able to mock the HBaseReadWrite.NO_CACHE_CONF set to false
- // Right now, I have tested this by using aggrStatsCache despite NO_CACHE_CONF set to true
- // Also need to add tests for other data types + refactor a lot of duplicate code in stats testing
- //@Test
- public void AggrStats() throws Exception {
- int numParts = 3;
- ColumnStatistics stats;
- ColumnStatisticsDesc desc;
- ColumnStatisticsObj obj;
- List<String> partNames = new ArrayList<String>();
- List<String> colNames = new ArrayList<String>();
- colNames.add(BOOLEAN_COL);
- // Add boolean col stats to DB for numParts partitions:
- // PART_VALS(0), PART_VALS(1) & PART_VALS(2) for PART_KEYS(0)
- for (int i = 0; i < numParts; i++) {
- stats = new ColumnStatistics();
- // Get a default ColumnStatisticsDesc for partition level stats
- desc = getMockPartColStatsDesc(0, i);
- stats.setStatsDesc(desc);
- partNames.add(desc.getPartName());
- // Get one of the pre-created ColumnStatisticsObj
- obj = booleanColStatsObjs.get(i);
- stats.addToStatsObj(obj);
- // Add to DB
- List<String> parVals = new ArrayList<String>();
- parVals.add(PART_VALS.get(i));
- store.updatePartitionColumnStatistics(stats, parVals);
- }
- // Read aggregate stats
- AggrStats aggrStatsFromDB = store.get_aggr_stats_for(DB, TBL, partNames, colNames);
- // Verify
- Assert.assertEquals(1, aggrStatsFromDB.getColStatsSize());
- ColumnStatisticsObj objFromDB = aggrStatsFromDB.getColStats().get(0);
- Assert.assertNotNull(objFromDB);
- // Aggregate our mock values
- long numTrues = 0, numFalses = 0, numNulls = 0;
- BooleanColumnStatsData boolData;;
- for (int i = 0; i < numParts; i++) {
- boolData = booleanColStatsObjs.get(i).getStatsData().getBooleanStats();
- numTrues = numTrues + boolData.getNumTrues();
- numFalses = numFalses + boolData.getNumFalses();
- numNulls = numNulls + boolData.getNumNulls();
- }
- // Compare with what we got from the method call
- BooleanColumnStatsData boolDataFromDB = objFromDB.getStatsData().getBooleanStats();
- Assert.assertEquals(numTrues, boolDataFromDB.getNumTrues());
- Assert.assertEquals(numFalses, boolDataFromDB.getNumFalses());
- Assert.assertEquals(numNulls, boolDataFromDB.getNumNulls());
- }
-
/**
* Returns a dummy table level ColumnStatisticsDesc with default values
*/