You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2018/05/25 07:43:35 UTC
hive git commit: HIVE-19499: Bootstrap REPL LOAD shall add tasks to
create checkpoints for db/tables/partitions. (Sankar Hariappan,
reviewed by Mahesh Kumar Behera, Anishek Agarwal)
Repository: hive
Updated Branches:
refs/heads/master c358ef5af -> 8d5e8a60a
HIVE-19499: Bootstrap REPL LOAD shall add tasks to create checkpoints for db/tables/partitions. (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d5e8a60
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d5e8a60
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d5e8a60
Branch: refs/heads/master
Commit: 8d5e8a60ae6eeaaac8369a718fa7d5c04d142f57
Parents: c358ef5
Author: Anishek Agarwal <an...@gmail.com>
Authored: Fri May 25 13:13:22 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Fri May 25 13:13:22 2018 +0530
----------------------------------------------------------------------
...TestReplicationScenariosAcrossInstances.java | 46 ++++++++++++++++
.../hadoop/hive/ql/parse/WarehouseInstance.java | 15 +++++
.../hadoop/hive/ql/exec/repl/ReplUtils.java | 58 ++++++++++++++++++++
.../ql/exec/repl/bootstrap/ReplLoadTask.java | 3 +-
.../ql/exec/repl/bootstrap/ReplLoadWork.java | 2 +
.../exec/repl/bootstrap/load/LoadDatabase.java | 6 ++
.../exec/repl/bootstrap/load/TaskTracker.java | 19 ++++++-
.../bootstrap/load/table/LoadPartitions.java | 40 +++++++-------
.../repl/bootstrap/load/table/LoadTable.java | 34 +++++-------
.../exec/repl/bootstrap/load/util/Context.java | 4 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 49 ++++++++---------
11 files changed, 203 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 8caa55c..bcbf113 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -25,9 +25,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.util.DependencyResolver;
import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -47,6 +51,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -54,6 +59,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class TestReplicationScenariosAcrossInstances {
@Rule
@@ -757,4 +763,44 @@ public class TestReplicationScenariosAcrossInstances {
FileSystem fs = cSerdesTableDumpLocation.getFileSystem(primary.hiveConf);
assertFalse(fs.exists(cSerdesTableDumpLocation));
}
+
+ private void verifyIfCkptSet(Map<String, String> props, String dumpDir) {
+ assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
+ assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir));
+ }
+
+ @Test
+ public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable {
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create table t1 (id int)")
+ .run("insert into table t1 values (10)")
+ .run("create table t2 (place string) partitioned by (country string)")
+ .run("insert into table t2 partition(country='india') values ('bangalore')")
+ .run("insert into table t2 partition(country='uk') values ('london')")
+ .run("insert into table t2 partition(country='us') values ('sfo')")
+ .dump(primaryDbName, null);
+
+ replica.load(replicatedDbName, tuple.dumpLocation)
+ .run("use " + replicatedDbName)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("show tables")
+ .verifyResults(new String[] { "t1", "t2" })
+ .run("select country from t2")
+ .verifyResults(Arrays.asList("india", "uk", "us"));
+
+ Database db = replica.getDatabase(replicatedDbName);
+ verifyIfCkptSet(db.getParameters(), tuple.dumpLocation);
+ Table t1 = replica.getTable(replicatedDbName, "t1");
+ verifyIfCkptSet(t1.getParameters(), tuple.dumpLocation);
+ Table t2 = replica.getTable(replicatedDbName, "t2");
+ verifyIfCkptSet(t2.getParameters(), tuple.dumpLocation);
+ Partition india = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("india"));
+ verifyIfCkptSet(india.getParameters(), tuple.dumpLocation);
+ Partition us = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("us"));
+ verifyIfCkptSet(us.getParameters(), tuple.dumpLocation);
+ Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk"));
+ verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index dc31e92..17fd799 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
@@ -319,6 +322,18 @@ public class WarehouseInstance implements Closeable {
}
}
+ public Database getDatabase(String dbName) throws Exception {
+ return client.getDatabase(dbName);
+ }
+
+ public Table getTable(String dbName, String tableName) throws Exception {
+ return client.getTable(dbName, tableName);
+ }
+
+ public Partition getPartition(String dbName, String tableName, List<String> partValues) throws Exception {
+ return client.getPartition(dbName, tableName, partValues);
+ }
+
ReplicationV1CompatRule getReplivationV1CompatRule(List<String> testsToSkip) {
return new ReplicationV1CompatRule(client, hiveConf, testsToSkip);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
new file mode 100644
index 0000000..cbec3ad
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+
+
+public class ReplUtils {
+
+ public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
+
+ public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf)
+ throws SemanticException {
+ ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType());
+ return TaskFactory.get(replLogWork, conf);
+ }
+
+ public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, HashMap<String, String> partSpec,
+ String dumpRoot, HiveConf conf) throws SemanticException {
+ HashMap<String, String> mapProp = new HashMap<>();
+ mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot);
+
+ AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
+ alterTblDesc.setProps(mapProp);
+ alterTblDesc.setOldName(
+ StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), tableDesc.getTableName()));
+ if (partSpec != null) {
+ alterTblDesc.setPartSpec(partSpec);
+ }
+ return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index 748d318..97917f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -72,7 +72,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
protected int execute(DriverContext driverContext) {
try {
int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
- Context context = new Context(conf, getHive(), work.sessionStateLineageState, driverContext.getCtx());
+ Context context = new Context(work.dumpDirectory, conf, getHive(),
+ work.sessionStateLineageState, driverContext.getCtx());
TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
/*
for now for simplicity we are doing just one directory ( one database ), come back to use
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
index c1a9a62..048727f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
@@ -33,6 +33,7 @@ import java.io.Serializable;
public class ReplLoadWork implements Serializable {
final String dbNameToLoadIn;
final String tableNameToLoadIn;
+ final String dumpDirectory;
private final BootstrapEventsIterator iterator;
private final ConstraintEventsIterator constraintsIterator;
private int loadTaskRunCount = 0;
@@ -50,6 +51,7 @@ public class ReplLoadWork implements Serializable {
throws IOException {
this.tableNameToLoadIn = tableNameToLoadIn;
sessionStateLineageState = lineageState;
+ this.dumpDirectory = dumpDirectory;
this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
this.dbNameToLoadIn = dbNameToLoadIn;
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index 537c5e9..c5f2779 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
import java.io.Serializable;
import java.util.HashMap;
@@ -84,6 +85,11 @@ public class LoadDatabase {
*/
Map<String, String> parameters = new HashMap<>(dbObj.getParameters());
parameters.remove(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+
+ // Add the checkpoint key to the Database, binding it to the current dump directory.
+ // So, if the load is retried using the same dump, we shall skip the Database object update.
+ parameters.put(ReplUtils.REPL_CHECKPOINT_KEY, context.dumpDirectory);
+
createDbDesc.setDatabaseProperties(parameters);
// note that we do not set location - for repl load, we want that auto-created.
createDbDesc.setIfNotExists(false);
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
index 95f484b..f8f0801 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,8 +66,21 @@ public class TaskTracker {
updateTaskCount(task, visited);
}
- public void updateTaskCount(Task<? extends Serializable> task,
- List <Task<? extends Serializable>> visited) {
+ // This method is used to traverse the DAG created in tasks list and add the dependent task to
+ // the tail of each task chain.
+ public void addDependentTask(Task<? extends Serializable> dependent) {
+ if (tasks.isEmpty()) {
+ addTask(dependent);
+ } else {
+ DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent));
+
+ List<Task<? extends Serializable>> visited = new ArrayList<>();
+ updateTaskCount(dependent, visited);
+ }
+ }
+
+ private void updateTaskCount(Task<? extends Serializable> task,
+ List <Task<? extends Serializable>> visited) {
numberOfTasks += 1;
visited.add(task);
if (task.getChildTasks() != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index df7f30d..870f70a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -26,14 +26,12 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -55,8 +53,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -112,21 +110,6 @@ public class LoadPartitions {
}
}
- private void createTableReplLogTask() throws SemanticException {
- ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,
- tableDesc.getTableName(), tableDesc.tableType());
- Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf);
-
- if (tracker.tasks().isEmpty()) {
- tracker.addTask(replLogTask);
- } else {
- DAGTraversal.traverse(tracker.tasks(), new AddDependencyToLeaves(replLogTask));
-
- List<Task<? extends Serializable>> visited = new ArrayList<>();
- tracker.updateTaskCount(replLogTask, visited);
- }
- }
-
public TaskTracker tasks() throws SemanticException {
try {
/*
@@ -143,7 +126,9 @@ public class LoadPartitions {
updateReplicationState(initialReplicationState());
if (!forNewTable().hasReplicationState()) {
// Add ReplStateLogTask only if no pending table load tasks left for next cycle
- createTableReplLogTask();
+ Task<? extends Serializable> replLogTask
+ = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+ tracker.addDependentTask(replLogTask);
}
return tracker;
}
@@ -155,7 +140,9 @@ public class LoadPartitions {
updateReplicationState(initialReplicationState());
if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
// Add ReplStateLogTask only if no pending table load tasks left for next cycle
- createTableReplLogTask();
+ Task<? extends Serializable> replLogTask
+ = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+ tracker.addDependentTask(replLogTask);
}
return tracker;
}
@@ -229,8 +216,19 @@ public class LoadPartitions {
Task<?> movePartitionTask = movePartitionTask(table, partSpec, tmpPath);
+ // Set the checkpoint task as dependent on the add partition tasks. So, if the same dump is
+ // retried for bootstrap, we skip the current partition update.
+ Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
+ tableDesc,
+ (HashMap<String, String>)partSpec.getPartSpec(),
+ context.dumpDirectory,
+ context.hiveConf
+ );
+
copyTask.addDependentTask(addPartTask);
addPartTask.addDependentTask(movePartitionTask);
+ movePartitionTask.addDependentTask(ckptTask);
+
return copyTask;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index d10ca76..f2b7fa4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -27,13 +26,11 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.EximUtil;
@@ -52,7 +49,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -79,21 +75,6 @@ public class LoadTable {
this.tracker = new TaskTracker(limiter);
}
- private void createTableReplLogTask(String tableName, TableType tableType) throws SemanticException {
- ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,tableName, tableType);
- Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
- DAGTraversal.traverse(tracker.tasks(), new AddDependencyToLeaves(replLogTask));
-
- if (tracker.tasks().isEmpty()) {
- tracker.addTask(replLogTask);
- } else {
- DAGTraversal.traverse(tracker.tasks(), new AddDependencyToLeaves(replLogTask));
-
- List<Task<? extends Serializable>> visited = new ArrayList<>();
- tracker.updateTaskCount(replLogTask, visited);
- }
- }
-
public TaskTracker tasks() throws SemanticException {
// Path being passed to us is a table dump location. We go ahead and load it in as needed.
// If tblName is null, then we default to the table name specified in _metadata, which is good.
@@ -159,9 +140,20 @@ public class LoadTable {
existingTableTasks(tableDesc, table, replicationSpec);
}
+ // Set the checkpoint task as dependent on the create table task. So, if the same dump is
+ // retried for bootstrap, we skip the current table update.
+ Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
+ tableDesc,
+ null,
+ context.dumpDirectory,
+ context.hiveConf
+ );
if (!isPartitioned(tableDesc)) {
- createTableReplLogTask(tableDesc.getTableName(), tableDesc.tableType());
+ Task<? extends Serializable> replLogTask
+ = ReplUtils.getTableReplLogTask(tableDesc, replLogger, context.hiveConf);
+ ckptTask.addDependentTask(replLogTask);
}
+ tracker.addDependentTask(ckptTask);
return tracker;
} catch (Exception e) {
throw new SemanticException(e);
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
index 7eae1ea..b90da06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.session.LineageState;
public class Context {
+ public final String dumpDirectory;
public final HiveConf hiveConf;
public final Hive hiveDb;
public final Warehouse warehouse;
@@ -38,9 +39,10 @@ public class Context {
public final LineageState sessionStateLineageState;
- public Context(HiveConf hiveConf, Hive hiveDb,
+ public Context(String dumpDirectory, HiveConf hiveConf, Hive hiveDb,
LineageState lineageState,
org.apache.hadoop.hive.ql.Context nestedContext) throws MetaException {
+ this.dumpDirectory = dumpDirectory;
this.hiveConf = hiveConf;
this.hiveDb = hiveDb;
this.warehouse = new Warehouse(hiveConf);
http://git-wip-us.apache.org/repos/asf/hive/blob/8d5e8a60/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 21b9865..a2b5897 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1136,11 +1136,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- // Clean the txn to writeid map/TXN_COMPONENTS for the given table as we bootstrap here
- String sql = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName)
- + " and t2w_table = " + quoteString(tblName);
- LOG.debug("Going to execute delete <" + sql + ">");
- stmt.executeUpdate(sql);
+ // Check if this txn state is already replicated for the given table. If yes, then it is
+ // the idempotent case, so just return.
+ String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
+ + " and nwi_table = " + quoteString(tblName);
+ LOG.debug("Going to execute query <" + sql + ">");
+
+ rs = stmt.executeQuery(sql);
+ if (rs.next()) {
+ LOG.info("Idempotent flow: WriteId state <" + validWriteIdList + "> is already applied for the table: "
+ + dbName + "." + tblName);
+ rollbackDBConn(dbConn);
+ return;
+ }
if (numAbortedWrites > 0) {
// Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted.
@@ -1173,30 +1181,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// There are some txns in the list which has no write id allocated and hence go ahead and do it.
// Get the next write id for the given table and update it with new next write id.
- // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
- sql = sqlGenerator.addForUpdateClause(
- "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
- + " and nwi_table = " + quoteString(tblName));
- LOG.debug("Going to execute query <" + sql + ">");
-
+ // NEXT_WRITE_ID is expected to have no entry for this table, so insert it directly.
long nextWriteId = validWriteIdList.getHighWatermark() + 1;
- rs = stmt.executeQuery(sql);
- if (!rs.next()) {
- // First allocation of write id (hwm+1) should add the table to the next_write_id meta table.
- sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
- + quoteString(dbName) + "," + quoteString(tblName) + ","
- + Long.toString(nextWriteId) + ")";
- LOG.debug("Going to execute insert <" + sql + ">");
- stmt.execute(sql);
- } else {
- // Update the NEXT_WRITE_ID for the given table with hwm+1 from source
- sql = "update NEXT_WRITE_ID set nwi_next = " + (nextWriteId)
- + " where nwi_database = " + quoteString(dbName)
- + " and nwi_table = " + quoteString(tblName);
- LOG.debug("Going to execute update <" + sql + ">");
- stmt.executeUpdate(sql);
- }
+ // First allocation of write id (hwm+1) should add the table to the next_write_id meta table.
+ sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
+ + quoteString(dbName) + "," + quoteString(tblName) + ","
+ + Long.toString(nextWriteId) + ")";
+ LOG.debug("Going to execute insert <" + sql + ">");
+ stmt.execute(sql);
+
+ LOG.info("WriteId state <" + validWriteIdList + "> is applied for the table: " + dbName + "." + tblName);
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {