Posted to commits@hive.apache.org by an...@apache.org on 2019/01/08 08:26:17 UTC
[2/3] hive git commit: HIVE-20911: External Table Replication for Hive (Anishek Agarwal, reviewed by Sankar Hariappan, Ashutosh Bapat)
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 5d6ae7f..497e103 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -75,12 +76,15 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
private static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
private static final String FUNCTION_METADATA_FILE_NAME = "_metadata";
+ private static final long SLEEP_TIME = 60000;
+
public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_");
private final String name;
private final String prefix;
@@ -97,7 +101,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
}
- private static long sleepTime = 60000;
private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
private ReplLogger replLogger;
@@ -119,11 +122,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
} else {
lastReplId = incrementalDump(dumpRoot, dmd, cmRoot, hiveDb);
}
- prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
- } catch (RuntimeException e) {
- LOG.error("failed", e);
- setException(e);
- return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)));
} catch (Exception e) {
LOG.error("failed", e);
setException(e);
@@ -132,8 +131,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return 0;
}
- private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
- LOG.debug("prepareReturnValues : " + schema);
+ private void prepareReturnValues(List<String> values) throws SemanticException {
+ LOG.debug("prepareReturnValues : " + dumpSchema);
for (String s : values) {
LOG.debug(" > " + s);
}
@@ -195,6 +194,18 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
dmd.getDumpFilePath(), conf);
dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
dmd.write();
+
+ if (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) &&
+ !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+ try (Writer writer = new Writer(dumpRoot, conf)) {
+ for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
+ Table table = hiveDb.getTable(dbName, tableName);
+ if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+ writer.dataLocationDump(table);
+ }
+ }
+ }
+ }
return lastReplId;
}
@@ -241,11 +252,27 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
Exception caught = null;
- try {
+ try (Writer writer = new Writer(dbRoot, conf)) {
for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
LOG.debug(
"analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
- dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb);
+ try {
+ HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName);
+ boolean shouldWriteExternalTableLocationInfo =
+ conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
+ && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())
+ && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
+ if (shouldWriteExternalTableLocationInfo) {
+ LOG.debug("adding table {} to external tables list");
+ writer.dataLocationDump(tableTuple.object);
+ }
+ dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb,
+ tableTuple);
+ } catch (InvalidTableException te) {
+ // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
+ // Just log a debug message and skip it.
+ LOG.debug(te.getMessage());
+ }
dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb);
}
} catch (Exception e) {
@@ -293,34 +320,27 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return dbRoot;
}
- void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive hiveDb) throws Exception {
- try {
- HiveWrapper.Tuple<Table> tuple = new HiveWrapper(hiveDb, dbName).table(tblName);
- TableSpec tableSpec = new TableSpec(tuple.object);
- TableExport.Paths exportPaths =
- new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
- String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
- tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false
- if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
- tuple.replicationSpec.setValidTxnList(validTxnList);
- tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
-
- // For transactional table, data would be valid snapshot for current txn and doesn't include data
- // added/modified by concurrent txns which are later than current txn. So, need to set last repl Id of this table
- // as bootstrap dump's last repl Id.
- tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
- }
- MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
- new TableExport(
- exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write();
-
-
- replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
- } catch (InvalidTableException te) {
- // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
- // Just log a debug message and skip it.
- LOG.debug(te.getMessage());
+ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId,
+ Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception {
+ TableSpec tableSpec = new TableSpec(tuple.object);
+ TableExport.Paths exportPaths =
+ new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
+ String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+ tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false
+ if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
+ tuple.replicationSpec.setValidTxnList(validTxnList);
+ tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
+
+ // For a transactional table, the data is a valid snapshot for the current txn and doesn't include data
+ // added/modified by concurrent txns that are later than the current txn. So, the last repl Id of this
+ // table needs to be set to the bootstrap dump's last repl Id.
+ tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
}
+ MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
+ new TableExport(
+ exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write();
+
+ replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
}
private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException {
@@ -365,7 +385,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// Wait for 1 minute and check again.
try {
- Thread.sleep(sleepTime);
+ Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
LOG.info("REPL DUMP thread sleep interrupted", e);
}
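For orientation, the dump-side wiring above reduces to the following flow; a condensed sketch, assuming the surrounding ReplDumpTask context (Writer's constructor is package-private) and the flag/class names from this patch:

    boolean shouldDumpExternalTableData =
        conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
            && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
    if (shouldDumpExternalTableData) {
      // Writer implements Closeable, so try-with-resources closes _external_tables_info.
      try (Writer writer = new Writer(dumpRoot, conf)) {
        for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
          Table table = hiveDb.getTable(dbName, tableName);
          if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
            writer.dataLocationDump(table); // one line per table, plus outlying partition locations
          }
        }
      }
    }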
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
new file mode 100644
index 0000000..012df9d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Format of the file used to dump information about external tables:
+ * <p>
+ * table_name1,[base64Encoded(table_dir_location)]\n
+ *
+ * The file generated here is used exclusively for the data copy of external tables, hence the
+ * handling of writing this file differs from regular event handling for replication, based on the
+ * conditions specified in {@link org.apache.hadoop.hive.ql.parse.repl.dump.Utils#shouldReplicate}
+ */
+public final class ReplExternalTables {
+ private static final Logger LOG = LoggerFactory.getLogger(ReplExternalTables.class);
+ private static final String FIELD_SEPARATOR = ",";
+ public static final String FILE_NAME = "_external_tables_info";
+ private static final int MAX_RETRIES = 5;
+
+ private ReplExternalTables(){}
+
+ public static String externalTableLocation(HiveConf hiveConf, String location) {
+ String currentPath = new Path(location).toUri().getPath();
+ String baseDir = hiveConf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
+ URI basePath = new Path(baseDir).toUri();
+ String dataPath = currentPath.replaceFirst(Path.SEPARATOR, basePath.getPath() + Path.SEPARATOR);
+ Path dataLocation = new Path(basePath.getScheme(), basePath.getAuthority(), dataPath);
+ LOG.debug("incoming location: {} , new location: {}", location, dataLocation.toString());
+ return dataLocation.toString();
+ }
+
+ public static class Writer implements Closeable {
+ private static Logger LOG = LoggerFactory.getLogger(Writer.class);
+ private final HiveConf hiveConf;
+ private final Path writePath;
+ private final Boolean excludeExternalTables, dumpMetadataOnly;
+ private OutputStream writer;
+
+ Writer(Path dbRoot, HiveConf hiveConf) throws IOException {
+ this.hiveConf = hiveConf;
+ writePath = new Path(dbRoot, FILE_NAME);
+ excludeExternalTables = !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES);
+ dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
+ if (shouldWrite()) {
+ this.writer = FileSystem.get(hiveConf).create(writePath);
+ }
+ }
+
+ private boolean shouldWrite() {
+ return !dumpMetadataOnly && !excludeExternalTables;
+ }
+
+ /**
+ * This will dump a single line per external table. It can include additional lines for the same
+ * table if the table is partitioned and a partition location is outside the table location.
+ */
+ void dataLocationDump(Table table)
+ throws InterruptedException, IOException, HiveException {
+ if (!shouldWrite()) {
+ return;
+ }
+ if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+ throw new IllegalArgumentException(
+ "only External tables can be writen via this writer, provided table is " + table
+ .getTableType());
+ }
+ Path fullyQualifiedDataLocation =
+ PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf));
+ write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
+ if (table.isPartitioned()) {
+ List<Partition> partitions = Hive.get(hiveConf).getPartitions(table);
+ for (Partition partition : partitions) {
+ boolean partitionLocOutsideTableLoc = !FileUtils.isPathWithinSubtree(
+ partition.getDataLocation(), table.getDataLocation()
+ );
+ if (partitionLocOutsideTableLoc) {
+ fullyQualifiedDataLocation = PathBuilder
+ .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf));
+ write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
+ }
+ }
+ }
+ }
+
+ private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf)
+ throws IOException, SemanticException {
+ StringWriter lineToWrite = new StringWriter();
+ lineToWrite.append(tableName).append(FIELD_SEPARATOR);
+ Path dataLocation =
+ PathBuilder.fullyQualifiedHDFSUri(dataLoc, dataLoc.getFileSystem(hiveConf));
+ byte[] encodedBytes = Base64.getEncoder()
+ .encode(dataLocation.toString().getBytes(StandardCharsets.UTF_8));
+ String encodedPath = new String(encodedBytes, StandardCharsets.UTF_8);
+ lineToWrite.append(encodedPath).append("\n");
+ return lineToWrite.toString();
+ }
+
+ private void write(String line) throws InterruptedException {
+ int currentRetry = 0;
+ while (currentRetry < MAX_RETRIES) {
+ try {
+ writer.write(line.getBytes(StandardCharsets.UTF_8));
+ break;
+ } catch (IOException e) {
+ currentRetry++;
+ if (currentRetry < MAX_RETRIES) {
+ LOG.warn("failed to write data with maxRetries {} due to", currentRetry, e);
+ } else {
+ LOG.error("failed to write data with maxRetries {} due to", currentRetry, e);
+ throw new RuntimeException("failed to write data", e);
+ }
+ Thread.sleep(100 * currentRetry * currentRetry);
+ writer = openWriterAppendMode();
+ }
+ }
+ }
+
+ private OutputStream openWriterAppendMode() {
+ try {
+ // not sure if the exception was due to an incorrect state within the writer, hence closing it
+ close();
+ return FileSystem.get(hiveConf).append(writePath);
+ } catch (IOException e1) {
+ String message = "there was an error opening the file " + writePath.toString() + " in append mode";
+ LOG.error(message, e1);
+ throw new IllegalStateException(message, e1);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+ }
+
+ public static class Reader {
+ private static Logger LOG = LoggerFactory.getLogger(Reader.class);
+ private final HiveConf hiveConf;
+ private final Path rootReplLoadPath;
+ private final boolean isIncrementalPhase;
+
+ public Reader(HiveConf conf, Path rootReplLoadPath, boolean isIncrementalPhase) {
+ this.hiveConf = conf;
+ this.rootReplLoadPath = rootReplLoadPath;
+ this.isIncrementalPhase = isIncrementalPhase;
+ }
+
+ /**
+ * Currently we only support dump/load of a single db, and the db dump location cannot be inferred
+ * from the incoming dbNameOrPattern value since the db name on the load side can be different from
+ * the db name used at dump time; hence we traverse one level down from rootReplLoadPath to look for
+ * the file providing the hdfs locations.
+ */
+ public Set<String> sourceLocationsToCopy() throws IOException {
+ if (isIncrementalPhase) {
+ return sourceLocationsToCopy(new Path(rootReplLoadPath, FILE_NAME));
+ }
+
+ // this is bootstrap load path
+ Set<String> locationsToCopy = new HashSet<>();
+ FileSystem fileSystem = rootReplLoadPath.getFileSystem(hiveConf);
+ FileStatus[] fileStatuses = fileSystem.listStatus(rootReplLoadPath);
+ for (FileStatus next : fileStatuses) {
+ if (next.isDirectory()) {
+ Path externalTableInfoPath = new Path(next.getPath(), FILE_NAME);
+ locationsToCopy.addAll(sourceLocationsToCopy(externalTableInfoPath));
+ }
+ }
+ return locationsToCopy;
+ }
+
+ private BufferedReader reader(FileSystem fs, Path externalTableInfo) throws IOException {
+ InputStreamReader in = new InputStreamReader(fs.open(externalTableInfo), StandardCharsets.UTF_8);
+ return new BufferedReader(in);
+ }
+
+ /**
+ * The set of source locations should never be keyed on the table name in
+ * {@link #FILE_NAME}, since there can be multiple entries for the same table when the table is
+ * partitioned and partitions are added with a separate location per partition,
+ * different from the table location.
+ */
+ private Set<String> sourceLocationsToCopy(Path externalTableInfo) throws IOException {
+ Set<String> locationsToCopy = new HashSet<>();
+ FileSystem fileSystem = externalTableInfo.getFileSystem(hiveConf);
+ if (!fileSystem.exists(externalTableInfo)) {
+ return locationsToCopy;
+ }
+
+ int currentRetry = 0;
+ BufferedReader reader = null;
+ while (currentRetry < MAX_RETRIES) {
+ try {
+ reader = reader(fileSystem, externalTableInfo);
+ for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+ String[] splits = line.split(FIELD_SEPARATOR);
+ locationsToCopy
+ .add(new String(Base64.getDecoder().decode(splits[1]), StandardCharsets.UTF_8));
+ }
+ return locationsToCopy;
+ } catch (IOException e) {
+ currentRetry++;
+ if (currentRetry < MAX_RETRIES) {
+ closeQuietly(reader);
+ LOG.warn("failed to read {}", externalTableInfo.toString(), e);
+ } else {
+ LOG.error("failed to read {}", externalTableInfo.toString(), e);
+ throw e;
+ }
+ } finally {
+ closeQuietly(reader);
+ }
+ }
+ // we should never reach here
+ throw new IllegalStateException("we should never reach this condition");
+ }
+
+ private static void closeQuietly(BufferedReader reader) {
+ try {
+ if (reader != null) {
+ reader.close();
+ }
+ } catch (IOException e) {
+ LOG.debug("error while closing reader ", e);
+ }
+ }
+ }
+}
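For reference, the _external_tables_info line format written by Writer#lineFor and parsed back by Reader#sourceLocationsToCopy round-trips as below; a minimal, self-contained sketch with an invented table name and location (base64 output can never contain the "," separator, which is presumably why the path is encoded):

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class ExternalTablesInfoLineDemo {
      private static final String FIELD_SEPARATOR = ",";

      public static void main(String[] args) {
        String tableName = "t1";                                  // hypothetical
        String location = "hdfs://source:8020/warehouse/ext/t1";  // hypothetical

        // Writer side: table_name,base64Encoded(table_dir_location)\n
        String encoded = Base64.getEncoder()
            .encodeToString(location.getBytes(StandardCharsets.UTF_8));
        String line = tableName + FIELD_SEPARATOR + encoded + "\n";

        // Reader side: split on the separator and decode the second field.
        String[] splits = line.trim().split(FIELD_SEPARATOR);
        String decoded = new String(
            Base64.getDecoder().decode(splits[1]), StandardCharsets.UTF_8);
        System.out.println(splits[0] + " -> " + decoded);         // t1 -> hdfs://source:8020/warehouse/ext/t1
      }
    }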
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 270670d..2126aab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -228,8 +228,16 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
createEndReplLogTask(context, scope, iterator.replLogger());
}
}
- boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState()
- || constraintIterator.hasNext();
+
+ if (loadTaskTracker.canAddMoreTasks()) {
+ scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
+ }
+
+ boolean addAnotherLoadTask = iterator.hasNext()
+ || loadTaskTracker.hasReplicationState()
+ || constraintIterator.hasNext()
+ || work.getPathsToCopyIterator().hasNext();
+
if (addAnotherLoadTask) {
createBuilderTask(scope.rootTasks);
}
@@ -300,7 +308,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
private void partitionsPostProcessing(BootstrapEventsIterator iterator,
Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
- TaskTracker partitionsTracker) throws SemanticException {
+ TaskTracker partitionsTracker) {
setUpDependencies(tableTracker, partitionsTracker);
if (!scope.database && !scope.table) {
scope.rootTasks.addAll(partitionsTracker.tasks());
@@ -343,8 +351,43 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
private int executeIncrementalLoad(DriverContext driverContext) {
try {
- IncrementalLoadTasksBuilder load = work.getIncrementalLoadTaskBuilder();
- this.childTasks = Collections.singletonList(load.build(driverContext, getHive(), LOG, work));
+ List<Task<? extends Serializable>> childTasks = new ArrayList<>();
+ int parallelism = conf.getIntVar(HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
+ // during incremental load we get no parallelism from replication tasks since they are event based
+ // and hence linear. To achieve parallelism we have to use copy tasks (which have no DAG) for
+ // all threads except one, in the execution phase.
+ int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+ IncrementalLoadTasksBuilder builder = work.getIncrementalLoadTaskBuilder();
+
+ // If the total number of tasks that can be created is less than the parallelism we can achieve,
+ // do nothing, since someone is working on a 1950s machine. Else try to achieve max parallelism.
+ int calculatedMaxNumOfTasks = 0, maxNumOfHDFSTasks = 0;
+ if (maxTasks <= parallelism) {
+ if (builder.hasMoreWork()) {
+ calculatedMaxNumOfTasks = maxTasks;
+ } else {
+ maxNumOfHDFSTasks = maxTasks;
+ }
+ } else {
+ calculatedMaxNumOfTasks = maxTasks - parallelism + 1;
+ maxNumOfHDFSTasks = parallelism - 1;
+ }
+ TaskTracker trackerForReplIncremental = new TaskTracker(calculatedMaxNumOfTasks);
+ Task<? extends Serializable> incrementalLoadTaskRoot =
+ builder.build(driverContext, getHive(), LOG, work, trackerForReplIncremental);
+ // we are adding the incremental task first so that it's always processed first,
+ // followed by dir copy tasks if capacity allows.
+ childTasks.add(incrementalLoadTaskRoot);
+
+ TaskTracker trackerForCopy = new TaskTracker(maxNumOfHDFSTasks);
+ childTasks
+ .addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(trackerForCopy));
+
+ // either the incremental has more work or the external table file copy has more paths to process
+ if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext()) {
+ DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
+ }
+ this.childTasks = childTasks;
return 0;
} catch (Exception e) {
LOG.error("failed replication", e);
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index ff21b6a..e86a5fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hive.ql.exec.Task;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
Explain.Level.DEFAULT,
@@ -37,13 +40,14 @@ public class ReplLoadWork implements Serializable {
final String dbNameToLoadIn;
final String tableNameToLoadIn;
final String dumpDirectory;
- private final transient BootstrapEventsIterator bootstrapIterator;
private final ConstraintEventsIterator constraintsIterator;
- private final transient IncrementalLoadEventsIterator incrementalIterator;
private int loadTaskRunCount = 0;
private DatabaseEvent.State state = null;
+ private final transient BootstrapEventsIterator bootstrapIterator;
+ private final transient IncrementalLoadEventsIterator incrementalIterator;
private final transient IncrementalLoadTasksBuilder incrementalLoad;
private transient Task<? extends Serializable> rootTask;
+ private final transient Iterator<DirCopyWork> pathsToCopyIterator;
/*
these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -53,7 +57,8 @@ public class ReplLoadWork implements Serializable {
final LineageState sessionStateLineageState;
public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
- String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException {
+ String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump, Long eventTo,
+ List<DirCopyWork> pathsToCopyIterator) throws IOException {
this.tableNameToLoadIn = tableNameToLoadIn;
sessionStateLineageState = lineageState;
this.dumpDirectory = dumpDirectory;
@@ -63,7 +68,8 @@ public class ReplLoadWork implements Serializable {
incrementalIterator = new IncrementalLoadEventsIterator(dumpDirectory, hiveConf);
this.bootstrapIterator = null;
this.constraintsIterator = null;
- incrementalLoad = new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory,
+ incrementalLoad =
+ new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory,
incrementalIterator, hiveConf, eventTo);
} else {
this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
@@ -71,6 +77,7 @@ public class ReplLoadWork implements Serializable {
incrementalIterator = null;
incrementalLoad = null;
}
+ this.pathsToCopyIterator = pathsToCopyIterator.iterator();
}
public BootstrapEventsIterator iterator() {
@@ -116,4 +123,8 @@ public class ReplLoadWork implements Serializable {
public void setRootTask(Task<? extends Serializable> rootTask) {
this.rootTask = rootTask;
}
+
+ public Iterator<DirCopyWork> getPathsToCopyIterator() {
+ return pathsToCopyIterator;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index ebe0090..60ad6d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -37,6 +37,7 @@ import java.util.stream.Collectors;
/**
* The replication layout from the root directory of the replication dump is
* db
+ * _external_tables_info
* table1
* _metadata
* data
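With the _external_tables_info entry added above, a bootstrap dump therefore looks roughly like this on disk (names illustrative; for an incremental dump the file sits directly under the dump root instead):

    <dumpRoot>/
        <dbName>/
            _external_tables_info    (one table_name,base64(location) entry per line)
            table1/
                _metadata
                data/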
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index d909945..e0f8f72 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
import org.apache.hadoop.hive.ql.parse.EximUtil;
@@ -120,6 +121,11 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
if (replicationState == null && next == null) {
while (remoteIterator.hasNext()) {
LocatedFileStatus next = remoteIterator.next();
+ // we want to skip this file; this also means there can't be a table whose name matches
+ // the constant ReplExternalTables.FILE_NAME
+ if (next.getPath().toString().endsWith(ReplExternalTables.FILE_NAME)) {
+ continue;
+ }
if (next.getPath().toString().endsWith(EximUtil.METADATA_NAME)) {
String replacedString = next.getPath().toString().replace(dbLevelPath.toString(), "");
List<String> filteredNames = Arrays.stream(replacedString.split(Path.SEPARATOR))
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 357c693..599eb04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -72,30 +71,50 @@ public class FSTableEvent implements TableEvent {
return fromPath;
}
+ /**
+ * To determine if the tableDesc is for an external table,
+ * use {@link ImportTableDesc#isExternal()}
+ * and not the {@link ImportTableDesc#tableType()} method.
+ */
@Override
public ImportTableDesc tableDesc(String dbName) throws SemanticException {
try {
Table table = new Table(metadata.getTable());
+ boolean externalTableOnSource = TableType.EXTERNAL_TABLE.equals(table.getTableType());
// The table can be non acid in case of replication from 2.6 cluster.
if (!AcidUtils.isTransactionalTable(table)
&& hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES)
&& (table.getTableType() == TableType.MANAGED_TABLE)) {
Hive hiveDb = Hive.get(hiveConf);
//TODO : dump metadata should be read to make sure that migration is required.
- HiveStrictManagedMigration.TableMigrationOption migrationOption
- = HiveStrictManagedMigration.determineMigrationTypeAutomatically(table.getTTable(),
- table.getTableType(),null, (Configuration)hiveConf,
- hiveDb.getMSC(),true);
+ HiveStrictManagedMigration.TableMigrationOption migrationOption =
+ HiveStrictManagedMigration.determineMigrationTypeAutomatically(table.getTTable(),
+ table.getTableType(), null, hiveConf,
+ hiveDb.getMSC(), true);
HiveStrictManagedMigration.migrateTable(table.getTTable(), table.getTableType(),
migrationOption, false,
- getHiveUpdater(hiveConf), hiveDb.getMSC(), (Configuration)hiveConf);
+ getHiveUpdater(hiveConf), hiveDb.getMSC(), hiveConf);
// If the conversion is from non transactional to transactional table
if (AcidUtils.isTransactionalTable(table)) {
replicationSpec().setMigratingToTxnTable();
}
+ if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+ // since the table has now been converted to an external table after applying the migration
+ // rules, the table location has to be set to null so that the location on the target is
+ // picked up based on the default configuration
+ table.setDataLocation(null);
+ if (!externalTableOnSource) {
+ replicationSpec().setMigratingToExternalTable();
+ }
+ }
}
ImportTableDesc tableDesc
= new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() : dbName, table);
+ if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+ tableDesc.setLocation(
+ table.getDataLocation() == null ? null : table.getDataLocation().toString());
+ tableDesc.setExternal(true);
+ }
tableDesc.setReplicationSpec(replicationSpec());
if (table.getTableType() == TableType.EXTERNAL_TABLE) {
tableDesc.setExternal(true);
@@ -150,8 +169,17 @@ public class FSTableEvent implements TableEvent {
partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters());
partDesc.setBucketCols(partition.getSd().getBucketCols());
partDesc.setSortCols(partition.getSd().getSortCols());
- partDesc.setLocation(new Path(fromPath,
- Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+ if (tblDesc.isExternal() && !replicationSpec().isMigratingToExternalTable()) {
+ // we have to provide the source location so the target location can be derived.
+ partDesc.setLocation(partition.getSd().getLocation());
+ } else {
+ /**
+ * this is required for listing all files in a partition for a managed table, as described in
+ * {@link org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator}
+ */
+ partDesc.setLocation(new Path(fromPath,
+ Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+ }
partsDesc.setReplicationSpec(replicationSpec());
return partsDesc;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 2e895a8..e182f31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -18,15 +18,16 @@
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
@@ -54,7 +55,6 @@ import org.datanucleus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
@@ -83,13 +83,13 @@ public class LoadPartitions {
public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker,
TableEvent event, String dbNameToLoadIn,
- TableContext tableContext) throws HiveException, IOException {
+ TableContext tableContext) throws HiveException {
this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null);
}
public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
- AddPartitionDesc lastReplicatedPartition) throws HiveException, IOException {
+ AddPartitionDesc lastReplicatedPartition) throws HiveException {
this.tracker = new TaskTracker(limiter);
this.event = event;
this.context = context;
@@ -101,26 +101,15 @@ public class LoadPartitions {
this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
}
- private String location() throws MetaException, HiveException {
- Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
- if (!tableContext.waitOnPrecursor()) {
- return context.warehouse.getDefaultTablePath(
- parentDb, tableDesc.getTableName(), tableDesc.isExternal()).toString();
- } else {
- Path tablePath = context.warehouse.getDefaultTablePath(
- tableDesc.getDatabaseName(), tableDesc.getTableName(), tableDesc.isExternal());
- return context.warehouse.getDnsPath(tablePath).toString();
- }
- }
-
public TaskTracker tasks() throws SemanticException {
try {
/*
We are doing this both in load table and load partitions
*/
- if (tableDesc.getLocation() == null) {
- tableDesc.setLocation(location());
- }
+ Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
+ LoadTable.TableLocationTuple tableLocationTuple =
+ LoadTable.tableLocation(tableDesc, parentDb, tableContext, context);
+ tableDesc.setLocation(tableLocationTuple.location);
if (table == null) {
//new table
@@ -157,7 +146,7 @@ public class LoadPartitions {
}
}
- private void updateReplicationState(ReplicationState replicationState) throws SemanticException {
+ private void updateReplicationState(ReplicationState replicationState) {
if (!tracker.canAddMoreTasks()) {
tracker.setReplicationState(replicationState);
}
@@ -203,12 +192,26 @@ public class LoadPartitions {
* returns the root task for adding a partition
*/
private Task<?> tasksForAddPartition(Table table, AddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
- throws MetaException, IOException, HiveException {
+ throws MetaException, HiveException {
+ AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
+ Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
+ Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
+ partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+ LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+ + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+ + partSpec.getLocation());
+
Task<?> addPartTask = TaskFactory.get(
new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
context.hiveConf
);
- if (event.replicationSpec().isMetadataOnly()) {
+
+ boolean isOnlyDDLOperation = event.replicationSpec().isMetadataOnly()
+ || (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+ && !event.replicationSpec().isMigratingToExternalTable()
+ );
+
+ if (isOnlyDDLOperation) {
if (ptnRootTask == null) {
ptnRootTask = addPartTask;
} else {
@@ -217,16 +220,7 @@ public class LoadPartitions {
return ptnRootTask;
}
- AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
- Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
- Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
- partSpec.setLocation(replicaWarehousePartitionLocation.toString());
- LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
- + partSpecToString(partSpec.getPartSpec()) + " with source location: "
- + partSpec.getLocation());
-
- Path tmpPath = replicaWarehousePartitionLocation;
-
+ Path stagingDir = replicaWarehousePartitionLocation;
// if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
LoadFileType loadFileType;
if (event.replicationSpec().isInReplicationScope() &&
@@ -236,25 +230,27 @@ public class LoadPartitions {
// Migrating to transactional tables in bootstrap load phase.
// It is enough to copy all the original files under base_1 dir and so write-id is hardcoded to 1.
// ReplTxnTask added earlier in the DAG ensure that the write-id=1 is made valid in HMS metadata.
- tmpPath = new Path(tmpPath, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
+ stagingDir = new Path(stagingDir, AcidUtils.baseDir(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID));
}
} else {
- loadFileType = (event.replicationSpec().isReplace() || event.replicationSpec().isMigratingToTxnTable())
- ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
- tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
+ loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL :
+ (event.replicationSpec().isMigratingToTxnTable()
+ ? LoadFileType.KEEP_EXISTING
+ : LoadFileType.OVERWRITE_EXISTING);
+ stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
}
Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
event.replicationSpec(),
sourceWarehousePartitionLocation,
- tmpPath,
+ stagingDir,
context.hiveConf
);
Task<?> movePartitionTask = null;
if (loadFileType != LoadFileType.IGNORE) {
// no need to create move task, if file is moved directly to target location.
- movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType);
+ movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType);
}
// Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for
@@ -321,9 +317,26 @@ public class LoadPartitions {
return TaskFactory.get(moveWork, context.hiveConf);
}
+ /**
+ * Since the table-level location will be set by taking into account the base directory configuration
+ * for external tables, we don't have to do anything specific for the partition location, since it will
+ * always be a child of the table-level location.
+ * Replication does not handle a specific location provided for a partition; the partition
+ * path will always be a child on the target.
+ */
+
private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartitionDesc partSpec)
- throws MetaException, HiveException, IOException {
+ throws MetaException, HiveException {
String child = Warehouse.makePartPath(partSpec.getPartSpec());
+ if (tableDesc.isExternal()) {
+ if (event.replicationSpec().isMigratingToExternalTable()) {
+ return new Path(tableDesc.getLocation(), child);
+ }
+ String externalLocation =
+ ReplExternalTables.externalTableLocation(context.hiveConf, partSpec.getLocation());
+ return new Path(externalLocation);
+ }
+
if (tableDesc.getLocation() == null) {
if (table.getDataLocation() == null) {
Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName());
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 520b410..e0f0979 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -20,15 +20,17 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
@@ -54,7 +56,6 @@ import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.Serializable;
import java.util.BitSet;
import java.util.Collections;
@@ -75,8 +76,7 @@ public class LoadTable {
private final TableEvent event;
public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
- TableContext tableContext, TaskTracker limiter)
- throws SemanticException, IOException {
+ TableContext tableContext, TaskTracker limiter) {
this.event = event;
this.context = context;
this.replLogger = replLogger;
@@ -128,9 +128,9 @@ public class LoadTable {
break;
}
- if (tableDesc.getLocation() == null) {
- tableDesc.setLocation(location(tableDesc, parentDb));
- }
+ TableLocationTuple
+ tableLocationTuple = tableLocation(tableDesc, parentDb, tableContext, context);
+ tableDesc.setLocation(tableLocationTuple.location);
/* Note: In the following section, Metadata-only import handling logic is
interleaved with regular repl-import logic. The rule of thumb being
@@ -141,7 +141,7 @@ public class LoadTable {
or in the case of an unpartitioned table. In all other cases, it should
behave like a noop or a pure MD alter.
*/
- newTableTasks(tableDesc, tblRootTask);
+ newTableTasks(tableDesc, tblRootTask, tableLocationTuple);
// Set Checkpoint task as dependant to create table task. So, if same dump is retried for
// bootstrap, we skip current table update.
@@ -173,7 +173,8 @@ public class LoadTable {
return ReplLoadOpType.LOAD_REPLACE;
}
- private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask) throws Exception {
+ private void newTableTasks(ImportTableDesc tblDesc, Task<?> tblRootTask, TableLocationTuple tuple)
+ throws Exception {
Table table = tblDesc.toTable(context.hiveConf);
ReplicationSpec replicationSpec = event.replicationSpec();
Task<?> createTableTask =
@@ -208,26 +209,58 @@ public class LoadTable {
parentTask.addDependentTask(replTxnTask);
parentTask = replTxnTask;
}
- if (!isPartitioned(tblDesc)) {
+ boolean shouldCreateLoadTableTask = (
+ !isPartitioned(tblDesc)
+ && !TableType.EXTERNAL_TABLE.equals(table.getTableType())
+ ) || tuple.isConvertedFromManagedToExternal;
+ if (shouldCreateLoadTableTask) {
LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table");
- Task<?> loadTableTask =
- loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()),
+ Task<?> loadTableTask = loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()),
event.metadataPath());
parentTask.addDependentTask(loadTableTask);
}
tracker.addTask(tblRootTask);
}
- private String location(ImportTableDesc tblDesc, Database parentDb)
- throws MetaException, SemanticException {
- if (!tableContext.waitOnPrecursor()) {
- return context.warehouse.getDefaultTablePath(
- parentDb, tblDesc.getTableName(), tblDesc.isExternal()).toString();
+ static class TableLocationTuple {
+ final String location;
+ private final boolean isConvertedFromManagedToExternal;
+
+ TableLocationTuple(String location, boolean isConvertedFromManagedToExternal) {
+ this.location = location;
+ this.isConvertedFromManagedToExternal = isConvertedFromManagedToExternal;
+ }
+ }
+
+ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parentDb,
+ TableContext tableContext, Context context) throws MetaException, SemanticException {
+ Warehouse wh = context.warehouse;
+ Path defaultTablePath;
+ if (parentDb == null) {
+ defaultTablePath = wh.getDefaultTablePath(tblDesc.getDatabaseName(), tblDesc.getTableName(),
+ tblDesc.isExternal());
} else {
- Path tablePath = context.warehouse.getDefaultTablePath(
- tblDesc.getDatabaseName(), tblDesc.getTableName(), tblDesc.isExternal());
- return context.warehouse.getDnsPath(tablePath).toString();
+ defaultTablePath = wh.getDefaultTablePath(
+ parentDb, tblDesc.getTableName(), tblDesc.isExternal()
+ );
+ }
+ // don't use TableType.EXTERNAL_TABLE.equals(tblDesc.tableType()) since it always comes in as managed for tables.
+ if (tblDesc.isExternal()) {
+ if (tblDesc.getLocation() == null) {
+ // this is the use case where the table got converted to an external table as part of the
+ // migration rules applied to tables replicated across different versions of Hive.
+ return new TableLocationTuple(wh.getDnsPath(defaultTablePath).toString(), true);
+ }
+ String currentLocation = new Path(tblDesc.getLocation()).toUri().getPath();
+ String newLocation =
+ ReplExternalTables.externalTableLocation(context.hiveConf, currentLocation);
+ LOG.debug("external table {} data location is: {}", tblDesc.getTableName(), newLocation);
+ return new TableLocationTuple(newLocation, false);
}
+ Path path = tableContext.waitOnPrecursor()
+ ? wh.getDnsPath(defaultTablePath)
+ : wh.getDefaultTablePath(parentDb, tblDesc.getTableName(), tblDesc.isExternal());
+ return new TableLocationTuple(path.toString(), false);
}
private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath,
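As a concrete illustration of the externalTableLocation remapping used by tableLocation above (and by locationOnReplicaWarehouse for partitions), only the path component of the source location survives and is re-rooted under the configured base directory; a minimal sketch with invented locations:

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    public class ExternalTableLocationDemo {
      // Mirrors the arithmetic in ReplExternalTables.externalTableLocation.
      public static void main(String[] args) {
        String location = "hdfs://source:8020/warehouse/ext/t1"; // source data location
        String baseDir = "hdfs://replica:8020/base";             // REPL_EXTERNAL_TABLE_BASE_DIR

        String currentPath = new Path(location).toUri().getPath(); // "/warehouse/ext/t1"
        URI basePath = new Path(baseDir).toUri();
        String dataPath =
            currentPath.replaceFirst(Path.SEPARATOR, basePath.getPath() + Path.SEPARATOR);
        Path dataLocation = new Path(basePath.getScheme(), basePath.getAuthority(), dataPath);

        // Prints hdfs://replica:8020/base/warehouse/ext/t1
        System.out.println(dataLocation);
      }
    }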
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index ae6411d..7ae33e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -92,14 +92,13 @@ public class IncrementalLoadTasksBuilder {
}
public Task<? extends Serializable> build(DriverContext driverContext, Hive hive, Logger log,
- ReplLoadWork loadWork) throws Exception {
+ ReplLoadWork loadWork, TaskTracker tracker) throws Exception {
Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
Task<? extends Serializable> taskChainTail = evTaskRoot;
Long lastReplayedEvent = null;
this.log = log;
numIteration++;
this.log.debug("Iteration num " + numIteration);
- TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
while (iterator.hasNext() && tracker.canAddMoreTasks()) {
FileStatus dir = iterator.next();
@@ -153,10 +152,7 @@ public class IncrementalLoadTasksBuilder {
lastReplayedEvent = eventDmd.getEventTo();
}
- if (iterator.hasNext()) {
- // add load task to start the next iteration
- taskChainTail.addDependentTask(TaskFactory.get(loadWork, conf));
- } else {
+ if (!hasMoreWork()) {
// if no events were replayed, then add a task to update the last repl id of the database/table to last event id.
if (taskChainTail == evTaskRoot) {
String lastEventid = eventTo.toString();
@@ -177,10 +173,17 @@ public class IncrementalLoadTasksBuilder {
this.log.debug("Added {}:{} as a precursor of barrier task {}:{}",
taskChainTail.getClass(), taskChainTail.getId(),
barrierTask.getClass(), barrierTask.getId());
+ if (loadWork.getPathsToCopyIterator().hasNext()) {
+ taskChainTail.addDependentTask(TaskFactory.get(loadWork, conf));
+ }
}
return evTaskRoot;
}
+ public boolean hasMoreWork() {
+ return iterator.hasNext();
+ }
+
private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 014192b..4fdd12a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -108,7 +108,8 @@ public class ReplUtils {
public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf)
throws SemanticException {
- ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType());
+ TableType tableType = tableDesc.isExternal() ? TableType.EXTERNAL_TABLE : tableDesc.tableType();
+ ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableType);
return TaskFactory.get(replLogWork, conf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 26f21cf..89b2db3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -561,7 +561,7 @@ public class Table implements Serializable {
public void setDataLocation(Path path) {
this.path = path;
- tTable.getSd().setLocation(path.toString());
+ tTable.getSd().setLocation(path == null ? null : path.toString());
}
public void unsetDataLocation() {
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 833757c..fb31254 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.parse;
import java.io.IOException;
+import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.text.ParseException;
import java.util.ArrayList;
@@ -118,7 +119,7 @@ public abstract class BaseSemanticAnalyzer {
protected final Hive db;
protected final HiveConf conf;
protected final QueryState queryState;
- protected List<Task<?>> rootTasks;
+ protected List<Task<? extends Serializable>> rootTasks;
protected FetchTask fetchTask;
protected final Logger LOG;
protected final LogHelper console;
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 9c78108..a843987 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
import org.apache.commons.lang.ObjectUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -67,8 +69,8 @@ import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.thrift.TException;
-import org.datanucleus.util.StringUtils;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -92,6 +94,8 @@ import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveU
*/
public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
+ private static final Logger LOG = LoggerFactory.getLogger(ImportSemanticAnalyzer.class);
+
public ImportSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
}
@@ -283,7 +287,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
if (TableType.valueOf(tblObj.getTableType()) == TableType.EXTERNAL_TABLE) {
+ replicationSpec.setMigratingToExternalTable();
tblDesc.setExternal(true);
+ // set this to null so that the default location for external tables is chosen on the target
+ tblDesc.setLocation(null);
}
} else {
tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
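The nulled location is filled in on the load side; a sketch of that substitution, mirroring the createReplImportTasks hunk further down (wh is the Warehouse handle):

// If no location survived the dump (external table being migrated), choose
// the target warehouse's default table path instead.
if (tblDesc.getLocation() == null) {
  tblDesc.setLocation(wh.getDnsPath(wh.getDefaultTablePath(
      tblDesc.getDatabaseName(), tblDesc.getTableName(),
      tblDesc.isExternal())).toString());
}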
@@ -308,6 +315,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
if (isLocationSet) {
+ LOG.debug("table {} location is {}", tblDesc.getTableName(), parsedLocation);
tblDesc.setLocation(parsedLocation);
x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf()));
}
@@ -316,11 +324,13 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
tblDesc.setTableName(parsedTableName);
}
- List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
+ List<AddPartitionDesc> partitionDescs = new ArrayList<>();
Iterable<Partition> partitions = rv.getPartitions();
for (Partition partition : partitions) {
// TODO: this should ideally not create AddPartitionDesc per partition
- AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
+ AddPartitionDesc partsDesc =
+ getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition,
+ replicationSpec, x.getConf());
if (inReplicationScope){
StatsSetupConst.setBasicStatsState(partsDesc.getPartition(0).getPartParams(), StatsSetupConst.FALSE);
}
@@ -387,24 +397,25 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- if (!inReplicationScope) {
+ if (inReplicationScope) {
+ createReplImportTasks(
+ tblDesc, partitionDescs,
+ replicationSpec, waitOnPrecursor, table,
+ fromURI, wh, x, writeId, stmtId, updatedMetadata);
+ } else {
createRegularImportTasks(
tblDesc, partitionDescs,
isPartSpecSet, replicationSpec, table,
fromURI, fs, wh, x, writeId, stmtId);
- } else {
- createReplImportTasks(
- tblDesc, partitionDescs,
- replicationSpec, waitOnPrecursor, table,
- fromURI, fs, wh, x, writeId, stmtId, updatedMetadata);
}
return tableExists;
}
private static AddPartitionDesc getBaseAddPartitionDescFromPartition(
- Path fromPath, String dbname,
- ImportTableDesc tblDesc, Partition partition) throws MetaException, SemanticException {
- AddPartitionDesc partsDesc = new AddPartitionDesc(dbname, tblDesc.getTableName(),
+ Path fromPath, String dbName, ImportTableDesc tblDesc, Partition partition,
+ ReplicationSpec replicationSpec, HiveConf conf)
+ throws MetaException, SemanticException {
+ AddPartitionDesc partsDesc = new AddPartitionDesc(dbName, tblDesc.getTableName(),
EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
partition.getSd().getLocation(), partition.getParameters());
AddPartitionDesc.OnePartitionDesc partDesc = partsDesc.getPartition(0);
@@ -416,16 +427,23 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters());
partDesc.setBucketCols(partition.getSd().getBucketCols());
partDesc.setSortCols(partition.getSd().getSortCols());
- partDesc.setLocation(new Path(fromPath,
- Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+ if (replicationSpec.isInReplicationScope() && tblDesc.isExternal()
+ && !replicationSpec.isMigratingToExternalTable()) {
+ String newLocation = ReplExternalTables
+ .externalTableLocation(conf, partition.getSd().getLocation());
+ LOG.debug("partition {} has data location: {}", partition, newLocation);
+ partDesc.setLocation(newLocation);
+ } else {
+ partDesc.setLocation(new Path(fromPath,
+ Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+ }
return partsDesc;
}
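Read as a standalone predicate (illustrative helper, not committed code), the branch above remaps a partition location only for external-table partitions replicated in place:

// In replication scope an external partition gets a location remapped under
// the configured base dir, unless the table is migrating from managed to
// external, in which case it is laid out under the table's new location.
static boolean useRemappedLocation(boolean inReplicationScope,
    boolean isExternal, boolean migratingToExternal) {
  return inReplicationScope && isExternal && !migratingToExternal;
}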
private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
org.apache.hadoop.hive.metastore.api.Table tblObj) throws Exception {
Table table = new Table(tblObj);
- ImportTableDesc tblDesc = new ImportTableDesc(dbName, table);
- return tblDesc;
+ return new ImportTableDesc(dbName, table);
}
private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
@@ -543,8 +561,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
private static Task<? extends Serializable> alterSinglePartition(
- URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
- Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
+ ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException {
addPartitionDesc.setReplaceMode(true);
@@ -553,14 +570,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
if (ptn == null) {
- fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x);
- } else {
+ fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
+ } else if (!externalTablePartition(tblDesc, replicationSpec)) {
partSpec.setLocation(ptn.getLocation()); // use existing location
}
return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
}
- private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
+ private static Task<?> addSinglePartition(ImportTableDesc tblDesc,
Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId)
throws MetaException, IOException, HiveException {
@@ -579,7 +596,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return addPartTask;
} else {
String srcLocation = partSpec.getLocation();
- fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x);
+ fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+ partSpecToString(partSpec.getPartSpec())
+ " with source location: " + srcLocation);
@@ -604,7 +621,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadFileType.REPLACE_ALL :
replicationSpec.isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
// In replication scope the write id will be invalid
- Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) ||
+ boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) ||
replicationSpec.isInReplicationScope();
destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation)
: new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
@@ -683,12 +700,29 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
/**
* Helper method to set location properly in partSpec
*/
- private static void fixLocationInPartSpec(
- FileSystem fs, ImportTableDesc tblDesc, Table table,
- Warehouse wh, ReplicationSpec replicationSpec,
- AddPartitionDesc.OnePartitionDesc partSpec,
+ private static void fixLocationInPartSpec(ImportTableDesc tblDesc, Table table,
+ Warehouse wh, ReplicationSpec replicationSpec, AddPartitionDesc.OnePartitionDesc partSpec,
EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, HiveException, IOException {
- Path tgtPath = null;
+ if (externalTablePartition(tblDesc, replicationSpec)) {
+ /*
+ we use isExternal() and not the tableType() method since tableType() always
+ reports the type as a managed table.
+ nothing more needs to be done for external table partitions since the path is
+ already set correctly in
+ {@link org.apache.hadoop.hive.ql.parse.repl.load.message.TableHandler}
+ */
+ if (replicationSpec.isMigratingToExternalTable()) {
+ // at this point table.getDataLocation() is already set for external tables,
+ // using the target's default external warehouse location.
+ partSpec.setLocation(new Path(tblDesc.getLocation(),
+ Warehouse.makePartPath(partSpec.getPartSpec())).toString());
+ LOG.debug("partition spec {} has location set to {} for a table migrating to external table"
+ + " from managed table",
+ StringUtils.join(partSpec.getPartSpec().entrySet(), ","),
+ partSpec.getLocation()
+ );
+ }
+ return;
+ }
+ Path tgtPath;
if (tblDesc.getLocation() == null) {
if (table.getDataLocation() != null) {
tgtPath = new Path(table.getDataLocation().toString(),
@@ -708,6 +742,12 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
partSpec.setLocation(tgtPath.toString());
}
+ private static boolean externalTablePartition(ImportTableDesc tblDesc,
+ ReplicationSpec replicationSpec) {
+ return (replicationSpec != null) && replicationSpec.isInReplicationScope()
+ && tblDesc.isExternal();
+ }
+
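A worked example (illustrative values) of the migrating-to-external branch in fixLocationInPartSpec, where the partition directory is rebuilt under the table's target location:

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;

static String migratedPartitionLocation() throws MetaException {
  Map<String, String> spec = new LinkedHashMap<>();
  spec.put("year", "2019");
  spec.put("month", "01");
  return new Path("/warehouse/external/t1",
      Warehouse.makePartPath(spec)).toString();
  // -> "/warehouse/external/t1/year=2019/month=01"
}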
public static void checkTargetLocationEmpty(FileSystem fs, Path targetPath, ReplicationSpec replicationSpec,
Logger logger)
throws IOException, SemanticException {
@@ -976,7 +1016,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
+ tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
} else {
throw new SemanticException(
ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -1006,7 +1046,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (isPartitioned(tblDesc)) {
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
- t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
+ t.addDependentTask(addSinglePartition(tblDesc, table, wh, addPartitionDesc,
replicationSpec, x, writeId, stmtId));
}
} else {
@@ -1056,10 +1096,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
ImportTableDesc tblDesc,
List<AddPartitionDesc> partitionDescs,
ReplicationSpec replicationSpec, boolean waitOnPrecursor,
- Table table, URI fromURI, FileSystem fs, Warehouse wh,
+ Table table, URI fromURI, Warehouse wh,
EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId,
UpdatedMetaDataTracker updatedMetadata)
- throws HiveException, URISyntaxException, IOException, MetaException {
+ throws HiveException, IOException, MetaException {
Task<?> dropTblTask = null;
WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
@@ -1121,7 +1161,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
tblDesc.setLocation(
wh.getDnsPath(wh.getDefaultTablePath(tblDesc.getDatabaseName(), tblDesc.getTableName(), tblDesc.isExternal())
).toString());
-
}
}
@@ -1147,7 +1186,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
addPartitionDesc.setReplicationSpec(replicationSpec);
t.addDependentTask(
- addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
+ addSinglePartition(tblDesc, table, wh, addPartitionDesc, replicationSpec, x,
+ writeId, stmtId));
if (updatedMetadata != null) {
updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
addPartitionDesc.getPartition(0).getPartSpec());
@@ -1200,14 +1240,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (ptn == null) {
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
+ tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
if (updatedMetadata != null) {
updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
addPartitionDesc.getPartition(0).getPartSpec());
}
} else {
x.getTasks().add(alterSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x));
+ tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x));
if (updatedMetadata != null) {
updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
addPartitionDesc.getPartition(0).getPartSpec());
@@ -1219,10 +1259,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (replicationSpec.allowReplacementInto(ptn.getParameters())){
if (!replicationSpec.isMetadataOnly()){
x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
+ tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
} else {
x.getTasks().add(alterSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
+ tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
}
if (updatedMetadata != null) {
updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 1ebbb82..4e7595c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -37,17 +36,21 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.net.URI;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Reader;
+import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
@@ -371,15 +374,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
DumpMetaData dmd = new DumpMetaData(loadPath, conf);
boolean evDump = false;
- if (dmd.isIncrementalDump()){
+ // we also decide here which HDFS locations need to be copied over.
+ if (dmd.isIncrementalDump()) {
LOG.debug("{} contains an incremental dump", loadPath);
evDump = true;
} else {
LOG.debug("{} contains an bootstrap dump", loadPath);
}
-
ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
- tblNameOrPattern, queryState.getLineageState(), evDump, dmd.getEventTo());
+ tblNameOrPattern, queryState.getLineageState(), evDump, dmd.getEventTo(),
+ dirLocationsToCopy(loadPath, evDump));
rootTasks.add(TaskFactory.get(replLoadWork, conf));
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
@@ -387,6 +391,26 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
+ private List<DirCopyWork> dirLocationsToCopy(Path loadPath, boolean isIncrementalPhase)
+ throws HiveException, IOException {
+ List<DirCopyWork> list = new ArrayList<>();
+ String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
+ // this is done to remove any scheme-related information present in the base
+ // path, specifically when we are replicating to cloud storage
+ Path basePath = new Path(baseDir);
+
+ for (String location : new Reader(conf, loadPath, isIncrementalPhase).sourceLocationsToCopy()) {
+ Path sourcePath = new Path(location);
+ String targetPathWithoutSchemeAndAuth = basePath.toUri().getPath() + sourcePath.toUri().getPath();
+ Path fullyQualifiedTargetUri = PathBuilder.fullyQualifiedHDFSUri(
+ new Path(targetPathWithoutSchemeAndAuth),
+ basePath.getFileSystem(conf)
+ );
+ list.add(new DirCopyWork(sourcePath, fullyQualifiedTargetUri));
+ }
+ return list;
+ }
+
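A worked example (illustrative values) of the target path computation above: the base dir keeps only its path component, the full source path is appended, and the result is qualified against the base dir's filesystem:

import org.apache.hadoop.fs.Path;

Path basePath = new Path("s3a://bucket/replica");               // REPL_EXTERNAL_TABLE_BASE_DIR
Path sourcePath = new Path("hdfs://primary:8020/data/ext/t1");  // location to copy
String target = basePath.toUri().getPath() + sourcePath.toUri().getPath();
// target == "/replica/data/ext/t1"; fullyQualifiedHDFSUri then yields
// s3a://bucket/replica/data/ext/t1 as the DirCopyWork destination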
private void setConfigs(ASTNode node) throws SemanticException {
Map<String, String> replConfigs = DDLSemanticAnalyzer.getProps(node);
if (null != replConfigs) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 39009ce..b087831 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -48,6 +48,7 @@ public class ReplicationSpec {
private String validTxnList = null;
private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT
private boolean isMigratingToTxnTable = false;
+ private boolean isMigratingToExternalTable = false;
// Key definitions related to replication
public enum KEY {
@@ -410,4 +411,12 @@ public class ReplicationSpec {
public void setMigratingToTxnTable() {
isMigratingToTxnTable = true;
}
+
+ public boolean isMigratingToExternalTable() {
+ return isMigratingToExternalTable;
+ }
+
+ public void setMigratingToExternalTable() {
+ isMigratingToExternalTable = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index fe0eaf8..686fe7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -49,7 +49,7 @@ public class CopyUtils {
private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
// https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
- private static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
+ public static final String RAW_RESERVED_VIRTUAL_PATH = "/.reserved/raw/";
private static final int MAX_COPY_RETRY = 5;
private final HiveConf hiveConf;
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 83a9642..21df63c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -182,12 +182,13 @@ public class Utils {
}
if (replicationSpec.isInReplicationScope()) {
- if (!hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) &&
- MetaStoreUtils.isExternalTable(tableHandle.getTTable()) && !replicationSpec.isMetadataOnly()) {
+ if (tableHandle.isTemporary()) {
return false;
}
- return !tableHandle.isTemporary();
+ if (MetaStoreUtils.isExternalTable(tableHandle.getTTable())) {
+ return hiveConf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES) || replicationSpec.isMetadataOnly();
+ }
}
return true;
}
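The reworked check reads as a small decision table; a hedged standalone restatement (illustrative helper, not committed code):

// Temporary tables are never replicated; external tables are replicated only
// when REPL_INCLUDE_EXTERNAL_TABLES is set or the dump is metadata-only;
// everything else is replicated.
static boolean shouldReplicate(boolean temporary, boolean external,
    boolean includeExternal, boolean metadataOnly) {
  if (temporary) {
    return false;
  }
  return !external || includeExternal || metadataOnly;
}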
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
index 7d17de2..f8a9ace 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/DropTableHandler.java
@@ -18,12 +18,11 @@
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-
import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
class DropTableHandler extends AbstractEventHandler<DropTableMessage> {
DropTableHandler(NotificationEvent event) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 842e20a..f029fee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -53,6 +54,9 @@ class InsertHandler extends AbstractEventHandler<InsertMessage> {
return;
}
org.apache.hadoop.hive.ql.metadata.Table qlMdTable = tableObject(eventMessage);
+ if (TableType.EXTERNAL_TABLE.equals(qlMdTable.getTableType())) {
+ withinContext.replicationSpec.setNoop(true);
+ }
if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, withinContext.hiveConf)) {
return;
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index ecde3ce..ecd4c84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -58,10 +58,6 @@ public class PartitionSerializer implements JsonWriter.Serializer {
ReplicationSpec.KEY.CURR_STATE_ID.toString(),
additionalPropertiesProvider.getCurrentReplicationState());
}
- if (isPartitionExternal()) {
- // Replication destination will not be external
- partition.putToParameters("EXTERNAL", "FALSE");
- }
}
writer.jsonGenerator.writeString(serializer.toString(partition, UTF_8));
writer.jsonGenerator.flush();