Posted to commits@hive.apache.org by sa...@apache.org on 2018/06/28 04:19:31 UTC
[1/2] hive git commit: HIVE-19829: Incremental replication load should create tasks in execution phase rather than semantic phase (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
Repository: hive
Updated Branches:
refs/heads/master 6a8f4cbe3 -> 67b0a67c3
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
new file mode 100644
index 0000000..4b37c8d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.incremental;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * IncrementalLoadEventsIterator
+ * Helper class to iterate through the event dump directory in event-id order.
+ */
+public class IncrementalLoadEventsIterator implements Iterator<FileStatus> {
+ private FileStatus[] eventDirs;
+ private int currentIndex;
+ private int numEvents;
+
+ public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws IOException {
+ Path eventPath = new Path(loadPath);
+ FileSystem fs = eventPath.getFileSystem(conf);
+ eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs));
+ if ((eventDirs == null) || (eventDirs.length == 0)) {
+ throw new IllegalArgumentException("No data to load in path " + loadPath);
+ }
+ // For event dump, each sub-dir is an individual event dump.
+ // We need to guarantee that the directory listing we got is in order of event id.
+ Arrays.sort(eventDirs, new EventDumpDirComparator());
+ currentIndex = 0;
+ numEvents = eventDirs.length;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return (eventDirs != null && currentIndex < numEvents);
+ }
+
+ @Override
+ public FileStatus next() {
+ if (hasNext()) {
+ return eventDirs[currentIndex++];
+ } else {
+ throw new NoSuchElementException("no more events");
+ }
+ }
+
+ public int getNumEvents() {
+ return numEvents;
+ }
+}
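
To make the iterator's contract concrete, here is a minimal usage sketch. It is not part
of the commit; the dump path and the driver class are hypothetical, but the iterator API
is the one defined above.

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator;

public class EventIterationSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Hypothetical dump root; each sub-directory is one event dump, named by its event id.
    IncrementalLoadEventsIterator events =
        new IncrementalLoadEventsIterator("/user/hive/repl/dump/incr", conf);
    System.out.println("events to load: " + events.getNumEvents());
    while (events.hasNext()) {
      FileStatus eventDir = events.next();
      // Directories arrive sorted by event id, so replay order is preserved.
      System.out.println("event dir: " + eventDir.getPath());
    }
  }
}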
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
new file mode 100644
index 0000000..9e0ce82
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.incremental;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
+import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.slf4j.Logger;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.HashSet;
+
+/**
+ * IncrementalLoadTasksBuilder
+ * Iterates through the dump directory and creates tasks to load the events.
+ */
+public class IncrementalLoadTasksBuilder {
+ private final String dbName, tableName;
+ private final IncrementalLoadEventsIterator iterator;
+ private HashSet<ReadEntity> inputs;
+ private HashSet<WriteEntity> outputs;
+ private Logger log;
+ private final HiveConf conf;
+ private final ReplLogger replLogger;
+ private static long numIteration;
+
+ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath,
+ IncrementalLoadEventsIterator iterator, HiveConf conf) {
+ this.dbName = dbName;
+ this.tableName = tableName;
+ this.iterator = iterator;
+ inputs = new HashSet<>();
+ outputs = new HashSet<>();
+ log = null;
+ this.conf = conf;
+ replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents());
+ numIteration = 0;
+ replLogger.startLog();
+ }
+
+ public Task<? extends Serializable> build(DriverContext driverContext, Hive hive, Logger log) throws Exception {
+ Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
+ Task<? extends Serializable> taskChainTail = evTaskRoot;
+ Long lastReplayedEvent = null;
+ this.log = log;
+ numIteration++;
+ this.log.debug("Iteration num " + numIteration);
+ TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
+
+ while (iterator.hasNext() && tracker.canAddMoreTasks()) {
+ FileStatus dir = iterator.next();
+ String location = dir.getPath().toUri().toString();
+ DumpMetaData eventDmd = new DumpMetaData(new Path(location), conf);
+
+ if (!shouldReplayEvent(dir, eventDmd.getDumpType(), dbName, tableName)) {
+ this.log.debug("Skipping event {} from {} for table {}.{} maxTasks: {}",
+ eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks());
+ continue;
+ }
+
+ this.log.debug("Loading event {} from {} for table {}.{} maxTasks: {}",
+ eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks());
+
+ // Event loads will behave similarly to table loads, with one crucial difference:
+ // precursor order is strict, and each event must be processed after the previous one.
+ // The way we handle this strict order is as follows:
+ // First, we start with a taskChainTail which is a dummy noop task (a DependencyCollectionTask)
+ // at the head of our event chain. For each event we process, we tell analyzeTableLoad to
+ // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks
+ // and introduce a new barrier task (also a DependencyCollectionTask) which depends on all
+ // these tasks. Then, this barrier task becomes our new taskChainTail. Thus, we get a set of
+ // tasks as follows:
+ //
+ // --->ev1.task1-- --->ev2.task1--
+ // / \ / \
+ // evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail
+ // \ /
+ // --->ev1.task3--
+ //
+ // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the
+ // entire chain
+
+ MessageHandler.Context context = new MessageHandler.Context(dbName, tableName, location,
+ taskChainTail, eventDmd, conf, hive, driverContext.getCtx(), this.log);
+ List<Task<? extends Serializable>> evTasks = analyzeEventLoad(context);
+
+ if ((evTasks != null) && (!evTasks.isEmpty())) {
+ ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger,
+ dir.getPath().getName(),
+ eventDmd.getDumpType().toString());
+ Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork);
+ AddDependencyToLeaves function = new AddDependencyToLeaves(barrierTask);
+ DAGTraversal.traverse(evTasks, function);
+ this.log.debug("Updated taskChainTail from {}:{} to {}:{}",
+ taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
+ tracker.addTaskList(taskChainTail.getChildTasks());
+ taskChainTail = barrierTask;
+ }
+ lastReplayedEvent = eventDmd.getEventTo();
+ }
+
+ // If at least one event was replayed and all events are processed, then add a barrier
+ // task to update the repl state with the last replayed event id.
+ if (!evTaskRoot.equals(taskChainTail) && !iterator.hasNext()) {
+ Map<String, String> dbProps = new HashMap<>();
+ dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent));
+ ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);
+ Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf);
+ taskChainTail.addDependentTask(barrierTask);
+ this.log.debug("Added {}:{} as a precursor of barrier task {}:{}",
+ taskChainTail.getClass(), taskChainTail.getId(),
+ barrierTask.getClass(), barrierTask.getId());
+ }
+ return evTaskRoot;
+ }
+
+ private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
+ if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+ String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+ if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
+ log.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
+ + " is already replayed. LastReplId - " + Long.parseLong(replLastId));
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbName, String tableName) {
+ // If the database name itself is null or empty, then we cannot filter out anything.
+ if (dbName == null || dbName.isEmpty()) {
+ return true;
+ } else if ((tableName == null) || (tableName.isEmpty())) {
+ Database database;
+ try {
+ database = Hive.get().getDatabase(dbName);
+ return isEventNotReplayed(database.getParameters(), dir, dumpType);
+ } catch (HiveException e) {
+ // Maybe the db is getting created in this load.
+ log.debug("failed to get the database " + dbName);
+ return true;
+ }
+ } else {
+ Table tbl;
+ try {
+ tbl = Hive.get().getTable(dbName, tableName);
+ return isEventNotReplayed(tbl.getParameters(), dir, dumpType);
+ } catch (HiveException e) {
+ // Maybe the table is getting created in this load.
+ log.debug("failed to get the table " + dbName + "." + tableName);
+ return true;
+ }
+ }
+ }
+
+ private List<Task<? extends Serializable>> analyzeEventLoad(MessageHandler.Context context) throws SemanticException {
+ MessageHandler messageHandler = context.dmd.getDumpType().handler();
+ List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
+
+ if (context.precursor != null) {
+ for (Task<? extends Serializable> t : tasks) {
+ context.precursor.addDependentTask(t);
+ log.debug("Added {}:{} as a precursor of {}:{}",
+ context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId());
+ }
+ }
+
+ inputs.addAll(messageHandler.readEntities());
+ outputs.addAll(messageHandler.writeEntities());
+ return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), messageHandler.getUpdatedMetadata(), tasks);
+ }
+
+ private Task<? extends Serializable> tableUpdateReplStateTask(String dbName, String tableName,
+ Map<String, String> partSpec, String replState,
+ Task<? extends Serializable> preCursor) throws SemanticException {
+ HashMap<String, String> mapProp = new HashMap<>();
+ mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+ AlterTableDesc alterTblDesc = new AlterTableDesc(
+ AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState));
+ alterTblDesc.setProps(mapProp);
+ alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName));
+ alterTblDesc.setPartSpec((HashMap<String, String>)partSpec);
+
+ Task<? extends Serializable> updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterTblDesc), conf);
+
+ // Link the update repl state task with dependency collection task
+ if (preCursor != null) {
+ preCursor.addDependentTask(updateReplIdTask);
+ log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(),
+ updateReplIdTask.getClass(), updateReplIdTask.getId());
+ }
+ return updateReplIdTask;
+ }
+
+ private Task<? extends Serializable> dbUpdateReplStateTask(String dbName, String replState,
+ Task<? extends Serializable> preCursor) {
+ HashMap<String, String> mapProp = new HashMap<>();
+ mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+ AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(replState, replState));
+ Task<? extends Serializable> updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterDbDesc), conf);
+
+ // Link the update repl state task with dependency collection task
+ if (preCursor != null) {
+ preCursor.addDependentTask(updateReplIdTask);
+ log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(),
+ updateReplIdTask.getClass(), updateReplIdTask.getId());
+ }
+ return updateReplIdTask;
+ }
+
+ private List<Task<? extends Serializable>> addUpdateReplStateTasks(boolean isDatabaseLoad,
+ UpdatedMetaDataTracker updatedMetadata,
+ List<Task<? extends Serializable>> importTasks) throws SemanticException {
+ String replState = updatedMetadata.getReplicationState();
+ String database = updatedMetadata.getDatabase();
+ String table = updatedMetadata.getTable();
+
+ // If the event generated no import tasks, or no table was updated for a table-level
+ // load, then there is no need to update the repl state on any object.
+ if (importTasks.isEmpty() || (!isDatabaseLoad && (table == null))) {
+ log.debug("No objects need update of repl state: Either 0 import tasks or table level load");
+ return importTasks;
+ }
+
+ // Create a barrier task for dependency collection of import tasks
+ Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork());
+
+ // Link import tasks to the barrier task, which will in turn be linked with the repl state update tasks.
+ for (Task<? extends Serializable> t : importTasks){
+ t.addDependentTask(barrierTask);
+ log.debug("Added {}:{} as a precursor of barrier task {}:{}",
+ t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
+ }
+
+ List<Task<? extends Serializable>> tasks = new ArrayList<>();
+ Task<? extends Serializable> updateReplIdTask;
+
+ // If any partition is updated, then update repl state in partition object
+ for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
+ updateReplIdTask = tableUpdateReplStateTask(database, table, partSpec, replState, barrierTask);
+ tasks.add(updateReplIdTask);
+ }
+
+ if (table != null) {
+ // If any table/partition is updated, then update repl state in table object
+ updateReplIdTask = tableUpdateReplStateTask(database, table, null, replState, barrierTask);
+ tasks.add(updateReplIdTask);
+ }
+
+ // For a table-level load, the replication state of the database need not be updated.
+ if (isDatabaseLoad) {
+ // If any table/partition is updated, then update repl state in db object
+ updateReplIdTask = dbUpdateReplStateTask(database, replState, barrierTask);
+ tasks.add(updateReplIdTask);
+ }
+
+ // At least one task would have been added to update the repl state
+ return tasks;
+ }
+
+ public static long getNumIteration() {
+ return numIteration;
+ }
+}
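
The barrier-chaining pattern described in the comment block above reduces to the sketch
below. It is a standalone illustration, not code from this commit: the per-event task
lists are taken as input instead of coming from a MessageHandler, and only TaskFactory,
DependencyCollectionWork, and addDependentTask from the code above are assumed.

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;

import java.io.Serializable;
import java.util.List;

public class BarrierChainSketch {
  // Builds: root -> ev1 tasks -> barrier1 -> ev2 tasks -> barrier2 -> ...
  static Task<? extends Serializable> chain(
      List<List<Task<? extends Serializable>>> perEventTasks) {
    Task<? extends Serializable> root = TaskFactory.get(new DependencyCollectionWork());
    Task<? extends Serializable> tail = root;
    for (List<Task<? extends Serializable>> evTasks : perEventTasks) {
      // Every task of this event depends on the current chain tail...
      for (Task<? extends Serializable> t : evTasks) {
        tail.addDependentTask(t);
      }
      // ...and a fresh barrier depends on all of them, becoming the new tail,
      // so the next event strictly follows this one.
      Task<? extends Serializable> barrier = TaskFactory.get(new DependencyCollectionWork());
      for (Task<? extends Serializable> t : evTasks) {
        t.addDependentTask(barrier);
      }
      tail = barrier;
    }
    return root;
  }
}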
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java
new file mode 100644
index 0000000..284796f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+public class AddDependencyToLeaves implements DAGTraversal.Function {
+ private List<Task<? extends Serializable>> postDependencyCollectionTasks;
+
+ public AddDependencyToLeaves(List<Task<? extends Serializable>> postDependencyCollectionTasks) {
+ this.postDependencyCollectionTasks = postDependencyCollectionTasks;
+ }
+
+ public AddDependencyToLeaves(Task<? extends Serializable> postDependencyTask) {
+ this(Collections.singletonList(postDependencyTask));
+ }
+
+
+ @Override
+ public void process(Task<? extends Serializable> task) {
+ if (task.getChildTasks() == null) {
+ postDependencyCollectionTasks.forEach(task::addDependentTask);
+ }
+ }
+
+ @Override
+ public boolean skipProcessing(Task<? extends Serializable> task) {
+ return postDependencyCollectionTasks.contains(task);
+ }
+}
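
A sketch of the intended usage, mirroring the DAGTraversal.traverse call in
IncrementalLoadTasksBuilder; the dummy DependencyCollectionWork tasks exist only for
illustration.

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

public class AddDependencySketch {
  public static void main(String[] args) {
    Task<? extends Serializable> a = TaskFactory.get(new DependencyCollectionWork());
    Task<? extends Serializable> b = TaskFactory.get(new DependencyCollectionWork());
    a.addDependentTask(b); // a -> b, so b is the only leaf of the DAG
    Task<? extends Serializable> barrier = TaskFactory.get(new DependencyCollectionWork());
    List<Task<? extends Serializable>> roots = Collections.singletonList(a);
    // Walks the DAG from its roots and hangs the barrier off every leaf (here: b),
    // skipping the barrier itself if it is ever reached.
    DAGTraversal.traverse(roots, new AddDependencyToLeaves(barrier));
  }
}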
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
new file mode 100644
index 0000000..618be1d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+
+public class ReplUtils {
+
+ public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
+
+ /**
+ * Operation type applied by bootstrap REPL LOAD to the examined object, based on its ckpt state.
+ */
+ public enum ReplLoadOpType {
+ LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
+ }
+
+ public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(
+ Table table, List<Map<String, String>> partitions) throws SemanticException {
+ Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
+ int partPrefixLength = 0;
+ if (partitions.size() > 0) {
+ partPrefixLength = partitions.get(0).size();
+ // Pick the length of the first partition spec; we expect all listed partitions to
+ // have the same number of key-value pairs.
+ }
+ List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
+ for (Map<String, String> ptn : partitions) {
+ // convert each key-value-map to appropriate expression.
+ ExprNodeGenericFuncDesc expr = null;
+ for (Map.Entry<String, String> kvp : ptn.entrySet()) {
+ String key = kvp.getKey();
+ Object val = kvp.getValue();
+ String type = table.getPartColByName(key).getType();
+ PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
+ ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
+ ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
+ "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val));
+ expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
+ }
+ if (expr != null) {
+ partitionDesc.add(expr);
+ }
+ }
+ if (partitionDesc.size() > 0) {
+ partSpecs.put(partPrefixLength, partitionDesc);
+ }
+ return partSpecs;
+ }
+
+ public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf)
+ throws SemanticException {
+ ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType());
+ return TaskFactory.get(replLogWork, conf);
+ }
+
+ public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, HashMap<String, String> partSpec,
+ String dumpRoot, HiveConf conf) throws SemanticException {
+ HashMap<String, String> mapProp = new HashMap<>();
+ mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot);
+
+ AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
+ alterTblDesc.setProps(mapProp);
+ alterTblDesc.setOldName(
+ StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), tableDesc.getTableName()));
+ if (partSpec != null) {
+ alterTblDesc.setPartSpec(partSpec);
+ }
+ return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf);
+ }
+
+ public static boolean replCkptStatus(String dbName, Map<String, String> props, String dumpRoot)
+ throws InvalidOperationException {
+ // If the ckpt property is not set or is empty, then bootstrap was not run on this object.
+ if ((props != null) && props.containsKey(REPL_CHECKPOINT_KEY) && !props.get(REPL_CHECKPOINT_KEY).isEmpty()) {
+ if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) {
+ return true;
+ }
+ throw new InvalidOperationException("REPL LOAD with Dump: " + dumpRoot
+ + " is not allowed as the target DB: " + dbName
+ + " is already bootstrap loaded by another Dump " + props.get(REPL_CHECKPOINT_KEY));
+ }
+ return false;
+ }
+}
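
The checkpoint contract of replCkptStatus can be exercised as in this sketch; the
database name and dump roots are made up.

import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;

import java.util.HashMap;
import java.util.Map;

public class CkptSketch {
  public static void main(String[] args) throws InvalidOperationException {
    Map<String, String> props = new HashMap<>();
    // No checkpoint set: bootstrap was not run on this object, so false is returned.
    System.out.println(ReplUtils.replCkptStatus("mydb", props, "/repl/dump/1")); // false

    props.put(ReplUtils.REPL_CHECKPOINT_KEY, "/repl/dump/1");
    // Same dump root: this is a retry of the same bootstrap, so true is returned.
    System.out.println(ReplUtils.replCkptStatus("mydb", props, "/repl/dump/1")); // true

    // A different dump root would throw InvalidOperationException, because the target
    // database was already bootstrap loaded by another dump.
  }
}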
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
new file mode 100644
index 0000000..1d01bc9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is responsible for tracking how many tasks have been created.
+ * It organizes tasks such that, after the number of tasks for the next execution has been
+ * created, we create a dependency collection task (DCT) -> another bootstrap task,
+ * and then add the DCT as a dependent of all existing tasks so the cycle can continue.
+ */
+public class TaskTracker {
+ private static Logger LOG = LoggerFactory.getLogger(TaskTracker.class);
+ /**
+ * Used to identify the list of tasks at the root for a given level (table / db / partition).
+ * This does not include the task dependency notion of "table tasks <---- partition task".
+ */
+ private final List<Task<? extends Serializable>> tasks = new ArrayList<>();
+ private ReplicationState replicationState = null;
+ // Since tasks themselves can be graphs, we want to limit the number of created
+ // tasks, including all of their dependencies.
+ private int numberOfTasks = 0;
+ private final int maxTasksAllowed;
+
+ public TaskTracker(int defaultMaxTasks) {
+ maxTasksAllowed = defaultMaxTasks;
+ }
+
+ public TaskTracker(TaskTracker existing) {
+ maxTasksAllowed = existing.maxTasksAllowed - existing.numberOfTasks;
+ }
+
+ /**
+ * This method is used to identify all the tasks in a graph.
+ * The graph, however, might get created in a disjoint fashion, in which case we can just update
+ * the number of tasks using the "update" method.
+ */
+ public void addTask(Task<? extends Serializable> task) {
+ tasks.add(task);
+
+ List <Task<? extends Serializable>> visited = new ArrayList<>();
+ updateTaskCount(task, visited);
+ }
+
+ public void addTaskList(List <Task<? extends Serializable>> taskList) {
+ List <Task<? extends Serializable>> visited = new ArrayList<>();
+ for (Task<? extends Serializable> task : taskList) {
+ if (!visited.contains(task)) {
+ tasks.add(task);
+ updateTaskCount(task, visited);
+ }
+ }
+ }
+
+ // Traverses the DAG formed by the tasks list and adds the dependent task to
+ // the tail of each task chain.
+ public void addDependentTask(Task<? extends Serializable> dependent) {
+ if (tasks.isEmpty()) {
+ addTask(dependent);
+ } else {
+ DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent));
+
+ List<Task<? extends Serializable>> visited = new ArrayList<>();
+ updateTaskCount(dependent, visited);
+ }
+ }
+
+ private void updateTaskCount(Task<? extends Serializable> task,
+ List <Task<? extends Serializable>> visited) {
+ numberOfTasks += 1;
+ visited.add(task);
+ if (task.getChildTasks() != null) {
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+ if (visited.contains(childTask)) {
+ continue;
+ }
+ updateTaskCount(childTask, visited);
+ }
+ }
+ }
+
+ public boolean canAddMoreTasks() {
+ return numberOfTasks < maxTasksAllowed;
+ }
+
+ public boolean hasTasks() {
+ return numberOfTasks != 0;
+ }
+
+ public void update(TaskTracker withAnother) {
+ numberOfTasks += withAnother.numberOfTasks;
+ if (withAnother.hasReplicationState()) {
+ this.replicationState = withAnother.replicationState;
+ }
+ }
+
+ public void setReplicationState(ReplicationState state) {
+ this.replicationState = state;
+ }
+
+ public boolean hasReplicationState() {
+ return replicationState != null;
+ }
+
+ public ReplicationState replicationState() {
+ return replicationState;
+ }
+
+ public List<Task<? extends Serializable>> tasks() {
+ return tasks;
+ }
+
+ public void debugLog(String forEventType) {
+ LOG.debug("{} event with total / root number of tasks:{}/{}", forEventType, numberOfTasks,
+ tasks.size());
+ }
+
+ public int numberOfTasks() {
+ return numberOfTasks;
+ }
+}
\ No newline at end of file
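
A sketch of the task-budget behavior; the budget of 2 and the dummy tasks are arbitrary.

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;

public class TrackerSketch {
  public static void main(String[] args) {
    TaskTracker tracker = new TaskTracker(2); // allow at most two tasks
    tracker.addTask(TaskFactory.get(new DependencyCollectionWork()));
    System.out.println(tracker.canAddMoreTasks()); // true: 1 < 2
    tracker.addTask(TaskFactory.get(new DependencyCollectionWork()));
    System.out.println(tracker.canAddMoreTasks()); // false: budget exhausted
    // A tracker built from an existing one inherits only the leftover budget.
    TaskTracker remaining = new TaskTracker(tracker);
    System.out.println(remaining.canAddMoreTasks()); // false: 2 - 2 = 0
  }
}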
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
index a91f45e..cf54aa3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.parse.GenTezProcContext;
import org.apache.hadoop.hive.ql.parse.GenTezWork;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 0a5ecf9..0a535d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 5aeae16..6ed792c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,33 +29,17 @@ import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
-import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
-import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
-import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
-import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
-import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
import java.io.FileNotFoundException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -265,45 +248,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
- if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
- String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
- if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
- LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
- + " is already replayed. LastReplId - " + Long.parseLong(replLastId));
- return false;
- }
- }
- return true;
- }
-
- private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException {
- // if database itself is null then we can not filter out anything.
- if (dbNameOrPattern == null || dbNameOrPattern.isEmpty()) {
- return true;
- } else if ((tblNameOrPattern == null) || (tblNameOrPattern.isEmpty())) {
- Database database;
- try {
- database = Hive.get().getDatabase(dbNameOrPattern);
- return isEventNotReplayed(database.getParameters(), dir, dumpType);
- } catch (HiveException e) {
- //may be the db is getting created in this load
- LOG.debug("failed to get the database " + dbNameOrPattern);
- return true;
- }
- } else {
- Table tbl;
- try {
- tbl = Hive.get().getTable(dbNameOrPattern, tblNameOrPattern);
- return isEventNotReplayed(tbl.getParameters(), dir, dumpType);
- } catch (HiveException e) {
- // may be the table is getting created in this load
- LOG.debug("failed to get the table " + dbNameOrPattern + "." + tblNameOrPattern);
- return true;
- }
- }
- }
-
/*
* Example dump dirs we need to be able to handle :
*
@@ -395,7 +339,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
- tblNameOrPattern, queryState.getLineageState());
+ tblNameOrPattern, queryState.getLineageState(), false);
rootTasks.add(TaskFactory.get(replLoadWork, conf));
return;
}
@@ -406,236 +350,15 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
return;
}
- FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs));
-
- if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) {
- throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString());
- }
-
- if (!evDump){
- // not an event dump, not a table dump - thus, a db dump
- if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
- LOG.debug("Found multiple dirs when we expected 1:");
- for (FileStatus d : dirsInLoadPath) {
- LOG.debug("> " + d.getPath().toUri().toString());
- }
- throw new IllegalArgumentException(
- "Multiple dirs in "
- + loadPath.toUri().toString()
- + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
- }
-
- ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
- queryState.getLineageState());
- rootTasks.add(TaskFactory.get(replLoadWork, conf));
- //
- // for (FileStatus dir : dirsInLoadPath) {
- // analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
- // }
- } else {
- // Event dump, each sub-dir is an individual event dump.
- // We need to guarantee that the directory listing we got is in order of evid.
- Arrays.sort(dirsInLoadPath, new EventDumpDirComparator());
-
- Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
- Task<? extends Serializable> taskChainTail = evTaskRoot;
-
- ReplLogger replLogger = new IncrementalLoadLogger(dbNameOrPattern,
- loadPath.toString(), dirsInLoadPath.length);
-
- for (FileStatus dir : dirsInLoadPath){
- String locn = dir.getPath().toUri().toString();
- DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf);
-
- if (!shouldReplayEvent(dir, eventDmd.getDumpType())) {
- continue;
- }
-
- LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
-
- // event loads will behave similar to table loads, with one crucial difference
- // precursor order is strict, and each event must be processed after the previous one.
- // The way we handle this strict order is as follows:
- // First, we start with a taskChainTail which is a dummy noop task (a DependecyCollectionTask)
- // at the head of our event chain. For each event we process, we tell analyzeTableLoad to
- // create tasks that use the taskChainTail as a dependency. Then, we collect all those tasks
- // and introduce a new barrier task(also a DependencyCollectionTask) which depends on all
- // these tasks. Then, this barrier task becomes our new taskChainTail. Thus, we get a set of
- // tasks as follows:
- //
- // --->ev1.task1-- --->ev2.task1--
- // / \ / \
- // evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail
- // \ /
- // --->ev1.task3--
- //
- // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the
- // entire chain
-
- MessageHandler.Context context = new MessageHandler.Context(dbNameOrPattern,
- tblNameOrPattern, locn, taskChainTail,
- eventDmd, conf, db, ctx, LOG);
- List<Task<? extends Serializable>> evTasks = analyzeEventLoad(context);
-
- if ((evTasks != null) && (!evTasks.isEmpty())){
- ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger,
- dir.getPath().getName(),
- eventDmd.getDumpType().toString());
- Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork);
- for (Task<? extends Serializable> t : evTasks){
- t.addDependentTask(barrierTask);
- LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
- t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
- }
- LOG.debug("Updated taskChainTail from {}:{} to {}:{}",
- taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
- taskChainTail = barrierTask;
- }
- }
-
- // If any event is there and db name is known, then dump the start and end logs
- if (!evTaskRoot.equals(taskChainTail)) {
- Map<String, String> dbProps = new HashMap<>();
- dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(dmd.getEventTo()));
- ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);
- Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf);
- taskChainTail.addDependentTask(barrierTask);
- LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
- taskChainTail.getClass(), taskChainTail.getId(),
- barrierTask.getClass(), barrierTask.getId());
-
- replLogger.startLog();
- }
- rootTasks.add(evTaskRoot);
- }
-
+ ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
+ tblNameOrPattern, queryState.getLineageState(), evDump);
+ rootTasks.add(TaskFactory.get(replLoadWork, conf));
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
throw new SemanticException(e.getMessage(), e);
}
}
- private List<Task<? extends Serializable>> analyzeEventLoad(
- MessageHandler.Context context)
- throws SemanticException {
- MessageHandler messageHandler = context.dmd.getDumpType().handler();
- List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
-
- if (context.precursor != null) {
- for (Task<? extends Serializable> t : tasks) {
- context.precursor.addDependentTask(t);
- LOG.debug("Added {}:{} as a precursor of {}:{}",
- context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId());
- }
- }
-
- inputs.addAll(messageHandler.readEntities());
- outputs.addAll(messageHandler.writeEntities());
- return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName),
- messageHandler.getUpdatedMetadata(), tasks);
- }
-
- private Task<? extends Serializable> tableUpdateReplStateTask(
- String dbName,
- String tableName,
- Map<String, String> partSpec,
- String replState,
- Task<? extends Serializable> preCursor) throws SemanticException {
- HashMap<String, String> mapProp = new HashMap<>();
- mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
-
- AlterTableDesc alterTblDesc = new AlterTableDesc(
- AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState));
- alterTblDesc.setProps(mapProp);
- alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName));
- alterTblDesc.setPartSpec((HashMap<String, String>)partSpec);
-
- Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
- new DDLWork(inputs, outputs, alterTblDesc), conf);
-
- // Link the update repl state task with dependency collection task
- if (preCursor != null) {
- preCursor.addDependentTask(updateReplIdTask);
- LOG.debug("Added {}:{} as a precursor of {}:{}",
- preCursor.getClass(), preCursor.getId(),
- updateReplIdTask.getClass(), updateReplIdTask.getId());
- }
- return updateReplIdTask;
- }
-
- private Task<? extends Serializable> dbUpdateReplStateTask(
- String dbName,
- String replState,
- Task<? extends Serializable> preCursor) {
- HashMap<String, String> mapProp = new HashMap<>();
- mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
-
- AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(
- dbName, mapProp, new ReplicationSpec(replState, replState));
- Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
- new DDLWork(inputs, outputs, alterDbDesc), conf);
-
- // Link the update repl state task with dependency collection task
- if (preCursor != null) {
- preCursor.addDependentTask(updateReplIdTask);
- LOG.debug("Added {}:{} as a precursor of {}:{}",
- preCursor.getClass(), preCursor.getId(),
- updateReplIdTask.getClass(), updateReplIdTask.getId());
- }
- return updateReplIdTask;
- }
-
- private List<Task<? extends Serializable>> addUpdateReplStateTasks(
- boolean isDatabaseLoad,
- UpdatedMetaDataTracker updatedMetadata,
- List<Task<? extends Serializable>> importTasks) throws SemanticException {
- String replState = updatedMetadata.getReplicationState();
- String dbName = updatedMetadata.getDatabase();
- String tableName = updatedMetadata.getTable();
-
- // If no import tasks generated by the event or no table updated for table level load, then no
- // need to update the repl state to any object.
- if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null))) {
- LOG.debug("No objects need update of repl state: Either 0 import tasks or table level load");
- return importTasks;
- }
-
- // Create a barrier task for dependency collection of import tasks
- Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork());
-
- // Link import tasks to the barrier task which will in-turn linked with repl state update tasks
- for (Task<? extends Serializable> t : importTasks){
- t.addDependentTask(barrierTask);
- LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
- t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
- }
-
- List<Task<? extends Serializable>> tasks = new ArrayList<>();
- Task<? extends Serializable> updateReplIdTask;
-
- // If any partition is updated, then update repl state in partition object
- for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
- updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
- tasks.add(updateReplIdTask);
- }
-
- if (tableName != null) {
- // If any table/partition is updated, then update repl state in table object
- updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
- tasks.add(updateReplIdTask);
- }
-
- // For table level load, need not update replication state for the database
- if (isDatabaseLoad) {
- // If any table/partition is updated, then update repl state in db object
- updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
- tasks.add(updateReplIdTask);
- }
-
- // At least one task would have been added to update the repl state
- return tasks;
- }
-
// REPL STATUS
private void initReplStatus(ASTNode ast) throws SemanticException{
dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index 9fdf742..ecde3ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.thrift.TException;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 70f4fed..f05c231 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index b59cdf2..e68e055 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
index 939884d..4a2fdd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.load.message;
import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DDLWork;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
index 309debe..166cf87 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
index 32b4a72..41ab447 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
[2/2] hive git commit: HIVE-19829: Incremental replication load should create tasks in execution phase rather than semantic phase (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
Posted by sa...@apache.org.
HIVE-19829: Incremental replication load should create tasks in execution phase rather than semantic phase (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/67b0a67c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/67b0a67c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/67b0a67c
Branch: refs/heads/master
Commit: 67b0a67c324bd78224e0b8338eda63824d104cdc
Parents: 6a8f4cb
Author: Sankar Hariappan <sa...@apache.org>
Authored: Thu Jun 28 09:48:47 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Thu Jun 28 09:48:47 2018 +0530
----------------------------------------------------------------------
.../TestReplicationScenariosAcidTables.java | 12 +-
...TestReplicationScenariosAcrossInstances.java | 112 +++++-
ql/if/queryplan.thrift | 3 +-
ql/src/gen/thrift/gen-cpp/queryplan_types.cpp | 8 +-
ql/src/gen/thrift/gen-cpp/queryplan_types.h | 3 +-
.../hadoop/hive/ql/plan/api/StageType.java | 5 +-
ql/src/gen/thrift/gen-php/Types.php | 2 +
ql/src/gen/thrift/gen-py/queryplan/ttypes.py | 3 +
ql/src/gen/thrift/gen-rb/queryplan_types.rb | 5 +-
.../apache/hadoop/hive/ql/exec/TaskFactory.java | 4 +-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 344 +++++++++++++++++++
.../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 113 ++++++
.../hadoop/hive/ql/exec/repl/ReplUtils.java | 123 -------
.../repl/bootstrap/AddDependencyToLeaves.java | 51 ---
.../ql/exec/repl/bootstrap/ReplLoadTask.java | 319 -----------------
.../ql/exec/repl/bootstrap/ReplLoadWork.java | 88 -----
.../filesystem/BootstrapEventsIterator.java | 9 +
.../repl/bootstrap/load/LoadConstraint.java | 1 +
.../exec/repl/bootstrap/load/LoadDatabase.java | 5 +-
.../exec/repl/bootstrap/load/LoadFunction.java | 3 +-
.../exec/repl/bootstrap/load/TaskTracker.java | 135 --------
.../bootstrap/load/table/LoadPartitions.java | 6 +-
.../repl/bootstrap/load/table/LoadTable.java | 6 +-
.../repl/bootstrap/load/table/TableContext.java | 2 +-
.../IncrementalLoadEventsIterator.java | 73 ++++
.../IncrementalLoadTasksBuilder.java | 311 +++++++++++++++++
.../exec/repl/util/AddDependencyToLeaves.java | 51 +++
.../hive/ql/exec/repl/util/ReplUtils.java | 124 +++++++
.../hive/ql/exec/repl/util/TaskTracker.java | 145 ++++++++
.../ql/optimizer/QueryPlanPostProcessor.java | 2 +-
.../apache/hadoop/hive/ql/parse/EximUtil.java | 2 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 287 +---------------
.../parse/repl/dump/io/PartitionSerializer.java | 2 +-
.../ql/parse/repl/dump/io/TableSerializer.java | 2 +-
.../repl/load/message/AlterDatabaseHandler.java | 2 +-
.../repl/load/message/DropPartitionHandler.java | 2 +-
.../bootstrap/AddDependencyToLeavesTest.java | 1 +
.../repl/bootstrap/load/TestTaskTracker.java | 1 +
38 files changed, 1316 insertions(+), 1051 deletions(-)
----------------------------------------------------------------------
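The heart of the change, before the per-file diffs: REPL LOAD no longer materializes its whole task DAG in the semantic phase. The analyzer schedules a single ReplLoadTask, and each execution of that task builds the next batch of work, dispatching on whether the dump is a bootstrap or an incremental one. A minimal sketch of that dispatch pattern, with simplified stand-in types (the real classes are ReplLoadTask / ReplLoadWork further down):

  interface LoadWork {
    boolean isIncrementalLoad();
  }

  class LoadTask {
    private final LoadWork work;

    LoadTask(LoadWork work) {
      this.work = work;
    }

    // The DAG for the next batch is built here, at execution time,
    // not during semantic analysis.
    int execute() {
      return work.isIncrementalLoad() ? executeIncrementalLoad() : executeBootstrapLoad();
    }

    private int executeIncrementalLoad() { return 0; } // build tasks for the next event batch
    private int executeBootstrapLoad() { return 0; }   // build tasks for the next bootstrap batch
  }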
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index a1498ca..4892486 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -345,7 +345,7 @@ public class TestReplicationScenariosAcidTables {
WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
- replicaNonAcid.loadFailure(replicatedDbName, incrementalDump.dumpLocation)
+ replicaNonAcid.runFailure("REPL LOAD " + replicatedDbName + " FROM '" + incrementalDump.dumpLocation + "'")
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
}
@@ -395,10 +395,8 @@ public class TestReplicationScenariosAcidTables {
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
.verifyResult("null")
- .run("show tables")
- .verifyResults(new String[] { "t1" })
- .run("select id from t1")
- .verifyResults(Arrays.asList("1"));
+ .run("show tables like t2")
+ .verifyResults(new String[] { });
// Retry with different dump should fail.
replica.loadFailure(replicatedDbName, tuple2.dumpLocation);
@@ -413,10 +411,6 @@ public class TestReplicationScenariosAcidTables {
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName));
return false;
}
- if (args.tblName != null) {
- LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
- return args.tblName.equals("t2");
- }
return true;
}
};
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 0f67174..d0608cf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.util.DependencyResolver;
import org.apache.hadoop.hive.shims.Utils;
@@ -45,6 +45,8 @@ import org.junit.rules.TestName;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.junit.Assert;
import java.io.IOException;
import java.net.URI;
@@ -77,10 +79,11 @@ public class TestReplicationScenariosAcrossInstances {
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
private static WarehouseInstance primary, replica;
private String primaryDbName, replicatedDbName;
+ private static HiveConf conf;
@BeforeClass
public static void classLevelSetup() throws Exception {
- Configuration conf = new Configuration();
+ conf = new HiveConf(TestReplicationScenarios.class);
conf.set("dfs.client.use.datanode.hostname", "true");
conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
MiniDFSCluster miniDFSCluster =
@@ -876,6 +879,91 @@ public class TestReplicationScenariosAcrossInstances {
}
@Test
+ public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable {
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create table t1 (id int)")
+ .run("insert into table t1 values (10)")
+ .run("create table t2 (place string) partitioned by (country string)")
+ .run("insert into table t2 partition(country='india') values ('bangalore')")
+ .run("insert into table t2 partition(country='uk') values ('london')")
+ .run("insert into table t2 partition(country='us') values ('sfo')")
+ .dump(primaryDbName, null);
+
+ replica.load(replicatedDbName, tuple.dumpLocation)
+ .run("use " + replicatedDbName)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("show tables")
+ .verifyResults(new String[] { "t1", "t2" })
+ .run("select country from t2")
+ .verifyResults(Arrays.asList("india", "uk", "us"));
+
+ Database db = replica.getDatabase(replicatedDbName);
+ verifyIfCkptSet(db.getParameters(), tuple.dumpLocation);
+ Table t1 = replica.getTable(replicatedDbName, "t1");
+ verifyIfCkptSet(t1.getParameters(), tuple.dumpLocation);
+ Table t2 = replica.getTable(replicatedDbName, "t2");
+ verifyIfCkptSet(t2.getParameters(), tuple.dumpLocation);
+ Partition india = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("india"));
+ verifyIfCkptSet(india.getParameters(), tuple.dumpLocation);
+ Partition us = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("us"));
+ verifyIfCkptSet(us.getParameters(), tuple.dumpLocation);
+ Partition uk = replica.getPartition(replicatedDbName, "t2", Collections.singletonList("uk"));
+ verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation);
+ }
+
+ @Test
+ public void testIncrementalDumpMultiIteration() throws Throwable {
+ WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
+
+ replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+ .status(replicatedDbName)
+ .verifyResult(bootstrapTuple.lastReplicationId);
+
+ WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName)
+ .run("create table table1 (id int) partitioned by (country string)")
+ .run("create table table2 (id int)")
+ .run("create table table3 (id int) partitioned by (country string)")
+ .run("insert into table1 partition(country='india') values(1)")
+ .run("insert into table2 values(2)")
+ .run("insert into table3 partition(country='india') values(3)")
+ .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+ replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='10'"))
+ .status(replicatedDbName)
+ .verifyResult(incremental.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("select id from table1")
+ .verifyResults(new String[] {"1" })
+ .run("select * from table2")
+ .verifyResults(new String[] {"2" })
+ .run("select id from table3")
+ .verifyResults(new String[] {"3" });
+ Assert.assertTrue(IncrementalLoadTasksBuilder.getNumIteration() > 1);
+
+ incremental = primary.run("use " + primaryDbName)
+ .run("create table table5 (key int, value int) partitioned by (load_date date) " +
+ "clustered by(key) into 2 buckets stored as orc")
+ .run("create table table4 (i int, j int)")
+ .run("insert into table4 values (1,2)")
+ .dump(primaryDbName, incremental.lastReplicationId);
+
+ Path path = new Path(incremental.dumpLocation);
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus[] fileStatus = fs.listStatus(path);
+ int numEvents = fileStatus.length - 1; // one entry is the metadata file
+
+ replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"))
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[] {"table1", "table2", "table3", "table4", "table5" })
+ .run("select i from table4")
+ .verifyResult("1");
+ Assert.assertEquals(numEvents, IncrementalLoadTasksBuilder.getNumIteration());
+ }
+
+ @Test
public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws Throwable {
WarehouseInstance.Tuple tuplePrimary = primary
.run("use " + primaryDbName)
@@ -1087,9 +1175,7 @@ public class TestReplicationScenariosAcrossInstances {
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
- .verifyResult("null")
- .run("show tables")
- .verifyResults(new String[] { "t1" });
+ .verifyResult("null");
assertEquals(0, replica.getPrimaryKeyList(replicatedDbName, "t1").size());
assertEquals(0, replica.getUniqueConstraintList(replicatedDbName, "t3").size());
assertEquals(0, replica.getNotNullConstraintList(replicatedDbName, "t3").size());
@@ -1109,10 +1195,6 @@ public class TestReplicationScenariosAcrossInstances {
LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " + String.valueOf(args.funcName));
return false;
}
- if (args.tblName != null) {
- LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
- return (args.tblName.equals("t2") || args.tblName.equals("t3"));
- }
if (args.constraintTblName != null) {
LOG.warn("Verifier - Constraint Table: " + String.valueOf(args.constraintTblName));
return (args.constraintTblName.equals("t1") || args.constraintTblName.equals("t3"));
@@ -1179,8 +1261,6 @@ public class TestReplicationScenariosAcrossInstances {
public void testBootstrapReplLoadRetryAfterFailureForPartitions() throws Throwable {
WarehouseInstance.Tuple tuple = primary
.run("use " + primaryDbName)
- .run("create table t1 (id int)")
- .run("insert into table t1 values (10)")
.run("create table t2 (place string) partitioned by (country string)")
.run("insert into table t2 partition(country='india') values ('bangalore')")
.run("insert into table t2 partition(country='uk') values ('london')")
@@ -1195,7 +1275,7 @@ public class TestReplicationScenariosAcrossInstances {
.dump(primaryDbName, null);
// Inject a behavior where REPL LOAD failed when try to load table "t2" and partition "uk".
- // So, table "t1" and "t2" will exist and partition "india" will exist, rest failed as operation failed.
+ // So, table "t2" and partition "india" will exist; the rest will be missing as the operation failed.
BehaviourInjection<Partition, Partition> getPartitionStub
= new BehaviourInjection<Partition, Partition>() {
@Nullable
@@ -1220,9 +1300,7 @@ public class TestReplicationScenariosAcrossInstances {
.run("repl status " + replicatedDbName)
.verifyResult("null")
.run("show tables")
- .verifyResults(new String[] { "t1", "t2" })
- .run("select id from t1")
- .verifyResults(Arrays.asList("10"))
+ .verifyResults(new String[] {"t2" })
.run("select country from t2 order by country")
.verifyResults(Arrays.asList("india"))
.run("show functions like '" + replicatedDbName + "*'")
@@ -1259,9 +1337,7 @@ public class TestReplicationScenariosAcrossInstances {
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("show tables")
- .verifyResults(new String[] { "t1", "t2" })
- .run("select id from t1")
- .verifyResults(Arrays.asList("10"))
+ .verifyResults(new String[] { "t2" })
.run("select country from t2 order by country")
.verifyResults(Arrays.asList("india", "uk", "us"))
.run("show functions like '" + replicatedDbName + "*'")
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/if/queryplan.thrift
----------------------------------------------------------------------
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index ad778e3..d43eed3 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -103,7 +103,8 @@ enum StageType {
REPL_DUMP,
REPL_BOOTSTRAP_LOAD,
REPL_STATE_LOG,
- REPL_TXN
+ REPL_TXN,
+ REPL_INCREMENTAL_LOAD
}
struct Stage {
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index b6eb12a..73bbe3a 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -119,7 +119,8 @@ int _kStageTypeValues[] = {
StageType::REPL_DUMP,
StageType::REPL_BOOTSTRAP_LOAD,
StageType::REPL_STATE_LOG,
- StageType::REPL_TXN
+ StageType::REPL_TXN,
+ StageType::REPL_INCREMENTAL_LOAD
};
const char* _kStageTypeNames[] = {
"CONDITIONAL",
@@ -137,9 +138,10 @@ const char* _kStageTypeNames[] = {
"REPL_DUMP",
"REPL_BOOTSTRAP_LOAD",
"REPL_STATE_LOG",
- "REPL_TXN"
+ "REPL_TXN",
+ "REPL_INCREMENTAL_LOAD"
};
-const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(16, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(17, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
Adjacency::~Adjacency() throw() {
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-cpp/queryplan_types.h
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index eb02107..04c749f 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -97,7 +97,8 @@ struct StageType {
REPL_DUMP = 12,
REPL_BOOTSTRAP_LOAD = 13,
REPL_STATE_LOG = 14,
- REPL_TXN = 15
+ REPL_TXN = 15,
+ REPL_INCREMENTAL_LOAD = 16
};
};
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index 08822b3..7eebe28 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -27,7 +27,8 @@ public enum StageType implements org.apache.thrift.TEnum {
REPL_DUMP(12),
REPL_BOOTSTRAP_LOAD(13),
REPL_STATE_LOG(14),
- REPL_TXN(15);
+ REPL_TXN(15),
+ REPL_INCREMENTAL_LOAD(16);
private final int value;
@@ -80,6 +81,8 @@ public enum StageType implements org.apache.thrift.TEnum {
return REPL_STATE_LOG;
case 15:
return REPL_TXN;
+ case 16:
+ return REPL_INCREMENTAL_LOAD;
default:
return null;
}
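The new stage type must be appended with the next ordinal (16) consistently across every generated binding (C++ and Java above, PHP, Python, and Ruby below). A hedged sanity check against the generated Java binding, using only the thrift-generated findByValue/getValue accessors:

  import org.apache.hadoop.hive.ql.plan.api.StageType;

  public class StageTypeSanityCheck {
    public static void main(String[] args) {
      // findByValue maps a wire ordinal back to its enum constant.
      if (StageType.findByValue(16) != StageType.REPL_INCREMENTAL_LOAD
          || StageType.REPL_INCREMENTAL_LOAD.getValue() != 16) {
        throw new AssertionError("StageType ordinal mapping is inconsistent");
      }
    }
  }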
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php
index df4e41d..1a36d08 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -118,6 +118,7 @@ final class StageType {
const REPL_BOOTSTRAP_LOAD = 13;
const REPL_STATE_LOG = 14;
const REPL_TXN = 15;
+ const REPL_INCREMENTAL_LOAD = 16;
static public $__names = array(
0 => 'CONDITIONAL',
1 => 'COPY',
@@ -135,6 +136,7 @@ final class StageType {
13 => 'REPL_BOOTSTRAP_LOAD',
14 => 'REPL_STATE_LOG',
15 => 'REPL_TXN',
+ 16 => 'REPL_INCREMENTAL_LOAD',
);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 85d39fd..c0a2204 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -164,6 +164,7 @@ class StageType:
REPL_BOOTSTRAP_LOAD = 13
REPL_STATE_LOG = 14
REPL_TXN = 15
+ REPL_INCREMENTAL_LOAD = 16
_VALUES_TO_NAMES = {
0: "CONDITIONAL",
@@ -182,6 +183,7 @@ class StageType:
13: "REPL_BOOTSTRAP_LOAD",
14: "REPL_STATE_LOG",
15: "REPL_TXN",
+ 16: "REPL_INCREMENTAL_LOAD",
}
_NAMES_TO_VALUES = {
@@ -201,6 +203,7 @@ class StageType:
"REPL_BOOTSTRAP_LOAD": 13,
"REPL_STATE_LOG": 14,
"REPL_TXN": 15,
+ "REPL_INCREMENTAL_LOAD": 16,
}
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index 6010f3d..61349a2 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -76,8 +76,9 @@ module StageType
REPL_BOOTSTRAP_LOAD = 13
REPL_STATE_LOG = 14
REPL_TXN = 15
- VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN"}
- VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN]).freeze
+ REPL_INCREMENTAL_LOAD = 16
+ VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD"}
+ VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD]).freeze
end
class Adjacency
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 3a107b7..47a802f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask;
import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
new file mode 100644
index 0000000..76ba975
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
+
+public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
+ private final static int ZERO_TASKS = 0;
+
+ @Override
+ public String getName() {
+ return (work.isIncrementalLoad() ? "REPL_INCREMENTAL_LOAD" : "REPL_BOOTSTRAP_LOAD");
+ }
+
+ /**
+ * Holds the root tasks created as a result of this load task run, which will be executed
+ * by the driver. It does not track details across multiple runs of the load task.
+ */
+ private static class Scope {
+ boolean database = false, table = false, partition = false;
+ List<Task<? extends Serializable>> rootTasks = new ArrayList<>();
+ }
+
+ @Override
+ protected int execute(DriverContext driverContext) {
+ if (work.isIncrementalLoad()) {
+ return executeIncrementalLoad(driverContext);
+ } else {
+ return executeBootStrapLoad(driverContext);
+ }
+ }
+
+ private int executeBootStrapLoad(DriverContext driverContext) {
+ try {
+ int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+ Context context = new Context(work.dumpDirectory, conf, getHive(),
+ work.sessionStateLineageState, driverContext.getCtx());
+ TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
+ /*
+ For now, for simplicity, we handle just one directory (one database). We will come back to
+ supporting multiple databases once the basic flow for chaining task creation for a single
+ database (directory) is in place.
+ */
+ BootstrapEventsIterator iterator = work.iterator();
+ ConstraintEventsIterator constraintIterator = work.constraintIterator();
+ /*
+ These trackers hold references during the current round of task creation and are initialized
+ with "0" tasks, so that they are inconsequential in any operations done with task tracker
+ compositions.
+ */
+ TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
+ TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
+ Scope scope = new Scope();
+ boolean loadingConstraint = false;
+ if (!iterator.hasNext() && constraintIterator.hasNext()) {
+ loadingConstraint = true;
+ }
+ while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) {
+ BootstrapEvent next;
+ if (!loadingConstraint) {
+ next = iterator.next();
+ } else {
+ next = constraintIterator.next();
+ }
+ switch (next.eventType()) {
+ case Database:
+ DatabaseEvent dbEvent = (DatabaseEvent) next;
+ dbTracker =
+ new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker)
+ .tasks();
+ loadTaskTracker.update(dbTracker);
+ if (work.hasDbState()) {
+ loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
+ }
+ work.updateDbEventState(dbEvent.toState());
+ if (dbTracker.hasTasks()) {
+ scope.rootTasks.addAll(dbTracker.tasks());
+ scope.database = true;
+ }
+ dbTracker.debugLog("database");
+ break;
+ case Table: {
+ /*
+ The implicit assumption here is that the database level is processed before the table level.
+ This depends on the iterator used, which should provide the higher-level directory
+ listing before the lower-level one. It is also required so that
+ dbTracker / tableTracker are always set up correctly.
+ */
+ TableContext tableContext =
+ new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn);
+ TableEvent tableEvent = (TableEvent) next;
+ LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(),
+ tableContext, loadTaskTracker);
+ tableTracker = loadTable.tasks();
+ setUpDependencies(dbTracker, tableTracker);
+ if (!scope.database && tableTracker.hasTasks()) {
+ scope.rootTasks.addAll(tableTracker.tasks());
+ scope.table = true;
+ }
+ /*
+ For table replication, if we reach the max number of tasks, the next run will try to
+ reload the same table. This is mainly for ease of understanding the code, as it
+ avoids having to distinguish between loading partitions of a table whose creation
+ hit the task limit versus loading the next table because the current one has no
+ partitions.
+ */
+
+ // For a table, we explicitly try to load partitions, as there are no separate partition events.
+ LoadPartitions loadPartitions =
+ new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent,
+ work.dbNameToLoadIn, tableContext);
+ TaskTracker partitionsTracker = loadPartitions.tasks();
+ partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
+ partitionsTracker);
+ tableTracker.debugLog("table");
+ partitionsTracker.debugLog("partitions for table");
+ break;
+ }
+ case Partition: {
+ /*
+ This will happen only when we are loading tables and reach the limit on the number of tasks
+ we can create; hence we know here that the table should exist and there should be a lastPartitionName.
+ */
+ PartitionEvent event = (PartitionEvent) next;
+ TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn,
+ work.tableNameToLoadIn);
+ LoadPartitions loadPartitions =
+ new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker,
+ event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated());
+ /*
+ The tableTracker here should be a new instance, not an existing one, as this case can
+ only arise when we break off in the middle of loading partitions.
+ */
+ TaskTracker partitionsTracker = loadPartitions.tasks();
+ partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
+ partitionsTracker);
+ partitionsTracker.debugLog("partitions");
+ break;
+ }
+ case Function: {
+ LoadFunction loadFunction = new LoadFunction(context, iterator.replLogger(),
+ (FunctionEvent) next, work.dbNameToLoadIn, dbTracker);
+ TaskTracker functionsTracker = loadFunction.tasks();
+ if (!scope.database) {
+ scope.rootTasks.addAll(functionsTracker.tasks());
+ } else {
+ setUpDependencies(dbTracker, functionsTracker);
+ }
+ loadTaskTracker.update(functionsTracker);
+ functionsTracker.debugLog("functions");
+ break;
+ }
+ case Constraint: {
+ LoadConstraint loadConstraint =
+ new LoadConstraint(context, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker);
+ TaskTracker constraintTracker = loadConstraint.tasks();
+ scope.rootTasks.addAll(constraintTracker.tasks());
+ loadTaskTracker.update(constraintTracker);
+ constraintTracker.debugLog("constraints");
+ }
+ }
+
+ if (!loadingConstraint && !iterator.currentDbHasNext()) {
+ createEndReplLogTask(context, scope, iterator.replLogger());
+ }
+ }
+ boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState()
+ || constraintIterator.hasNext();
+ if (addAnotherLoadTask) {
+ createBuilderTask(scope.rootTasks);
+ }
+ if (!iterator.hasNext() && !constraintIterator.hasNext()) {
+ loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
+ work.updateDbEventState(null);
+ }
+ this.childTasks = scope.rootTasks;
+ /*
+ Since there can be multiple rounds of this run, all of which are tied to the same
+ query id (generated in the compile phase), an additional UUID is appended so that each
+ run is printed to a separate file.
+ */
+ LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
+
+ // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later
+ driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs());
+ } catch (RuntimeException e) {
+ LOG.error("replication failed with run time exception", e);
+ throw e;
+ } catch (Exception e) {
+ LOG.error("replication failed", e);
+ setException(e);
+ return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ }
+ LOG.info("completed load task run : {}", work.executedLoadTask());
+ return 0;
+ }
+
+ private void createEndReplLogTask(Context context, Scope scope,
+ ReplLogger replLogger) throws SemanticException {
+ Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
+ ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbInMetadata.getParameters());
+ Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
+ if (scope.rootTasks.isEmpty()) {
+ scope.rootTasks.add(replLogTask);
+ } else {
+ DAGTraversal.traverse(scope.rootTasks,
+ new AddDependencyToLeaves(Collections.singletonList(replLogTask)));
+ }
+ }
+
+ /**
+ * A database update was done earlier, and we want to make sure the last repl
+ * id on this database is updated before we switch to processing a new database.
+ *
+ * This has to be the last task in the graph: if there were intermediate tasks and the
+ * last.repl.id update were a root-level task, the root-level tasks would execute first in the
+ * execution phase. If any child task of the bootstrap load then failed, the last repl status
+ * of the target database would still return a valid value even though the bootstrap had
+ * failed, which would not represent the state of the database.
+ */
+ private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope)
+ throws SemanticException {
+ /*
+ we don't want to put any limits on this task as this is essential before we start
+ processing new database events.
+ */
+ TaskTracker taskTracker =
+ new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn,
+ new TaskTracker(maxTasks)).tasks();
+
+ AddDependencyToLeaves function = new AddDependencyToLeaves(taskTracker.tasks());
+ DAGTraversal.traverse(scope.rootTasks, function);
+
+ return taskTracker;
+ }
+
+ private void partitionsPostProcessing(BootstrapEventsIterator iterator,
+ Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
+ TaskTracker partitionsTracker) throws SemanticException {
+ setUpDependencies(tableTracker, partitionsTracker);
+ if (!scope.database && !scope.table) {
+ scope.rootTasks.addAll(partitionsTracker.tasks());
+ scope.partition = true;
+ }
+ loadTaskTracker.update(tableTracker);
+ loadTaskTracker.update(partitionsTracker);
+ if (partitionsTracker.hasReplicationState()) {
+ iterator.setReplicationState(partitionsTracker.replicationState());
+ }
+ }
+
+ /*
+ This sets up dependencies such that a child task is dependent on the parent completing.
+ */
+ private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) {
+ if (parentTasks.hasTasks()) {
+ for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
+ for (Task<? extends Serializable> childTask : childTasks.tasks()) {
+ parentTask.addDependentTask(childTask);
+ }
+ }
+ } else {
+ for (Task<? extends Serializable> childTask : childTasks.tasks()) {
+ parentTasks.addTask(childTask);
+ }
+ }
+ }
+
+ private void createBuilderTask(List<Task<? extends Serializable>> rootTasks) {
+ // Use loadTask as dependencyCollection
+ Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
+ DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask));
+ }
+
+ @Override
+ public StageType getType() {
+ return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : StageType.REPL_BOOTSTRAP_LOAD;
+ }
+
+ private int executeIncrementalLoad(DriverContext driverContext) {
+ try {
+ IncrementalLoadTasksBuilder load = work.getIncrementalLoadTaskBuilder();
+ this.childTasks = Collections.singletonList(load.build(driverContext, getHive(), LOG));
+ if (work.getIncrementalIterator().hasNext()) {
+ // Attach a load task at the tail of the task list to start the next iteration.
+ createBuilderTask(this.childTasks);
+ }
+ return 0;
+ } catch (Exception e) {
+ LOG.error("failed replication", e);
+ setException(e);
+ return 1;
+ }
+ }
+}
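Each round above is capped by hive.repl.approx.max.load.tasks; when events remain, createBuilderTask hangs a fresh ReplLoadTask off every leaf of the DAG built so far (via DAGTraversal and AddDependencyToLeaves), so the next batch is built only after this batch has run. A self-contained sketch of that leaf-chaining idea, with toy node types standing in for Hive tasks:

  import java.util.ArrayList;
  import java.util.List;

  class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
  }

  class ChainAtLeaves {
    // Hang `followUp` off every leaf so it runs only after the whole batch.
    static void addAtLeaves(List<Node> roots, Node followUp) {
      for (Node n : roots) {
        if (n == followUp) {
          continue; // mirrors skipProcessing(): never descend into the follow-up itself
        }
        if (n.children.isEmpty()) {
          n.children.add(followUp);
        } else {
          addAtLeaves(n.children, followUp);
        }
      }
    }
  }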
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
new file mode 100644
index 0000000..8921e94
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.session.LineageState;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
+ Explain.Level.DEFAULT,
+ Explain.Level.EXTENDED })
+public class ReplLoadWork implements Serializable {
+ final String dbNameToLoadIn;
+ final String tableNameToLoadIn;
+ final String dumpDirectory;
+ private final transient BootstrapEventsIterator bootstrapIterator;
+ private final ConstraintEventsIterator constraintsIterator;
+ private final transient IncrementalLoadEventsIterator incrementalIterator;
+ private int loadTaskRunCount = 0;
+ private DatabaseEvent.State state = null;
+ private final transient IncrementalLoadTasksBuilder incrementalLoad;
+
+ /*
+ These are SessionState objects that are copied over to the work object to allow for parallel
+ execution. Based on the current use case, the methods are selectively synchronized, which
+ may need attention when other methods are used.
+ */
+ final LineageState sessionStateLineageState;
+
+ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
+ String tableNameToLoadIn, LineageState lineageState, boolean isIncrementalDump) throws IOException {
+ this.tableNameToLoadIn = tableNameToLoadIn;
+ sessionStateLineageState = lineageState;
+ this.dumpDirectory = dumpDirectory;
+ this.dbNameToLoadIn = dbNameToLoadIn;
+ if (isIncrementalDump) {
+ incrementalIterator = new IncrementalLoadEventsIterator(dumpDirectory, hiveConf);
+ this.bootstrapIterator = null;
+ this.constraintsIterator = null;
+ incrementalLoad = new IncrementalLoadTasksBuilder(dbNameToLoadIn, tableNameToLoadIn, dumpDirectory,
+ incrementalIterator, hiveConf);
+ } else {
+ this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
+ this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
+ incrementalIterator = null;
+ incrementalLoad = null;
+ }
+ }
+
+ public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern,
+ LineageState lineageState) throws IOException {
+ this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, false);
+ }
+
+ public BootstrapEventsIterator iterator() {
+ return bootstrapIterator;
+ }
+
+ public ConstraintEventsIterator constraintIterator() {
+ return constraintsIterator;
+ }
+
+ int executedLoadTask() {
+ return ++loadTaskRunCount;
+ }
+
+ void updateDbEventState(DatabaseEvent.State state) {
+ this.state = state;
+ }
+
+ DatabaseEvent databaseEvent(HiveConf hiveConf) {
+ return state.toEvent(hiveConf);
+ }
+
+ boolean hasDbState() {
+ return state != null;
+ }
+
+ public boolean isIncrementalLoad() {
+ return incrementalIterator != null;
+ }
+
+ public IncrementalLoadEventsIterator getIncrementalIterator() {
+ return incrementalIterator;
+ }
+
+ public IncrementalLoadTasksBuilder getIncrementalLoadTaskBuilder() {
+ return incrementalLoad;
+ }
+}
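A hedged sketch of how this work object is wired up for an incremental dump; conf, dumpDir, and lineageState are assumed to be in scope, and the database name is a placeholder (this is not code from the patch):

  // Assumes: HiveConf conf; String dumpDir; LineageState lineageState.
  // The constructor may throw IOException while listing the dump directory.
  ReplLoadWork work = new ReplLoadWork(conf, dumpDir, "target_db",
      null /* tableNameToLoadIn */, lineageState, true /* isIncrementalDump */);
  // The flag picks the iterator/builder pair, so the task knows which path to take.
  assert work.isIncrementalLoad();
  // The driver then runs a single ReplLoadTask, which builds tasks as it executes.
  Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);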
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
deleted file mode 100644
index 18a8304..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-
-public class ReplUtils {
-
- public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
-
- /**
- * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
- */
- public enum ReplLoadOpType {
- LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
- }
-
- public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(
- Table table, List<Map<String, String>> partitions) throws SemanticException {
- Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
- int partPrefixLength = 0;
- if (partitions.size() > 0) {
- partPrefixLength = partitions.get(0).size();
- // pick the length of the first ptn, we expect all ptns listed to have the same number of
- // key-vals.
- }
- List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
- for (Map<String, String> ptn : partitions) {
- // convert each key-value-map to appropriate expression.
- ExprNodeGenericFuncDesc expr = null;
- for (Map.Entry<String, String> kvp : ptn.entrySet()) {
- String key = kvp.getKey();
- Object val = kvp.getValue();
- String type = table.getPartColByName(key).getType();
- PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
- ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
- ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
- "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val));
- expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
- }
- if (expr != null) {
- partitionDesc.add(expr);
- }
- }
- if (partitionDesc.size() > 0) {
- partSpecs.put(partPrefixLength, partitionDesc);
- }
- return partSpecs;
- }
-
- public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf)
- throws SemanticException {
- ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType());
- return TaskFactory.get(replLogWork, conf);
- }
-
- public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, HashMap<String, String> partSpec,
- String dumpRoot, HiveConf conf) throws SemanticException {
- HashMap<String, String> mapProp = new HashMap<>();
- mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot);
-
- AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
- alterTblDesc.setProps(mapProp);
- alterTblDesc.setOldName(
- StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), tableDesc.getTableName()));
- if (partSpec != null) {
- alterTblDesc.setPartSpec(partSpec);
- }
- return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf);
- }
-
- public static boolean replCkptStatus(String dbName, Map<String, String> props, String dumpRoot)
- throws InvalidOperationException {
- // If ckpt property not set or empty means, bootstrap is not run on this object.
- if ((props != null) && props.containsKey(REPL_CHECKPOINT_KEY) && !props.get(REPL_CHECKPOINT_KEY).isEmpty()) {
- if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) {
- return true;
- }
- throw new InvalidOperationException("REPL LOAD with Dump: " + dumpRoot
- + " is not allowed as the target DB: " + dbName
- + " is already bootstrap loaded by another Dump " + props.get(REPL_CHECKPOINT_KEY));
- }
- return false;
- }
-}
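Per the diffstat, ReplUtils moves wholesale to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java; the checkpoint logic itself is unchanged. As a hedged reading of replCkptStatus above, with props assumed to come from the target database's (or table's/partition's) parameters:

  // Assumes: String dbName, dumpRoot; Map<String, String> props = db.getParameters().
  // - checkpoint equals dumpRoot   -> true: resume this bootstrap, skip already-loaded objects
  // - checkpoint set but different -> InvalidOperationException: retry with another dump is rejected
  // - checkpoint missing or empty  -> false: first-time bootstrap load
  boolean resuming = ReplUtils.replCkptStatus(dbName, props, dumpRoot);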
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java
deleted file mode 100644
index 0313058..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
-
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-
-public class AddDependencyToLeaves implements DAGTraversal.Function {
- private List<Task<? extends Serializable>> postDependencyCollectionTasks;
-
- AddDependencyToLeaves(List<Task<? extends Serializable>> postDependencyCollectionTasks) {
- this.postDependencyCollectionTasks = postDependencyCollectionTasks;
- }
-
- public AddDependencyToLeaves(Task<? extends Serializable> postDependencyTask) {
- this(Collections.singletonList(postDependencyTask));
- }
-
-
- @Override
- public void process(Task<? extends Serializable> task) {
- if (task.getChildTasks() == null) {
- postDependencyCollectionTasks.forEach(task::addDependentTask);
- }
- }
-
- @Override
- public boolean skipProcessing(Task<? extends Serializable> task) {
- return postDependencyCollectionTasks.contains(task);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
deleted file mode 100644
index 50fe3ac..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
-
-public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
- private final static int ZERO_TASKS = 0;
-
- @Override
- public String getName() {
- return "REPL_BOOTSTRAP_LOAD";
- }
-
- /**
- * Provides the root Tasks created as a result of this loadTask run which will be executed
- * by the driver. It does not track details across multiple runs of LoadTask.
- */
- private static class Scope {
- boolean database = false, table = false, partition = false;
- List<Task<? extends Serializable>> rootTasks = new ArrayList<>();
- }
-
- @Override
- protected int execute(DriverContext driverContext) {
- try {
- int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
- Context context = new Context(work.dumpDirectory, conf, getHive(),
- work.sessionStateLineageState, driverContext.getCtx());
- TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
- /*
- for now for simplicity we are doing just one directory ( one database ), come back to use
- of multiple databases once we have the basic flow to chain creating of tasks in place for
- a database ( directory )
- */
- BootstrapEventsIterator iterator = work.iterator();
- ConstraintEventsIterator constraintIterator = work.constraintIterator();
- /*
- This is used to get hold of a reference during the current creation of tasks and is initialized
- with "0" tasks such that it will be non consequential in any operations done with task tracker
- compositions.
- */
- TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
- TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
- Scope scope = new Scope();
- boolean loadingConstraint = false;
- if (!iterator.hasNext() && constraintIterator.hasNext()) {
- loadingConstraint = true;
- }
- while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) {
- BootstrapEvent next;
- if (!loadingConstraint) {
- next = iterator.next();
- } else {
- next = constraintIterator.next();
- }
- switch (next.eventType()) {
- case Database:
- DatabaseEvent dbEvent = (DatabaseEvent) next;
- dbTracker =
- new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, loadTaskTracker)
- .tasks();
- loadTaskTracker.update(dbTracker);
- if (work.hasDbState()) {
- loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
- }
- work.updateDbEventState(dbEvent.toState());
- if (dbTracker.hasTasks()) {
- scope.rootTasks.addAll(dbTracker.tasks());
- scope.database = true;
- }
- dbTracker.debugLog("database");
- break;
- case Table: {
- /*
- Implicit assumption here is that database level is processed first before table level,
- which will depend on the iterator used since it should provide the higher level directory
- listing before providing the lower level listing. This is also required such that
- the dbTracker / tableTracker are setup correctly always.
- */
- TableContext tableContext =
- new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn);
- TableEvent tableEvent = (TableEvent) next;
- LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(),
- tableContext, loadTaskTracker);
- tableTracker = loadTable.tasks();
- setUpDependencies(dbTracker, tableTracker);
- if (!scope.database && tableTracker.hasTasks()) {
- scope.rootTasks.addAll(tableTracker.tasks());
- scope.table = true;
- }
- /*
- for table replication if we reach the max number of tasks then for the next run we will
- try to reload the same table again, this is mainly for ease of understanding the code
- as then we can avoid handling == > loading partitions for the table given that
- the creation of table lead to reaching max tasks vs, loading next table since current
- one does not have partitions.
- */
-
- // for a table we explicitly try to load partitions as there is no separate partitions events.
- LoadPartitions loadPartitions =
- new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent,
- work.dbNameToLoadIn, tableContext);
- TaskTracker partitionsTracker = loadPartitions.tasks();
- partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
- partitionsTracker);
- tableTracker.debugLog("table");
- partitionsTracker.debugLog("partitions for table");
- break;
- }
- case Partition: {
- /*
- This will happen only when loading tables and we reach the limit of number of tasks we can create;
- hence we know here that the table should exist and there should be a lastPartitionName
- */
- PartitionEvent event = (PartitionEvent) next;
- TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn,
- work.tableNameToLoadIn);
- LoadPartitions loadPartitions =
- new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker,
- event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated());
- /*
- the tableTracker here should be a new instance and not an existing one as this can
- only happen when we break in between loading partitions.
- */
- TaskTracker partitionsTracker = loadPartitions.tasks();
- partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
- partitionsTracker);
- partitionsTracker.debugLog("partitions");
- break;
- }
- case Function: {
- LoadFunction loadFunction = new LoadFunction(context, iterator.replLogger(),
- (FunctionEvent) next, work.dbNameToLoadIn, dbTracker);
- TaskTracker functionsTracker = loadFunction.tasks();
- if (!scope.database) {
- scope.rootTasks.addAll(functionsTracker.tasks());
- } else {
- setUpDependencies(dbTracker, functionsTracker);
- }
- loadTaskTracker.update(functionsTracker);
- functionsTracker.debugLog("functions");
- break;
- }
- case Constraint: {
- LoadConstraint loadConstraint =
- new LoadConstraint(context, (ConstraintEvent) next, work.dbNameToLoadIn, dbTracker);
- TaskTracker constraintTracker = loadConstraint.tasks();
- scope.rootTasks.addAll(constraintTracker.tasks());
- loadTaskTracker.update(constraintTracker);
- constraintTracker.debugLog("constraints");
- }
- }
-
- if (!loadingConstraint && !iterator.currentDbHasNext()) {
- createEndReplLogTask(context, scope, iterator.replLogger());
- }
- }
- boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState()
- || constraintIterator.hasNext();
- createBuilderTask(scope.rootTasks, addAnotherLoadTask);
- if (!iterator.hasNext() && !constraintIterator.hasNext()) {
- loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope));
- work.updateDbEventState(null);
- }
- this.childTasks = scope.rootTasks;
- /*
- Since there can be multiple rounds of this run, all tied to the same query id generated
- in the compile phase, an additional UUID is appended so each run is printed to a
- separate file.
- */
- LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks());
-
- // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later
- driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs());
- } catch (Exception e) {
- LOG.error("failed replication", e);
- setException(e);
- return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
- }
- LOG.info("completed load task run : {}", work.executedLoadTask());
- return 0;
- }
-
- private void createEndReplLogTask(Context context, Scope scope,
- ReplLogger replLogger) throws SemanticException {
- Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
- ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbInMetadata.getParameters());
- Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
- if (scope.rootTasks.isEmpty()) {
- scope.rootTasks.add(replLogTask);
- } else {
- DAGTraversal.traverse(scope.rootTasks,
- new AddDependencyToLeaves(Collections.singletonList(replLogTask)));
- }
- }
-
- /**
- * A database update was done earlier, and we want to make sure the last repl id is
- * updated on that database before we switch to processing a new database.
- *
- * This has to be the last task in the graph: if it were a root-level task alongside
- * intermediate tasks, it would be executed first in the execution phase, and a failure
- * in any child task of the bootstrap load would still leave the target database
- * reporting a valid last repl status, which would not represent the actual state of
- * the database.
- */
- private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, Scope scope)
- throws SemanticException {
- /*
- we don't want to put any limits on this task as this is essential before we start
- processing new database events.
- */
- TaskTracker taskTracker =
- new AlterDatabase(context, work.databaseEvent(context.hiveConf), work.dbNameToLoadIn,
- new TaskTracker(maxTasks)).tasks();
-
- AddDependencyToLeaves function = new AddDependencyToLeaves(taskTracker.tasks());
- DAGTraversal.traverse(scope.rootTasks, function);
-
- return taskTracker;
- }
-
- private void partitionsPostProcessing(BootstrapEventsIterator iterator,
- Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
- TaskTracker partitionsTracker) throws SemanticException {
- setUpDependencies(tableTracker, partitionsTracker);
- if (!scope.database && !scope.table) {
- scope.rootTasks.addAll(partitionsTracker.tasks());
- scope.partition = true;
- }
- loadTaskTracker.update(tableTracker);
- loadTaskTracker.update(partitionsTracker);
- if (partitionsTracker.hasReplicationState()) {
- iterator.setReplicationState(partitionsTracker.replicationState());
- }
- }
-
- /*
- This sets up dependencies such that a child task runs only after its parent has completed.
- */
- private void setUpDependencies(TaskTracker parentTasks, TaskTracker childTasks) {
- if (parentTasks.hasTasks()) {
- for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
- for (Task<? extends Serializable> childTask : childTasks.tasks()) {
- parentTask.addDependentTask(childTask);
- }
- }
- } else {
- for (Task<? extends Serializable> childTask : childTasks.tasks()) {
- parentTasks.addTask(childTask);
- }
- }
- }
-
- private void createBuilderTask(List<Task<? extends Serializable>> rootTasks,
- boolean shouldCreateAnotherLoadTask) {
- /*
- the next loadTask acts as the dependency collection point for all current leaves
- */
- if (shouldCreateAnotherLoadTask) {
- Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
- DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask));
- }
- }
-
- @Override
- public StageType getType() {
- return StageType.REPL_BOOTSTRAP_LOAD;
- }
-}
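
A note on the pattern in the deleted ReplLoadTask above: each execution round builds a
bounded task DAG and, whenever more events remain, createBuilderTask chains a fresh load
task onto every leaf via AddDependencyToLeaves, so the next round resumes where this one
stopped. A minimal, self-contained sketch of that leaf-chaining step, with a hypothetical
Node type standing in for Hive's Task API:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Hypothetical stand-in for Hive's Task<? extends Serializable>.
    class Node {
      final String name;
      final List<Node> children = new ArrayList<>();
      Node(String name) { this.name = name; }
    }

    public class AddToLeavesSketch {
      // Walk the DAG; every node that currently has no children is a leaf
      // and gets the new dependent attached, so it runs only after all
      // existing work has finished.
      static void addDependencyToLeaves(List<Node> roots, Node dependent) {
        Deque<Node> queue = new ArrayDeque<>(roots);
        Set<Node> visited = new HashSet<>();
        while (!queue.isEmpty()) {
          Node n = queue.poll();
          if (!visited.add(n)) {
            continue;
          }
          if (n.children.isEmpty()) {
            n.children.add(dependent); // chain the next load task here
          } else {
            queue.addAll(n.children);
          }
        }
      }

      public static void main(String[] args) {
        Node db = new Node("createDb");
        Node table = new Node("createTable");
        db.children.add(table);
        addDependencyToLeaves(Arrays.asList(db), new Node("nextReplLoad"));
        System.out.println(table.children.get(0).name); // nextReplLoad
      }
    }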
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
deleted file mode 100644
index 048727f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
-import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.session.LineageState;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-@Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
- Explain.Level.DEFAULT,
- Explain.Level.EXTENDED })
-public class ReplLoadWork implements Serializable {
- final String dbNameToLoadIn;
- final String tableNameToLoadIn;
- final String dumpDirectory;
- private final BootstrapEventsIterator iterator;
- private final ConstraintEventsIterator constraintsIterator;
- private int loadTaskRunCount = 0;
- private DatabaseEvent.State state = null;
-
- /*
- These are sessionState objects copied over to the work object to allow for parallel
- execution. Based on the current use case, only some methods are synchronized; take
- care when using other methods.
- */
- final LineageState sessionStateLineageState;
-
- public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
- String tableNameToLoadIn, LineageState lineageState)
- throws IOException {
- this.tableNameToLoadIn = tableNameToLoadIn;
- sessionStateLineageState = lineageState;
- this.dumpDirectory = dumpDirectory;
- this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
- this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
- this.dbNameToLoadIn = dbNameToLoadIn;
- }
-
- public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern,
- LineageState lineageState) throws IOException {
- this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState);
- }
-
- public BootstrapEventsIterator iterator() {
- return iterator;
- }
-
- public ConstraintEventsIterator constraintIterator() {
- return constraintsIterator;
- }
-
- int executedLoadTask() {
- return ++loadTaskRunCount;
- }
-
- void updateDbEventState(DatabaseEvent.State state) {
- this.state = state;
- }
-
- DatabaseEvent databaseEvent(HiveConf hiveConf) {
- return state.toEvent(hiveConf);
- }
-
- boolean hasDbState() {
- return state != null;
- }
-}
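
ReplLoadWork, deleted from the bootstrap package here (it is recreated elsewhere in this
patch), is the object that survives across those load-task rounds. A toy sketch of the
checkpoint/resume contract it provides, with a plain String standing in for
DatabaseEvent.State and all names hypothetical:

    // Toy model of the resume contract: state checkpointed by one load-task
    // round is replayed by the next, until all events are consumed.
    public class ResumableWorkSketch {
      private String dbState = null;   // stands in for DatabaseEvent.State
      private int loadTaskRunCount = 0;

      void updateDbEventState(String state) { this.dbState = state; }
      boolean hasDbState() { return dbState != null; }
      String databaseEvent() { return dbState; }
      int executedLoadTask() { return ++loadTaskRunCount; }

      public static void main(String[] args) {
        ResumableWorkSketch work = new ResumableWorkSketch();
        work.updateDbEventState("db: mid-bootstrap"); // round 1 hit the task limit
        work.executedLoadTask();
        if (work.hasDbState()) {                      // round 2 resumes from here
          System.out.println("resuming from: " + work.databaseEvent());
        }
        work.updateDbEventState(null);                // all events consumed
      }
    }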
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index 89d2ac2..ebe0090 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -82,6 +82,15 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
FileSystem fileSystem = path.getFileSystem(hiveConf);
FileStatus[] fileStatuses =
fileSystem.listStatus(new Path(dumpDirectory), EximUtil.getDirectoryFilter(fileSystem));
+ if ((fileStatuses == null) || (fileStatuses.length == 0)) {
+ throw new IllegalArgumentException("No data to load in path " + dumpDirectory);
+ }
+ if ((dbNameToLoadIn != null) && (fileStatuses.length > 1)) {
+ throw new IllegalArgumentException(
+ "Multiple dirs in "
+ + dumpDirectory
+ + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
+ }
List<FileStatus> dbsToCreate = Arrays.stream(fileStatuses).filter(f -> {
Path metadataPath = new Path(f.getPath() + Path.SEPARATOR + EximUtil.METADATA_NAME);
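
The two guards added above make BootstrapEventsIterator fail fast: an empty dump
directory is unloadable, and naming a target database only makes sense when the dump
contains exactly one database directory. A hypothetical standalone extraction of the
same checks (in the patch they live inline in the iterator's constructor):

    import org.apache.hadoop.fs.FileStatus;

    public final class DumpDirValidationSketch {
      // Mirrors the added guards: reject an empty dump dir, and reject a
      // multi-database dump when a single destination database was named.
      static void validate(FileStatus[] dirs, String dumpDirectory, String dbNameToLoadIn) {
        if (dirs == null || dirs.length == 0) {
          throw new IllegalArgumentException("No data to load in path " + dumpDirectory);
        }
        if (dbNameToLoadIn != null && dirs.length > 1) {
          throw new IllegalArgumentException("Multiple dirs in " + dumpDirectory
              + " do not correspond to REPL LOAD expecting to load to a singular destination point.");
        }
      }
    }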
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
index 26f4892..d09b98c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index 0270d2a..054153c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -31,8 +32,8 @@ import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
import java.io.Serializable;
import java.util.HashMap;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
index b886ff4..a7c8ca4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
@@ -26,9 +26,10 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.EximUtil;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
deleted file mode 100644
index f8f0801..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
-
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This class will be responsible to track how many tasks have been created,
- * organization of tasks such that after the number of tasks for next execution are created
- * we create a dependency collection task(DCT) -> another bootstrap task,
- * and then add DCT as dependent to all existing tasks that are created so the cycle can continue.
- */
-public class TaskTracker {
- private static Logger LOG = LoggerFactory.getLogger(TaskTracker.class);
- /**
- * Used to identify the list of root-level tasks for a given level such as table / db /
- * partition. This does not include the task dependency notion of "table tasks <---- partition task".
- */
- private final List<Task<? extends Serializable>> tasks = new ArrayList<>();
- private ReplicationState replicationState = null;
- // Since tasks themselves can be graphs, we want to limit the number of created
- // tasks including all of their dependencies.
- private int numberOfTasks = 0;
- private final int maxTasksAllowed;
-
- public TaskTracker(int defaultMaxTasks) {
- maxTasksAllowed = defaultMaxTasks;
- }
-
- public TaskTracker(TaskTracker existing) {
- maxTasksAllowed = existing.maxTasksAllowed - existing.numberOfTasks;
- }
-
- /**
- * This method is used to register all the tasks in a graph. The graph, however, might
- * be created in a disjoint fashion, in which case we can just update the number of
- * tasks using the "update" method.
- */
- public void addTask(Task<? extends Serializable> task) {
- tasks.add(task);
-
- List <Task<? extends Serializable>> visited = new ArrayList<>();
- updateTaskCount(task, visited);
- }
-
- // This method traverses the DAG rooted in the tasks list and adds the dependent task
- // at the tail of each task chain.
- public void addDependentTask(Task<? extends Serializable> dependent) {
- if (tasks.isEmpty()) {
- addTask(dependent);
- } else {
- DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent));
-
- List<Task<? extends Serializable>> visited = new ArrayList<>();
- updateTaskCount(dependent, visited);
- }
- }
-
- private void updateTaskCount(Task<? extends Serializable> task,
- List <Task<? extends Serializable>> visited) {
- numberOfTasks += 1;
- visited.add(task);
- if (task.getChildTasks() != null) {
- for (Task<? extends Serializable> childTask : task.getChildTasks()) {
- if (visited.contains(childTask)) {
- continue;
- }
- updateTaskCount(childTask, visited);
- }
- }
- }
-
- public boolean canAddMoreTasks() {
- return numberOfTasks < maxTasksAllowed;
- }
-
- public boolean hasTasks() {
- return numberOfTasks != 0;
- }
-
- public void update(TaskTracker withAnother) {
- numberOfTasks += withAnother.numberOfTasks;
- if (withAnother.hasReplicationState()) {
- this.replicationState = withAnother.replicationState;
- }
- }
-
- public void setReplicationState(ReplicationState state) {
- this.replicationState = state;
- }
-
- public boolean hasReplicationState() {
- return replicationState != null;
- }
-
- public ReplicationState replicationState() {
- return replicationState;
- }
-
- public List<Task<? extends Serializable>> tasks() {
- return tasks;
- }
-
- public void debugLog(String forEventType) {
- LOG.debug("{} event with total / root number of tasks:{}/{}", forEventType, numberOfTasks,
- tasks.size());
- }
-
- public int numberOfTasks() {
- return numberOfTasks;
- }
-}
\ No newline at end of file
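
The TaskTracker deleted above (relocated to org.apache.hadoop.hive.ql.exec.repl.util,
per the import changes in the files below) enforces one subtle rule: adding a single
root task counts every task reachable from it, so maxTasksAllowed bounds the whole
graph rather than just the root-level entries. A self-contained sketch of that counting
rule, with a hypothetical TaskNode type and a Set in place of the original's List for
the visited check:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Hypothetical stand-in for Hive's Task; only the child links matter here.
    class TaskNode {
      final List<TaskNode> children = new ArrayList<>();
    }

    public class TaskCountSketch {
      private int numberOfTasks = 0;
      private final int maxTasksAllowed;

      TaskCountSketch(int maxTasksAllowed) { this.maxTasksAllowed = maxTasksAllowed; }

      // Count the task plus everything reachable from it, visiting each node
      // once (tasks can share children, so the graph is a DAG, not a tree).
      void addTask(TaskNode task) {
        count(task, new HashSet<>());
      }

      private void count(TaskNode task, Set<TaskNode> visited) {
        if (!visited.add(task)) {
          return;
        }
        numberOfTasks++;
        for (TaskNode child : task.children) {
          count(child, visited);
        }
      }

      boolean canAddMoreTasks() { return numberOfTasks < maxTasksAllowed; }

      public static void main(String[] args) {
        TaskCountSketch tracker = new TaskCountSketch(3);
        TaskNode root = new TaskNode();
        root.children.add(new TaskNode());
        root.children.add(new TaskNode());
        tracker.addTask(root);                         // counts 3 tasks, not 1
        System.out.println(tracker.canAddMoreTasks()); // false: limit reached
      }
    }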
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index f6493f7..c0cfc43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -27,11 +27,11 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 419a511..089b529 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -27,10 +27,10 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
http://git-wip-us.apache.org/repos/asf/hive/blob/67b0a67c/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
index b5b5b90..8e01fb1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ImportTableDesc;