You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2019/03/25 05:43:14 UTC
[hive] branch master updated: HIVE-21471: Replicating conversion of
managed to external table leaks HDFS files at target (Sankar Hariappan,
reviewed by Mahesh Kumar Behera)
This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 5e8f3f5 HIVE-21471: Replicating conversion of managed to external table leaks HDFS files at target (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
5e8f3f5 is described below
commit 5e8f3f594d0e7ecfecdf1ecb2f7959e26d14f9d1
Author: Sankar Hariappan <sa...@apache.org>
AuthorDate: Mon Mar 25 11:12:22 2019 +0530
HIVE-21471: Replicating conversion of managed to external table leaks HDFS files at target (Sankar Hariappan, reviewed by Mahesh Kumar Behera)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
.../TestReplicationScenariosExternalTables.java | 54 +++++++
.../hive/ql/ddl/table/CreateTableOperation.java | 30 +++-
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 10 ++
.../org/apache/hadoop/hive/common/ReplConst.java | 33 ++++
.../hadoop/hive/metastore/HiveAlterHandler.java | 168 +++++++++++++--------
5 files changed, 224 insertions(+), 71 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 72da2f1..b8e96f2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -673,6 +674,59 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
}
+  @Test
+  public void dynamicallyConvertManagedToExternalTable() throws Throwable {
+    // Managed tables t1 (unpartitioned) and t2 (partitioned) are bootstrap-replicated, then converted to
+    // external tables at the source. After the incremental load, the replica tables must be external,
+    // located inside the external-table base directory, and the old managed-table directories removed.
+    List<String> dumpWithClause = Collections.singletonList(
+        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'"
+    );
+    List<String> loadWithClause = externalTableBasePathWithClause();
+
+    WarehouseInstance.Tuple tupleBootstrapManagedTable = primary.run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create table t2 (id int) partitioned by (key int)")
+        .run("insert into table t2 partition(key=10) values (1)")
+        .dump(primaryDbName, null, dumpWithClause);
+
+    replica.load(replicatedDbName, tupleBootstrapManagedTable.dumpLocation, loadWithClause);
+
+    Hive hiveForReplica = Hive.get(replica.hiveConf);
+    Table replicaTable = hiveForReplica.getTable(replicatedDbName + ".t1");
+    Path oldTblLocT1 = replicaTable.getDataLocation();
+
+    replicaTable = hiveForReplica.getTable(replicatedDbName + ".t2");
+    Path oldTblLocT2 = replicaTable.getDataLocation();
+
+    // The old (managed) locations must exist after bootstrap; otherwise the final
+    // "old location removed" assertions below would pass vacuously.
+    assertTrue(replica.miniDFSCluster.getFileSystem().exists(oldTblLocT1));
+    assertTrue(replica.miniDFSCluster.getFileSystem().exists(oldTblLocT2));
+
+    WarehouseInstance.Tuple tupleIncConvertToExternalTbl = primary.run("use " + primaryDbName)
+        .run("alter table t1 set tblproperties('EXTERNAL'='true')")
+        .run("alter table t2 set tblproperties('EXTERNAL'='true')")
+        .dump(primaryDbName, tupleBootstrapManagedTable.lastReplicationId, dumpWithClause);
+
+    assertExternalFileInfo(Arrays.asList("t1", "t2"),
+        new Path(tupleIncConvertToExternalTbl.dumpLocation, FILE_NAME));
+    replica.load(replicatedDbName, tupleIncConvertToExternalTbl.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("select id from t1")
+        .verifyResult("1")
+        .run("select id from t2 where key=10")
+        .verifyResult("1");
+
+    // Check if the table type is set correctly in target.
+    replicaTable = hiveForReplica.getTable(replicatedDbName + ".t1");
+    assertTrue(TableType.EXTERNAL_TABLE.equals(replicaTable.getTableType()));
+
+    replicaTable = hiveForReplica.getTable(replicatedDbName + ".t2");
+    assertTrue(TableType.EXTERNAL_TABLE.equals(replicaTable.getTableType()));
+
+    // Verify if new table location is set inside the base directory.
+    assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1");
+    assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+
+    // Old location should be removed and set to new location.
+    assertFalse(replica.miniDFSCluster.getFileSystem().exists(oldTblLocT1));
+    assertFalse(replica.miniDFSCluster.getFileSystem().exists(oldTblLocT2));
+  }
+
private List<String> externalTableBasePathWithClause() throws IOException, SemanticException {
Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE);
DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
index af39c16..24373fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.ql.ddl.DDLOperation;
@@ -54,17 +55,24 @@ public class CreateTableOperation extends DDLOperation {
Table tbl = desc.toTable(context.getConf());
LOG.debug("creating table {} on {}", tbl.getFullyQualifiedName(), tbl.getDataLocation());
- if (desc.getReplicationSpec().isInReplicationScope() && (!desc.getReplaceMode())){
- // if this is a replication spec, then replace-mode semantics might apply.
- // if we're already asking for a table replacement, then we can skip this check.
- // however, otherwise, if in replication scope, and we've not been explicitly asked
- // to replace, we should check if the object we're looking at exists, and if so,
+ boolean replDataLocationChanged = false;
+ if (desc.getReplicationSpec().isInReplicationScope()) {
+ // If in replication scope, we should check if the object we're looking at exists, and if so,
// trigger replace-mode semantics.
Table existingTable = context.getDb().getTable(tbl.getDbName(), tbl.getTableName(), false);
- if (existingTable != null){
+ if (existingTable != null) {
if (desc.getReplicationSpec().allowEventReplacementInto(existingTable.getParameters())) {
desc.setReplaceMode(true); // we replace existing table.
ReplicationSpec.copyLastReplId(existingTable.getParameters(), tbl.getParameters());
+
+ // If location of an existing managed table is changed, then need to delete the old location if exists.
+ // This scenario occurs when a managed table is converted into external table at source. In this case,
+ // at target, the table data would be moved to different location under base directory for external tables.
+ if (existingTable.getTableType().equals(TableType.MANAGED_TABLE)
+ && tbl.getTableType().equals(TableType.EXTERNAL_TABLE)
+ && (!existingTable.getDataLocation().equals(tbl.getDataLocation()))) {
+ replDataLocationChanged = true;
+ }
} else {
LOG.debug("DDLTask: Create Table is skipped as table {} is newer than update", desc.getTableName());
return 0; // no replacement, the existing table state is newer than our update.
@@ -74,7 +82,7 @@ public class CreateTableOperation extends DDLOperation {
// create the table
if (desc.getReplaceMode()) {
- createTableReplaceMode(tbl);
+ createTableReplaceMode(tbl, replDataLocationChanged);
} else {
createTableNonReplaceMode(tbl);
}
@@ -83,7 +91,7 @@ public class CreateTableOperation extends DDLOperation {
return 0;
}
- private void createTableReplaceMode(Table tbl) throws HiveException {
+ private void createTableReplaceMode(Table tbl, boolean replDataLocationChanged) throws HiveException {
ReplicationSpec replicationSpec = desc.getReplicationSpec();
long writeId = 0;
EnvironmentContext environmentContext = null;
@@ -108,6 +116,12 @@ public class CreateTableOperation extends DDLOperation {
}
}
+ // In replication flow, if table's data location is changed, then set the corresponding flag in
+ // environment context to notify Metastore to update location of all partitions and delete old directory.
+ if (replDataLocationChanged) {
+ environmentContext = ReplUtils.setReplDataLocationChangedFlag(environmentContext);
+ }
+
// replace-mode creates are really alters using CreateTableDesc.
context.getDb().alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), tbl, false, environmentContext,
true, writeId);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index a5ed840..072189b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hive.ql.exec.repl.util;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.ReplConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -206,4 +208,12 @@ public class ReplUtils {
// and not through replication.
return firstIncPendFlag != null && !firstIncPendFlag.isEmpty() && "true".equalsIgnoreCase(firstIncPendFlag);
}
+
+  /**
+   * Marks an environment context with the replication "data location changed" flag, i.e. property
+   * {@link ReplConst#REPL_DATA_LOCATION_CHANGED} set to {@link ReplConst#TRUE}. The Metastore's alter-table
+   * handler reads this flag to update partition locations and delete the old table directory.
+   *
+   * @param envContext context to update in place; may be {@code null}, in which case a new one is created
+   * @return the flagged context: {@code envContext} itself when non-null, otherwise the newly created one
+   */
+  public static EnvironmentContext setReplDataLocationChangedFlag(EnvironmentContext envContext) {
+    EnvironmentContext flagged = (envContext == null) ? new EnvironmentContext() : envContext;
+    flagged.putToProperties(ReplConst.REPL_DATA_LOCATION_CHANGED, ReplConst.TRUE);
+    return flagged;
+  }
}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java
new file mode 100644
index 0000000..e25189d
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/ReplConst.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+/**
+ * Constant strings shared by the replication implementation (query layer and Metastore).
+ */
+public final class ReplConst {
+  /**
+   * Environment-context property key denoting that a table's data location was changed to a different path.
+   * It tells the Metastore to update the corresponding partition paths and to delete the old path.
+   */
+  public static final String REPL_DATA_LOCATION_CHANGED = "REPL_DATA_LOCATION_CHANGED";
+  /** Property value that switches a replication flag on, e.g. {@link #REPL_DATA_LOCATION_CHANGED}. */
+  public static final String TRUE = "true";
+
+  private ReplConst() { } // Constants holder only; not meant to be instantiated.
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 617c7bc..ad670c9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.ReplConst;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
@@ -98,10 +99,16 @@ public class HiveAlterHandler implements AlterHandler {
name = name.toLowerCase();
dbname = dbname.toLowerCase();
- final boolean cascade = environmentContext != null
- && environmentContext.isSetProperties()
- && StatsSetupConst.TRUE.equals(environmentContext.getProperties().get(
- StatsSetupConst.CASCADE));
+ final boolean cascade;
+ final boolean replDataLocationChanged;
+ if ((environmentContext != null) && environmentContext.isSetProperties()) {
+ cascade = StatsSetupConst.TRUE.equals(environmentContext.getProperties().get(StatsSetupConst.CASCADE));
+ replDataLocationChanged = ReplConst.TRUE.equals(environmentContext.getProperties().get(ReplConst.REPL_DATA_LOCATION_CHANGED));
+ } else {
+ cascade = false;
+ replDataLocationChanged = false;
+ }
+
if (newt == null) {
throw new InvalidOperationException("New table is null");
}
@@ -126,6 +133,7 @@ public class HiveAlterHandler implements AlterHandler {
boolean dataWasMoved = false;
boolean isPartitionedTable = false;
+ Database olddb = null;
Table oldt = null;
List<TransactionalMetaStoreEventListener> transactionalListeners = handler.getTransactionalListeners();
@@ -154,6 +162,7 @@ public class HiveAlterHandler implements AlterHandler {
msdb.openTransaction();
// get old table
// Note: we don't verify stats here; it's done below in alterTableUpdateTableColumnStats.
+ olddb = msdb.getDatabase(catName, dbname);
oldt = msdb.getTable(catName, dbname, name, null);
if (oldt == null) {
throw new InvalidOperationException("table " +
@@ -186,73 +195,87 @@ public class HiveAlterHandler implements AlterHandler {
}
}
- // rename needs change the data location and move the data to the new location corresponding
+ // Two mutually exclusive flows possible.
+ // i) Partition locations needs update if replDataLocationChanged is true which means table's
+ // data location is changed with all partition sub-directories.
+ // ii) Rename needs change the data location and move the data to the new location corresponding
// to the new name if:
// 1) the table is not a virtual view, and
// 2) the table is not an external table, and
// 3) the user didn't change the default location (or new location is empty), and
// 4) the table was not initially created with a specified location
- if (rename
- && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())
- && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0
- || StringUtils.isEmpty(newt.getSd().getLocation()))
- && !MetaStoreUtils.isExternalTable(oldt)) {
- Database olddb = msdb.getDatabase(catName, dbname);
- // if a table was created in a user specified location using the DDL like
- // create table tbl ... location ...., it should be treated like an external table
- // in the table rename, its data location should not be changed. We can check
- // if the table directory was created directly under its database directory to tell
- // if it is such a table
+ if (replDataLocationChanged
+ || (rename
+ && !oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())
+ && (oldt.getSd().getLocation().compareTo(newt.getSd().getLocation()) == 0
+ || StringUtils.isEmpty(newt.getSd().getLocation()))
+ && !MetaStoreUtils.isExternalTable(oldt))) {
srcPath = new Path(oldt.getSd().getLocation());
- String oldtRelativePath = (new Path(olddb.getLocationUri()).toUri())
- .relativize(srcPath.toUri()).toString();
- boolean tableInSpecifiedLoc = !oldtRelativePath.equalsIgnoreCase(name)
- && !oldtRelativePath.equalsIgnoreCase(name + Path.SEPARATOR);
-
- if (!tableInSpecifiedLoc) {
- srcFs = wh.getFs(srcPath);
-
- // get new location
- Database db = msdb.getDatabase(catName, newDbName);
- Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath);
- destPath = new Path(databasePath, newTblName);
- destFs = wh.getFs(destPath);
- newt.getSd().setLocation(destPath.toString());
-
- // check that destination does not exist otherwise we will be
- // overwriting data
- // check that src and dest are on the same file system
- if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
- throw new InvalidOperationException("table new location " + destPath
- + " is on a different file system than the old location "
- + srcPath + ". This operation is not supported");
- }
-
- try {
- if (destFs.exists(destPath)) {
- throw new InvalidOperationException("New location for this table " +
- TableName.getQualified(catName, newDbName, newTblName) +
- " already exists : " + destPath);
+ if (replDataLocationChanged) {
+ // If data location is changed in replication flow, then new path was already set in
+ // the newt. Also, it is as good as the data is moved and set dataWasMoved=true so that
+ // location in partitions are also updated accordingly.
+ // No need to validate if the destPath exists as in replication flow, data gets replicated
+ // separately.
+ destPath = new Path(newt.getSd().getLocation());
+ dataWasMoved = true;
+ } else {
+ // Rename flow.
+ // If a table was created in a user specified location using the DDL like
+ // create table tbl ... location ...., it should be treated like an external table
+ // in the table rename, its data location should not be changed. We can check
+ // if the table directory was created directly under its database directory to tell
+ // if it is such a table
+ String oldtRelativePath = (new Path(olddb.getLocationUri()).toUri())
+ .relativize(srcPath.toUri()).toString();
+ boolean tableInSpecifiedLoc = !oldtRelativePath.equalsIgnoreCase(name)
+ && !oldtRelativePath.equalsIgnoreCase(name + Path.SEPARATOR);
+ if (!tableInSpecifiedLoc) {
+ srcFs = wh.getFs(srcPath);
+
+ // get new location
+ Database db = msdb.getDatabase(catName, newDbName);
+ Path databasePath = constructRenamedPath(wh.getDatabasePath(db), srcPath);
+ destPath = new Path(databasePath, newTblName);
+ destFs = wh.getFs(destPath);
+
+ newt.getSd().setLocation(destPath.toString());
+
+ // check that destination does not exist otherwise we will be
+ // overwriting data
+ // check that src and dest are on the same file system
+ if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
+ throw new InvalidOperationException("table new location " + destPath
+ + " is on a different file system than the old location "
+ + srcPath + ". This operation is not supported");
}
- // check that src exists and also checks permissions necessary, rename src to dest
- if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath,
- ReplChangeManager.isSourceOfReplication(olddb))) {
- dataWasMoved = true;
+
+ try {
+ if (destFs.exists(destPath)) {
+ throw new InvalidOperationException("New location for this table " +
+ TableName.getQualified(catName, newDbName, newTblName) +
+ " already exists : " + destPath);
+ }
+ // check that src exists and also checks permissions necessary, rename src to dest
+ if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath,
+ ReplChangeManager.isSourceOfReplication(olddb))) {
+ dataWasMoved = true;
+ }
+ } catch (IOException | MetaException e) {
+ LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e);
+ throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name +
+ " failed to move data due to: '" + getSimpleMessage(e)
+ + "' See hive log file for details.");
}
- } catch (IOException | MetaException e) {
- LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e);
- throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name +
- " failed to move data due to: '" + getSimpleMessage(e)
- + "' See hive log file for details.");
- }
- if (!HiveMetaStore.isRenameAllowed(olddb, db)) {
- LOG.error("Alter Table operation for " + TableName.getQualified(catName, dbname, name) +
- "to new table = " + TableName.getQualified(catName, newDbName, newTblName) + " failed ");
- throw new MetaException("Alter table not allowed for table " +
- TableName.getQualified(catName, dbname, name) +
- "to new table = " + TableName.getQualified(catName, newDbName, newTblName));
+ if (!HiveMetaStore.isRenameAllowed(olddb, db)) {
+ LOG.error("Alter Table operation for " + TableName.getQualified(catName, dbname, name) +
+ "to new table = " + TableName.getQualified(catName, newDbName, newTblName) + " failed ");
+ throw new MetaException("Alter table not allowed for table " +
+ TableName.getQualified(catName, dbname, name) +
+ "to new table = " + TableName.getQualified(catName, newDbName, newTblName));
+ }
}
}
@@ -382,10 +405,29 @@ public class HiveAlterHandler implements AlterHandler {
"Unable to change partition or table. Object " + e.getMessage() + " does not exist."
+ " Check metastore logs for detailed stack.");
} finally {
- if (!success) {
+ if (success) {
+ // Txn was committed successfully.
+ // If data location is changed in replication flow, then need to delete the old path.
+ if (replDataLocationChanged) {
+ assert(olddb != null);
+ assert(oldt != null);
+ Path deleteOldDataLoc = new Path(oldt.getSd().getLocation());
+ boolean isAutoPurge = "true".equalsIgnoreCase(oldt.getParameters().get("auto.purge"));
+ try {
+ wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, olddb);
+ LOG.info("Deleted the old data location: {} for the table: {}",
+ deleteOldDataLoc, dbname + "." + name);
+ } catch (MetaException ex) {
+ // Eat the exception as it doesn't affect the state of existing tables.
+ // Expect, user to manually drop this path when exception and so logging a warning.
+ LOG.warn("Unable to delete the old data location: {} for the table: {}",
+ deleteOldDataLoc, dbname + "." + name);
+ }
+ }
+ } else {
LOG.error("Failed to alter table " + TableName.getQualified(catName, dbname, name));
msdb.rollbackTransaction();
- if (dataWasMoved) {
+ if (!replDataLocationChanged && dataWasMoved) {
try {
if (destFs.exists(destPath)) {
if (!destFs.rename(destPath, srcPath)) {