Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/06/03 10:20:40 UTC

[GitHub] [hive] marton-bod opened a new pull request #2347: Store commit info and ctas info in QueryState

marton-bod opened a new pull request #2347:
URL: https://github.com/apache/hive/pull/2347


   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] marton-bod commented on pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
marton-bod commented on pull request #2347:
URL: https://github.com/apache/hive/pull/2347#issuecomment-855660993


   @pvary @lcspinter Let me know if you have more comments. Thanks!




[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644908593



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {

Review comment:
       Do we have a faster solution for this? The `jobConf` could be very large.






[GitHub] [hive] marton-bod commented on pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
marton-bod commented on pull request #2347:
URL: https://github.com/apache/hive/pull/2347#issuecomment-853950016


   @pvary @lcspinter @szlta Can you please review this? Thank you!




[GitHub] [hive] marton-bod commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
marton-bod commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r645459374



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {
+                // don't copy the serialized tables, they're not needed anymore and take up lots of space
+                if (e.getKey().startsWith("iceberg.mr.") && !e.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  icebergProperties.put(e.getKey(), e.getValue());
+                }
+              });
+              // save information for each target table (jobID, task num)
               for (String table : tables) {
-                sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID_PREFIX + table, jobIdStr);
-                sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + table,
-                    status.getProgress().getSucceededTaskCount());
+                SessionStateUtil.newCommitInfo(jobConf, table)

Review comment:
       What we want to do here is store several pieces of information that belong together and relate to commits.
   We have a few options:
   1. Have a single util method with a long parameter list, e.g. `SessionStateUtil.addCommitInfo(conf, tableName, jobID, taskNum, additionalProps)`. The `CommitInfo` object would then be constructed in the Util and retrieved on the client side in the MetaHook/JobCommitter.
   2. Get a `CommitInfo` object from the Util, populate it and save/cache it with a fluent API (as in the PR).
   3. Get a `CommitInfo` object from the Util, populate it, but use a Util method to actually save it, making it a 3-step process, e.g. `1. CommitInfo info = Util.newCommit(); 2. // populate info fields; 3. Util.addCommitInfo(info);`
   
   I don't have a strong preference for any of them. I chose the second option because I don't favour long, unnamed parameter lists, and I like fluent APIs :) But I'm open to options 1 and 3 as well. With only this many parameters, option 1 seems alright. What is your take on this?
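
For comparison, here is a rough sketch of the three API shapes listed above. It is not the code from the PR: `CommitUtilSketch`, the `with...` setters, `save()` and the static `STORE` map (standing in for the query state) are all hypothetical names chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical value object; its fields mirror the parameters named in the comment above.
class CommitInfo {
  final String tableName;
  String jobId;
  int taskNum;
  Map<String, String> props = new HashMap<>();

  CommitInfo(String tableName) {
    this.tableName = tableName;
  }

  CommitInfo withJobId(String jobId) { this.jobId = jobId; return this; }
  CommitInfo withTaskNum(int taskNum) { this.taskNum = taskNum; return this; }
  CommitInfo withProps(Map<String, String> props) { this.props = new HashMap<>(props); return this; }

  // Used by option 2 only: the fluent chain ends by storing the object itself.
  void save() { CommitUtilSketch.STORE.put(tableName, this); }
}

class CommitUtilSketch {
  // Assumption: a plain map stands in for wherever the query state keeps commit info.
  static final Map<String, CommitInfo> STORE = new HashMap<>();

  // Option 1: a single call with a positional parameter list.
  static void addCommitInfo(String tableName, String jobId, int taskNum, Map<String, String> props) {
    STORE.put(tableName, new CommitInfo(tableName).withJobId(jobId).withTaskNum(taskNum).withProps(props));
  }

  // Options 2 and 3: hand out an empty object for the caller to populate.
  static CommitInfo newCommitInfo(String tableName) { return new CommitInfo(tableName); }

  // Option 3: an explicit save step through the util.
  static void addCommitInfo(CommitInfo info) { STORE.put(info.tableName, info); }

  public static void main(String[] args) {
    // Option 1
    addCommitInfo("db.t1", "job_1", 4, Collections.emptyMap());
    // Option 2
    newCommitInfo("db.t2").withJobId("job_2").withTaskNum(2).save();
    // Option 3
    CommitInfo info = newCommitInfo("db.t3");
    info.jobId = "job_3";
    info.taskNum = 1;
    addCommitInfo(info);
    System.out.println(STORE.size() + " commit infos stored");
  }
}
```

In this sketch the call sites differ mainly in readability: option 1 relies on argument order, option 2 names each value at the call site, and option 3 keeps the object free of any knowledge of where it is stored.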






[GitHub] [hive] marton-bod commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
marton-bod commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r646327446



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {

Review comment:
       Merged the two loops.






[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644915386



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {
+                // don't copy the serialized tables, they're not needed anymore and take up lots of space
+                if (e.getKey().startsWith("iceberg.mr.") && !e.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  icebergProperties.put(e.getKey(), e.getValue());
+                }
+              });
+              // save information for each target table (jobID, task num)
               for (String table : tables) {
-                sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID_PREFIX + table, jobIdStr);
-                sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + table,
-                    status.getProgress().getSucceededTaskCount());
+                SessionStateUtil.newCommitInfo(jobConf, table)

Review comment:
       This is a little bit odd to me.
   I understand what you did, but it still feels strange. Convince me 😄 






[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644910919



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {

Review comment:
       We iterated through the jobConf a few lines ago. It might be worth merging the loops, if all else fails.






[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r646354849



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -381,27 +379,23 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
             if (child.isDirectory() && child.getPath().getName().contains(jobIdPrefix)) {
               // folder name pattern is queryID-jobID, we're removing the queryID part to get the jobID
               String jobIdStr = child.getPath().getName().substring(jobConf.get("hive.query.id").length() + 1);
-              // get all target tables this vertex wrote to
+
               List<String> tables = new ArrayList<>();
+              Map<String, String> icebergProperties = new HashMap<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  // get all target tables this vertex wrote to
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
+                } else if (entry.getKey().startsWith("iceberg.mr.")) {
+                  // find iceberg props in jobConf as they can be needed, but not available, during job commit
+                  icebergProperties.put(entry.getKey(), entry.getValue());
                 }
               }
-              // save information for each target table (jobID, task num, query state)
-              for (String table : tables) {
-                sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID_PREFIX + table, jobIdStr);
-                sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + table,
-                    status.getProgress().getSucceededTaskCount());
-              }
+              // save information for each target table

Review comment:
       nit: newline after block






[GitHub] [hive] marton-bod commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
marton-bod commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r645354876



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -183,10 +183,9 @@ private void createTableForCTAS(Configuration configuration, Properties serDePro
         serDeProperties.get(Catalogs.NAME), tableSchema, serDeProperties.get(InputFormatConfig.PARTITION_SPEC));
     Catalogs.createTable(configuration, serDeProperties);
 
-    // set these in the global conf so that we can rollback the table in the lifecycle hook in case of failures
-    String queryId = configuration.get(HiveConf.ConfVars.HIVEQUERYID.varname);
-    configuration.set(String.format(InputFormatConfig.IS_CTAS_QUERY_TEMPLATE, queryId), "true");
-    configuration.set(String.format(InputFormatConfig.CTAS_TABLE_NAME_TEMPLATE, queryId),
+    // set these in the query state so that we can rollback the table in the lifecycle hook in case of failures
+    SessionStateUtil.addResource(configuration, InputFormatConfig.IS_CTAS_QUERY, "true");

Review comment:
       Yes, true, CTAS_TABLE_NAME should be enough






[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r646355105



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -191,13 +191,16 @@ public void commitJob(JobContext originalContext) throws IOException {
           .stopOnFailure()
           .executeWith(tableExecutor)
           .run(output -> {
-            Table table = HiveIcebergStorageHandler.table(jobConf, output);
+            Table table = SessionStateUtil.getResource(jobConf, output)

Review comment:
       nit: Maybe:
   ```
   Object res = SessionStateUtil.getResource(jobConf, output);
   Table table = res instanceof Table ? (Table) res : HiveIcebergStorageHandler.table(jobConf, output);
   ```
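
The same "use the cached object if it has the right type, otherwise load it" pattern can be written as a small generic helper. This is only a sketch: `cachedOrLoad` and the plain `Map<String, Object>` standing in for the query-state resources are assumptions, not part of `SessionStateUtil`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class ResourceFallbackSketch {
  // Returns the cached object when it is present and of the expected type;
  // otherwise falls back to the loader (e.g. deserializing the table from the conf).
  static <T> T cachedOrLoad(Map<String, Object> resources, String key, Class<T> type, Supplier<T> loader) {
    Object res = resources.get(key);
    return type.isInstance(res) ? type.cast(res) : loader.get();
  }

  public static void main(String[] args) {
    Map<String, Object> resources = new HashMap<>();
    resources.put("db.t1", "cached table");
    System.out.println(cachedOrLoad(resources, "db.t1", String.class, () -> "loaded table"));
    System.out.println(cachedOrLoad(resources, "db.t2", String.class, () -> "loaded table"));
  }
}
```

`Class.isInstance` returns false for `null`, so a missing entry and an entry of the wrong type both take the fallback path, matching the `instanceof` check suggested above.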






[GitHub] [hive] marton-bod commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
marton-bod commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r645354564



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -459,35 +454,35 @@ public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    OutputCommitter committer = new HiveIcebergOutputCommitter();
-    try {
-      LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
-      committer.abortJob(jobContext, JobStatus.State.FAILED);
-    } catch (IOException e) {
-      LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
-      // no throwing here because the original commitInsertTable exception should be propagated
-    } finally {
-      // avoid config pollution with prefixed/suffixed keys
-      cleanCommitConfig(tableName);
+    if (jobContext != null) {
+      OutputCommitter committer = new HiveIcebergOutputCommitter();
+      try {
+        LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
+        committer.abortJob(jobContext, JobStatus.State.FAILED);
+      } catch (IOException e) {
+        LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
+        // no throwing here because the original commitInsertTable exception should be propagated
+      }
     }
   }
 
-  private void cleanCommitConfig(String tableName) {
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName);
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName);
-    conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName);
-    conf.unset(InputFormatConfig.OUTPUT_TABLES);
-  }
-
   private JobContext getJobContextForCommitOrAbort(String tableName, boolean overwrite) {

Review comment:
       Done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {

Review comment:
       Good question. I don't think there's a faster way if we want to keep this generic and have it work for all existing and future Iceberg properties. I did find a convenience method that can do this for us without an explicit loop:
   `public Map<String, String> getPropsWithPrefix(String confPrefix)`, but it also loops through the keys internally.
   
   If we are concerned about performance, we can find out exactly which properties need to be propagated, hardcode those, and pass only those few without looping.
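
The two alternatives in this reply can be sketched side by side. This is an illustration under assumptions, not the PR's code: a plain `Map` stands in for the `jobConf`, and the `iceberg.mr.example.*` keys and the method names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IcebergPropsSketch {
  static final String ICEBERG_PREFIX = "iceberg.mr.";
  static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";

  // Generic variant: one pass over every entry. It picks up any current or future
  // iceberg.mr.* key and collects the target table names in the same loop.
  static Map<String, String> collectByPrefix(Map<String, String> conf, List<String> tablesOut) {
    Map<String, String> props = new HashMap<>();
    for (Map.Entry<String, String> e : conf.entrySet()) {
      String key = e.getKey();
      if (key.startsWith(SERIALIZED_TABLE_PREFIX)) {
        tablesOut.add(key.substring(SERIALIZED_TABLE_PREFIX.length()));
      } else if (key.startsWith(ICEBERG_PREFIX)) {
        props.put(key, e.getValue());
      }
    }
    return props;
  }

  // Targeted variant: direct lookups for a fixed key list, with no scan of the whole conf.
  // The cost depends on the number of known keys, but new properties must be added by hand.
  static Map<String, String> collectKnownKeys(Map<String, String> conf, List<String> knownKeys) {
    Map<String, String> props = new HashMap<>();
    for (String key : knownKeys) {
      String value = conf.get(key);
      if (value != null) {
        props.put(key, value);
      }
    }
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("iceberg.mr.serialized.table.db.t1", "serialized-bytes");
    conf.put("iceberg.mr.example.a", "1"); // hypothetical key
    conf.put("hive.query.id", "q1");
    List<String> tables = new ArrayList<>();
    System.out.println(collectByPrefix(conf, tables) + " tables=" + tables);
    System.out.println(collectKnownKeys(conf, Arrays.asList("iceberg.mr.example.a")));
  }
}
```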

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {

Review comment:
       You're right, will definitely merge them






[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644897608



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -437,19 +438,13 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table,
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    boolean failure = false;
-    try {
-      OutputCommitter committer = new HiveIcebergOutputCommitter();
-      committer.commitJob(jobContext);
-    } catch (Exception e) {
-      failure = true;
-      LOG.error("Error while trying to commit job", e);
-      throw new MetaException(StringUtils.stringifyException(e));
-    } finally {
-      // if there's a failure, the configs will still be needed in rollbackInsertTable
-      if (!failure) {
-        // avoid config pollution with prefixed/suffixed keys
-        cleanCommitConfig(tableName);
+    if (jobContext != null) {

Review comment:
       Is this a bugfix?
   Do we expect null here?






[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644899593



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -459,35 +454,35 @@ public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    OutputCommitter committer = new HiveIcebergOutputCommitter();
-    try {
-      LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
-      committer.abortJob(jobContext, JobStatus.State.FAILED);
-    } catch (IOException e) {
-      LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
-      // no throwing here because the original commitInsertTable exception should be propagated
-    } finally {
-      // avoid config pollution with prefixed/suffixed keys
-      cleanCommitConfig(tableName);
+    if (jobContext != null) {
+      OutputCommitter committer = new HiveIcebergOutputCommitter();
+      try {
+        LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
+        committer.abortJob(jobContext, JobStatus.State.FAILED);
+      } catch (IOException e) {
+        LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
+        // no throwing here because the original commitInsertTable exception should be propagated
+      }
     }
   }
 
-  private void cleanCommitConfig(String tableName) {
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName);
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName);
-    conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName);
-    conf.unset(InputFormatConfig.OUTPUT_TABLES);
-  }
-
   private JobContext getJobContextForCommitOrAbort(String tableName, boolean overwrite) {

Review comment:
       Maybe Optional?
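
A minimal sketch of what the `Optional` variant could look like, so that callers no longer need the `jobContext != null` check. The `JobCtx` class, the `JOB_IDS` map and the method names are hypothetical stand-ins, not the types used in `HiveIcebergMetaHook`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class OptionalJobContextSketch {
  // Stand-in for the real JobContext; it only carries a job ID here.
  static final class JobCtx {
    final String jobId;

    JobCtx(String jobId) {
      this.jobId = jobId;
    }
  }

  // Assumption: commit info is looked up per table; a missing entry means nothing was written.
  static final Map<String, String> JOB_IDS = new HashMap<>();

  // Returns an empty Optional instead of null when there is no commit info for the table.
  static Optional<JobCtx> jobContextFor(String tableName) {
    return Optional.ofNullable(JOB_IDS.get(tableName)).map(JobCtx::new);
  }

  // The caller expresses "abort only if there is a job" without an explicit null check.
  static String rollback(String tableName) {
    return jobContextFor(tableName)
        .map(ctx -> "aborted " + ctx.jobId)
        .orElse("nothing to abort");
  }

  public static void main(String[] args) {
    JOB_IDS.put("db.t1", "job_1");
    System.out.println(rollback("db.t1"));
    System.out.println(rollback("db.t2"));
  }
}
```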








[GitHub] [hive] marton-bod commented on pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
marton-bod commented on pull request #2347:
URL: https://github.com/apache/hive/pull/2347#issuecomment-853950016


   @pvary @lcspinter @szlta Can you please review this? Thank you!




[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644897608



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -437,19 +438,13 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table,
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    boolean failure = false;
-    try {
-      OutputCommitter committer = new HiveIcebergOutputCommitter();
-      committer.commitJob(jobContext);
-    } catch (Exception e) {
-      failure = true;
-      LOG.error("Error while trying to commit job", e);
-      throw new MetaException(StringUtils.stringifyException(e));
-    } finally {
-      // if there's a failure, the configs will still be needed in rollbackInsertTable
-      if (!failure) {
-        // avoid config pollution with prefixed/suffixed keys
-        cleanCommitConfig(tableName);
+    if (jobContext != null) {

Review comment:
       Is this a bugfix?
   Do we expect null here?

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -437,19 +438,13 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table,
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    boolean failure = false;
-    try {
-      OutputCommitter committer = new HiveIcebergOutputCommitter();
-      committer.commitJob(jobContext);
-    } catch (Exception e) {
-      failure = true;
-      LOG.error("Error while trying to commit job", e);
-      throw new MetaException(StringUtils.stringifyException(e));
-    } finally {
-      // if there's a failure, the configs will still be needed in rollbackInsertTable
-      if (!failure) {
-        // avoid config pollution with prefixed/suffixed keys
-        cleanCommitConfig(tableName);
+    if (jobContext != null) {

Review comment:
       Oh... I found the reason behind it 😄 

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -459,35 +454,35 @@ public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    OutputCommitter committer = new HiveIcebergOutputCommitter();
-    try {
-      LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
-      committer.abortJob(jobContext, JobStatus.State.FAILED);
-    } catch (IOException e) {
-      LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
-      // no throwing here because the original commitInsertTable exception should be propagated
-    } finally {
-      // avoid config pollution with prefixed/suffixed keys
-      cleanCommitConfig(tableName);
+    if (jobContext != null) {
+      OutputCommitter committer = new HiveIcebergOutputCommitter();
+      try {
+        LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
+        committer.abortJob(jobContext, JobStatus.State.FAILED);
+      } catch (IOException e) {
+        LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
+        // no throwing here because the original commitInsertTable exception should be propagated
+      }
     }
   }
 
-  private void cleanCommitConfig(String tableName) {
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName);
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName);
-    conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName);
-    conf.unset(InputFormatConfig.OUTPUT_TABLES);
-  }
-
   private JobContext getJobContextForCommitOrAbort(String tableName, boolean overwrite) {

Review comment:
       Maybe Optional?
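
   A minimal sketch of what the `Optional` variant could look like, so that callers cannot forget the null check. `JobContextStub` and the `commitInfo` map are hypothetical stand-ins for the Hadoop `JobContext` and for whatever stores the per-table commit information; they are not part of the PR.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class OptionalJobContextSketch {
  // Hypothetical stand-in for the Hadoop JobContext; it only carries a job ID here.
  static final class JobContextStub {
    final String jobId;

    JobContextStub(String jobId) {
      this.jobId = jobId;
    }
  }

  // Returns an empty Optional when no commit info was stored for the table,
  // instead of returning null.
  static Optional<JobContextStub> getJobContextForCommitOrAbort(
      Map<String, String> commitInfo, String tableName) {
    return Optional.ofNullable(commitInfo.get(tableName)).map(JobContextStub::new);
  }

  // Caller side: the abort branch only runs when a context is present.
  static String rollback(Map<String, String> commitInfo, String tableName) {
    return getJobContextForCommitOrAbort(commitInfo, tableName)
        .map(ctx -> "aborted " + ctx.jobId)
        .orElse("nothing to abort");
  }

  public static void main(String[] args) {
    Map<String, String> commitInfo = Collections.singletonMap("db.tbl", "job_123");
    System.out.println(rollback(commitInfo, "db.tbl"));
    System.out.println(rollback(commitInfo, "db.other"));
  }
}
```

   The trade-off is a slightly more verbose call site in exchange for making the "no job context" case explicit in the method signature.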

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -183,10 +183,9 @@ private void createTableForCTAS(Configuration configuration, Properties serDePro
         serDeProperties.get(Catalogs.NAME), tableSchema, serDeProperties.get(InputFormatConfig.PARTITION_SPEC));
     Catalogs.createTable(configuration, serDeProperties);
 
-    // set these in the global conf so that we can rollback the table in the lifecycle hook in case of failures
-    String queryId = configuration.get(HiveConf.ConfVars.HIVEQUERYID.varname);
-    configuration.set(String.format(InputFormatConfig.IS_CTAS_QUERY_TEMPLATE, queryId), "true");
-    configuration.set(String.format(InputFormatConfig.CTAS_TABLE_NAME_TEMPLATE, queryId),
+    // set these in the query state so that we can rollback the table in the lifecycle hook in case of failures
+    SessionStateUtil.addResource(configuration, InputFormatConfig.IS_CTAS_QUERY, "true");

Review comment:
       Do we need both?
   Maybe if we have `CTAS_TABLE_NAME` then `IS_CTAS_QUERY` is `true`
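
   A rough sketch of the idea, assuming the query state behaves like a simple key/value map: only the table name is stored, and the CTAS flag is derived from its presence. The `resources` map and the key value are hypothetical placeholders, not the actual `SessionStateUtil` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class CtasStateSketch {
  // Hypothetical key; the real constant would live in InputFormatConfig.
  static final String CTAS_TABLE_NAME = "ctas.table.name";

  // Looks up the CTAS target table, if one was registered for this query.
  static Optional<String> ctasTableName(Map<String, Object> resources) {
    Object value = resources.get(CTAS_TABLE_NAME);
    return value instanceof String ? Optional.of((String) value) : Optional.empty();
  }

  // The query is a CTAS query exactly when a CTAS table name is present,
  // so no separate boolean flag needs to be stored.
  static boolean isCtasQuery(Map<String, Object> resources) {
    return ctasTableName(resources).isPresent();
  }

  public static void main(String[] args) {
    Map<String, Object> resources = new HashMap<>();
    System.out.println(isCtasQuery(resources));
    resources.put(CTAS_TABLE_NAME, "db.new_table");
    System.out.println(isCtasQuery(resources));
  }
}
```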

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {

Review comment:
       Is there a faster way to do this? The `jobConf` could be very large.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {

Review comment:
       We iterated through the `jobConf` a few lines ago. It might be worth merging the two loops, if everything else fails.
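
   A sketch of the merged loop, assuming the config can be treated as an iterable of string entries (a plain map is used below in place of the real `jobConf`). The `iceberg.mr.serialized.table.` prefix comes from the diff above; the class name and `ICEBERG_PROPERTY_PREFIX` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergedJobConfScan {
  static final String ICEBERG_SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
  // Hypothetical constant name for the generic Iceberg property prefix.
  static final String ICEBERG_PROPERTY_PREFIX = "iceberg.mr.";

  final List<String> tables = new ArrayList<>();
  final Map<String, String> icebergProperties = new HashMap<>();

  // Single pass over the config: serialized-table keys yield target table names,
  // every other iceberg.mr.* key is copied as a property.
  static MergedJobConfScan scan(Iterable<Map.Entry<String, String>> jobConf) {
    MergedJobConfScan result = new MergedJobConfScan();
    for (Map.Entry<String, String> entry : jobConf) {
      String key = entry.getKey();
      if (key.startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
        result.tables.add(key.substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
      } else if (key.startsWith(ICEBERG_PROPERTY_PREFIX)) {
        result.icebergProperties.put(key, entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put("iceberg.mr.serialized.table.db.tbl", "<serialized table>");
    conf.put("iceberg.mr.catalog", "hive");
    conf.put("hive.query.id", "q1");
    MergedJobConfScan result = scan(conf.entrySet());
    System.out.println(result.tables);
    System.out.println(result.icebergProperties);
  }
}
```

   Because the serialized-table check comes first, the large serialized table values are never copied into the property map, so the extra `!startsWith(...)` guard from the two-loop version is not needed.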

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {
+                // don't copy the serialized tables, they're not needed anymore and take up lots of space
+                if (e.getKey().startsWith("iceberg.mr.") && !e.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  icebergProperties.put(e.getKey(), e.getValue());
+                }
+              });
+              // save information for each target table (jobID, task num)
               for (String table : tables) {
-                sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID_PREFIX + table, jobIdStr);
-                sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + table,
-                    status.getProgress().getSucceededTaskCount());
+                SessionStateUtil.newCommitInfo(jobConf, table)

Review comment:
       This looks a little odd to me.
   I understand what you did, but it still feels strange. Convince me 😄 






[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644899221



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -437,19 +438,13 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table,
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    boolean failure = false;
-    try {
-      OutputCommitter committer = new HiveIcebergOutputCommitter();
-      committer.commitJob(jobContext);
-    } catch (Exception e) {
-      failure = true;
-      LOG.error("Error while trying to commit job", e);
-      throw new MetaException(StringUtils.stringifyException(e));
-    } finally {
-      // if there's a failure, the configs will still be needed in rollbackInsertTable
-      if (!failure) {
-        // avoid config pollution with prefixed/suffixed keys
-        cleanCommitConfig(tableName);
+    if (jobContext != null) {

Review comment:
       Oh... I found the reason behind it 😄 






[GitHub] [hive] marton-bod commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
marton-bod commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r646605093



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -381,27 +379,23 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
             if (child.isDirectory() && child.getPath().getName().contains(jobIdPrefix)) {
               // folder name pattern is queryID-jobID, we're removing the queryID part to get the jobID
               String jobIdStr = child.getPath().getName().substring(jobConf.get("hive.query.id").length() + 1);
-              // get all target tables this vertex wrote to
+
               List<String> tables = new ArrayList<>();
+              Map<String, String> icebergProperties = new HashMap<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  // get all target tables this vertex wrote to
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
+                } else if (entry.getKey().startsWith("iceberg.mr.")) {
+                  // find iceberg props in jobConf as they can be needed, but not available, during job commit
+                  icebergProperties.put(entry.getKey(), entry.getValue());
                 }
               }
-              // save information for each target table (jobID, task num, query state)
-              for (String table : tables) {
-                sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID_PREFIX + table, jobIdStr);
-                sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + table,
-                    status.getProgress().getSucceededTaskCount());
-              }
+              // save information for each target table

Review comment:
       Done






[GitHub] [hive] lcspinter merged pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
lcspinter merged pull request #2347:
URL: https://github.com/apache/hive/pull/2347


   




[GitHub] [hive] marton-bod commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
marton-bod commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r646327621



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {
+                // don't copy the serialized tables, they're not needed anymore and take up lots of space
+                if (e.getKey().startsWith("iceberg.mr.") && !e.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  icebergProperties.put(e.getKey(), e.getValue());
+                }
+              });
+              // save information for each target table (jobID, task num)
               for (String table : tables) {
-                sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID_PREFIX + table, jobIdStr);
-                sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + table,
-                    status.getProgress().getSucceededTaskCount());
+                SessionStateUtil.newCommitInfo(jobConf, table)

Review comment:
       Decided to go with option 1, it looks cleaner.






[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644910919



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {

Review comment:
       We iterated through the `jobConf` a few lines ago. It might be worth merging the two loops, if everything else fails.






[GitHub] [hive] pvary commented on a change in pull request #2347: HIVE-25195: Store Iceberg write commit and ctas information in QueryState

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644907549



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -183,10 +183,9 @@ private void createTableForCTAS(Configuration configuration, Properties serDePro
         serDeProperties.get(Catalogs.NAME), tableSchema, serDeProperties.get(InputFormatConfig.PARTITION_SPEC));
     Catalogs.createTable(configuration, serDeProperties);
 
-    // set these in the global conf so that we can rollback the table in the lifecycle hook in case of failures
-    String queryId = configuration.get(HiveConf.ConfVars.HIVEQUERYID.varname);
-    configuration.set(String.format(InputFormatConfig.IS_CTAS_QUERY_TEMPLATE, queryId), "true");
-    configuration.set(String.format(InputFormatConfig.CTAS_TABLE_NAME_TEMPLATE, queryId),
+    // set these in the query state so that we can rollback the table in the lifecycle hook in case of failures
+    SessionStateUtil.addResource(configuration, InputFormatConfig.IS_CTAS_QUERY, "true");

Review comment:
       Do we need both?
   Maybe if we have `CTAS_TABLE_NAME` then `IS_CTAS_QUERY` is `true`



