You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/19 10:57:57 UTC

hive git commit: HIVE-20120: Incremental repl load DAG generation is causing OOM error (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

Repository: hive
Updated Branches:
  refs/heads/master 2cfe57faa -> c17888647


HIVE-20120: Incremental repl load DAG generation is causing OOM error (Mahesh Kumar Behera, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c1788864
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c1788864
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c1788864

Branch: refs/heads/master
Commit: c17888647582a35b2c2fe2f8806eb1e099059b3f
Parents: 2cfe57f
Author: Sankar Hariappan <sa...@apache.org>
Authored: Thu Jul 19 16:27:27 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Thu Jul 19 16:27:27 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java |  6 +-----
 .../repl/incremental/IncrementalLoadTasksBuilder.java     | 10 +++++++---
 2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c1788864/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 76ba975..6b5fc54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -329,11 +329,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   private int executeIncrementalLoad(DriverContext driverContext) {
     try {
       IncrementalLoadTasksBuilder load = work.getIncrementalLoadTaskBuilder();
-      this.childTasks = Collections.singletonList(load.build(driverContext, getHive(), LOG));
-      if (work.getIncrementalIterator().hasNext()) {
-        // attach a load task at the tail of task list to start the next iteration.
-        createBuilderTask(this.childTasks);
-      }
+      this.childTasks = Collections.singletonList(load.build(driverContext, getHive(), LOG, work));
       return 0;
     } catch (Exception e) {
       LOG.error("failed replication", e);

http://git-wip-us.apache.org/repos/asf/hive/blob/c1788864/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 5e113c1..2aefb57 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
@@ -84,7 +85,8 @@ public class IncrementalLoadTasksBuilder {
     replLogger.startLog();
   }
 
-  public Task<? extends Serializable> build(DriverContext driverContext, Hive hive, Logger log) throws Exception {
+  public Task<? extends Serializable> build(DriverContext driverContext, Hive hive, Logger log,
+                                            ReplLoadWork loadWork) throws Exception {
     Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
     Task<? extends Serializable> taskChainTail = evTaskRoot;
     Long lastReplayedEvent = null;
@@ -145,8 +147,10 @@ public class IncrementalLoadTasksBuilder {
       lastReplayedEvent = eventDmd.getEventTo();
     }
 
-    // If any event is there and db name is known, then dump the start and end logs
-    if (!evTaskRoot.equals(taskChainTail) && !iterator.hasNext()) {
+    if (iterator.hasNext()) {
+      // add load task to start the next iteration
+      taskChainTail.addDependentTask(TaskFactory.get(loadWork, conf));
+    } else {
       Map<String, String> dbProps = new HashMap<>();
       dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent));
       ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);