Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/03/30 20:11:05 UTC

[GitHub] [hive] pkumarsinha commented on a change in pull request #2083: HIVE-24895. Add a DataCopyEnd stage in ReplStateLogTask for external table replication.

pkumarsinha commented on a change in pull request #2083:
URL: https://github.com/apache/hive/pull/2083#discussion_r604340386



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -182,6 +183,13 @@ public int execute() {
       Exception ex = new SecurityException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
       setException(ex);
       return ReplUtils.handleException(true, ex, work.getDumpDirectory(), work.getMetricCollector(), getName(), conf);
+    } finally {
+      String jobId = conf.get("distcp.job.id", "UNAVAILABLE");

Review comment:
       Can we define constants for these, i.e. the key and the default value?
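       For illustration, something along these lines (constant names here are just placeholders, not from the patch):

           // Hypothetical constants for the distcp job id conf key and its default value.
           private static final String DISTCP_JOB_ID_CONF_KEY = "distcp.job.id";
           private static final String DISTCP_JOB_ID_UNAVAILABLE = "UNAVAILABLE";

           String jobId = conf.get(DISTCP_JOB_ID_CONF_KEY, DISTCP_JOB_ID_UNAVAILABLE);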

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -182,6 +183,13 @@ public int execute() {
       Exception ex = new SecurityException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
       setException(ex);
       return ReplUtils.handleException(true, ex, work.getDumpDirectory(), work.getMetricCollector(), getName(), conf);
+    } finally {
+      String jobId = conf.get("distcp.job.id", "UNAVAILABLE");
+      LOG.info("Completed DirCopyTask for source: {} to  target: {}. Took {}"
+              + ". DistCp JobId {}", work.getFullyQualifiedSourcePath(),
+          work.getFullyQualifiedTargetPath(), ReplUtils
+              .convertToHumanReadableTime(

Review comment:
       nit: could you please format the code

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -269,12 +270,20 @@ private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws IOExc
     }
   }
 
-  private void initiateDataCopyTasks() throws SemanticException, IOException {
+  private void initiateDataCopyTasks(ReplLogger replLogger) throws SemanticException, IOException {
     TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
     if (childTasks == null) {
       childTasks = new ArrayList<>();
     }
-    childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf));
+    List<Task<?>> externalTableCopyTasks =

Review comment:
       nit: Please format the code. These lines fit within the line-length limit, so they don't need to wrap.
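       For example, the assignment can simply stay on one line:

           List<Task<?>> externalTableCopyTasks = work.externalTableCopyTasks(taskTracker, conf);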

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -376,7 +379,14 @@ private void addLazyDataCopyTask(TaskTracker loadTaskTracker) throws IOException
       if (childTasks == null) {
         childTasks = new ArrayList<>();
       }
-      childTasks.addAll(work.externalTableCopyTasks(loadTaskTracker, conf));
+      List<Task<?>> externalTableCopyTasks =
+          work.externalTableCopyTasks(loadTaskTracker, conf);

Review comment:
       nit: please format

##########
File path: shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
##########
@@ -1184,6 +1186,14 @@ public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) thro
     } catch (Exception e) {
       throw new IOException("Cannot execute DistCp process: " + e, e);
     } finally {
+      // Set the job id from distCp conf to the callers configuration.
+      if (distcp != null) {
+        String jobId = distcp.getConf().get(CONF_LABEL_DISTCP_JOB_ID);
+        if (jobId != null) {

Review comment:
       When would the job id be null?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java
##########
@@ -55,7 +56,8 @@
     TABLE,
     FUNCTION,
     EVENT,
-    END
+    END,
+    DATACOPYEND

Review comment:
       Also, why not add a DATA_COPY_START as well?
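       For illustration, the enum could then read (sketch only, assuming a start marker is wanted):

           TABLE,
           FUNCTION,
           EVENT,
           END,
           DATA_COPY_START,
           DATA_COPY_END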

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -182,6 +183,13 @@ public int execute() {
       Exception ex = new SecurityException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
       setException(ex);
       return ReplUtils.handleException(true, ex, work.getDumpDirectory(), work.getMetricCollector(), getName(), conf);
+    } finally {
+      String jobId = conf.get("distcp.job.id", "UNAVAILABLE");
+      LOG.info("Completed DirCopyTask for source: {} to  target: {}. Took {}"

Review comment:
       Wouldn't this always be printed, even in case of failure?
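       If the completion message is only meant for the success path, one possible shape (just a sketch, not the actual patch):

           boolean copySucceeded = false;
           try {
             // ... existing DirCopyTask copy logic ...
             copySucceeded = true;
           } finally {
             if (copySucceeded) {
               String jobId = conf.get("distcp.job.id", "UNAVAILABLE");
               LOG.info("Completed DirCopyTask for source: {} to target: {}. DistCp JobId {}",
                   work.getFullyQualifiedSourcePath(), work.getFullyQualifiedTargetPath(), jobId);
             }
           }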

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -120,6 +119,7 @@
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.getReplPolicyIdString;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.addLoggerTask;

Review comment:
       nit: A static method import isn't very readable here. Why not import ReplUtils and call the method qualified with the class name?
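       i.e. something like (call-site arguments are illustrative only):

           import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
           ...
           ReplUtils.addLoggerTask(replLogger, externalTableCopyTasks, conf);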

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -158,7 +158,8 @@ public int execute() {
     try {
       SecurityUtils.reloginExpiringKeytabUser();
       if (work.dataCopyIteratorsInitialized()) {
-        initiateDataCopyTasks();
+        initiateDataCopyTasks(
+            replLogger == null ? work.getReplLogger() : replLogger);

Review comment:
       Wouldn't it always be work.getReplLogger()?
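       i.e. could the call simply be:

           initiateDataCopyTasks(work.getReplLogger());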

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/BootstrapLoadLogger.java
##########
@@ -68,4 +68,13 @@ public void endLog(String lastReplId) {
     (new BootstrapLoadEnd(dbName, numTables, numFunctions, dumpDir, lastReplId))
             .log(LogTag.END);
   }
+
+  @Override
+  public void setParams(String dbName, String dumpDirectory, long numTables,
+      long numFunctions) {

Review comment:
       nit: This fits on the same line.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -589,6 +598,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     try {
       replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), estimatedNumEvents,
         work.eventFrom, work.eventTo, maxEventLimit);
+      work.setReplLogger(replLogger);

Review comment:
       If we are setting the ReplLogger every time, we don't need the instance-level replLogger in ReplDumpTask, no?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java
##########
@@ -55,7 +56,8 @@
     TABLE,
     FUNCTION,
     EVENT,
-    END
+    END,
+    DATACOPYEND

Review comment:
       DATA_COPY_END?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -85,6 +85,7 @@
 import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.addLoggerTask;

Review comment:
       nit: same as above

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
##########
@@ -471,4 +473,31 @@ public static String getDistCpCustomName(HiveConf conf) {
     }
     return userChosenName;
   }
+
+  /**
+   * Convert to a human time of minutes:seconds.millis.
+   * @param time time to humanize.
+   * @return a printable value.
+   */
+  public static String convertToHumanReadableTime(long time) {
+    long seconds = (time / 1000);
+    long minutes = (seconds / 60);
+    return String.format("%d:%02d.%03ds", minutes, seconds % 60, time % 1000);
+  }
+
+  /**
+   * Adds a logger task at the end of the tasks passed.
+   */
+  public static void addLoggerTask(ReplLogger replLogger, List<Task<?>> tasks,
+      HiveConf conf) {
+    ReplStateLogWork replStateLogWork =
+        new ReplStateLogWork(replLogger, System.currentTimeMillis());

Review comment:
       What is the intention of System.currentTimeMillis() here? If it is to capture the start time of all the external table data copy (DirCopy) tasks, then isn't it incorrect?
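       Just to illustrate the concern (sketch only): the timestamp is taken when the ReplStateLogWork is created, not when the copy tasks actually start executing.

           // Captured at task-creation time:
           ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, System.currentTimeMillis());
           // ... the external table copy tasks may only start running later ...
           // DataCopyEnd later reports System.currentTimeMillis() - startTime when the logger task runs.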

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -269,12 +270,20 @@ private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws IOExc
     }
   }
 
-  private void initiateDataCopyTasks() throws SemanticException, IOException {
+  private void initiateDataCopyTasks(ReplLogger replLogger) throws SemanticException, IOException {
     TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
     if (childTasks == null) {
       childTasks = new ArrayList<>();
     }
-    childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf));
+    List<Task<?>> externalTableCopyTasks =
+        work.externalTableCopyTasks(taskTracker, conf);
+    childTasks.addAll(externalTableCopyTasks);
+    // If external table data copy tasks are present add a task to mark the
+    // end of data copy
+    if (!externalTableCopyTasks.isEmpty() && !work

Review comment:
       When this log task is not added and hence no log line is emitted, how will the user differentiate between the case where there are no external table data copy tasks and the case where the tasks simply haven't completed yet? Can we add a log statement for the first case?
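       For example, something like this for the first case (message text is illustrative only):

           if (externalTableCopyTasks.isEmpty()) {
             LOG.info("No external table data copy tasks were created for this dump.");
           }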

##########
File path: shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
##########
@@ -1184,6 +1186,14 @@ public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) thro
     } catch (Exception e) {
       throw new IOException("Cannot execute DistCp process: " + e, e);
     } finally {
+      // Set the job id from distCp conf to the callers configuration.
+      if (distcp != null) {
+        String jobId = distcp.getConf().get(CONF_LABEL_DISTCP_JOB_ID);
+        if (jobId != null) {
+          conf.set(CONF_LABEL_DISTCP_JOB_ID,

Review comment:
       nit: please format

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
##########
@@ -182,6 +183,13 @@ public int execute() {
       Exception ex = new SecurityException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
       setException(ex);
       return ReplUtils.handleException(true, ex, work.getDumpDirectory(), work.getMetricCollector(), getName(), conf);
+    } finally {
+      String jobId = conf.get("distcp.job.id", "UNAVAILABLE");
+      LOG.info("Completed DirCopyTask for source: {} to  target: {}. Took {}"

Review comment:
       Is the idea to capture the time only for distCp, or for the DirCopyTask itself?
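       If the intent is the DirCopyTask's own duration, a sketch could be:

           long taskStartTime = System.currentTimeMillis();
           try {
             // ... copy logic (including the distcp invocation) ...
           } finally {
             LOG.info("DirCopyTask took {}",
                 ReplUtils.convertToHumanReadableTime(System.currentTimeMillis() - taskStartTime));
           }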

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/log/state/DataCopyEnd.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.log.state;
+
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+public class DataCopyEnd extends ReplState {
+
+  @JsonProperty String time;
+
+  public DataCopyEnd(long startTime) {
+    long duration = System.currentTimeMillis() - startTime;
+    this.time =
+        "Total time taken for all external table copy tasks: " + ReplUtils

Review comment:
       Can we externalise the message so that the same DataCopyEnd can also be used for managed table data copy in the future (applicable in the bootstrap case)?
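       For example (sketch only, the parameter name is illustrative):

           public DataCopyEnd(String copyKind, long startTime) {
             long duration = System.currentTimeMillis() - startTime;
             this.time = "Total time taken for all " + copyKind + " copy tasks: "
                 + ReplUtils.convertToHumanReadableTime(duration);
           }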



