Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/19 08:50:42 UTC

[GitHub] [iceberg] pvary commented on a change in pull request #2228: Hive: Implement multi-table inserts

pvary commented on a change in pull request #2228:
URL: https://github.com/apache/iceberg/pull/2228#discussion_r597500201



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -135,29 +153,31 @@ public void abortTask(TaskAttemptContext originalContext) throws IOException {
   @Override
   public void commitJob(JobContext originalContext) throws IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-
-    JobConf conf = jobContext.getJobConf();
-    Table table = Catalogs.loadTable(conf);
+    JobConf jobConf = jobContext.getJobConf();
 
     long startTime = System.currentTimeMillis();
-    LOG.info("Committing job has started for table: {}, using location: {}", table,
-        generateJobLocation(conf, jobContext.getJobID()));
+    LOG.info("Committing job {} has started", jobContext.getJobID());
 
-    FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf());
-    List<DataFile> dataFiles = dataFiles(jobContext, io, true);
+    Map<String, String> outputs = SerializationUtil.deserializeFromBase64(jobConf.get(InputFormatConfig.OUTPUT_TABLES));
 
-    if (dataFiles.size() > 0) {
-      // Appending data files to the table
-      AppendFiles append = table.newAppend();
-      dataFiles.forEach(append::appendFile);
-      append.commit();
-      LOG.info("Commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table,
-          dataFiles.size());
-      LOG.debug("Added files {}", dataFiles);
-    } else {
-      LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table);
+    ExecutorService fileExecutor = fileExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    try {
+      // Commits the changes for the output tables in parallel
+      Tasks.foreach(outputs.entrySet())
+          .throwFailureWhenFinished()
+          .stopOnFailure()
+          .executeWith(tableExecutor)
+          .run(entry -> commitTable(fileExecutor, jobContext, entry.getKey(), entry.getValue()));

Review comment:
    Sadly, with the current Hive API we cannot have atomicity.
   
    Currently, here is how Hive works:
    1. Create a staging directory for the query inside each table's root directory
    2. Create files inside these staging directories and write the data there (this is what the Mappers/Reducers do)
    3. With a MoveTask, move the contents of the staging directory to the final location
   
    The SerDe mechanism provides a way to change step 2, but we do not have access to the other steps.
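    
    Roughly, the whole flow looks something like the sketch below. This is not the actual Hive code, just an illustration using the Hadoop FileSystem API; the directory and file names are made up:
    
```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StagedWriteSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path tableRoot = new Path("/warehouse/db/tbl");   // made-up table location
    FileSystem fs = tableRoot.getFileSystem(conf);

    // Step 1: staging directory for the query inside the table root
    Path staging = new Path(tableRoot, ".hive-staging_query1");
    fs.mkdirs(staging);

    // Step 2: the tasks write their output files into the staging directory
    try (FSDataOutputStream out = fs.create(new Path(staging, "000000_0"))) {
      out.writeBytes("row data...");
    }

    // Step 3: the MoveTask moves the staged files to their final place.
    // On HDFS a rename is a cheap metadata operation; on S3 it is a
    // copy+delete, which is why the S3 case is not "optimal".
    for (FileStatus file : fs.listStatus(staging)) {
      fs.rename(file.getPath(), new Path(tableRoot, file.getPath().getName()));
    }
    fs.delete(staging, true);
  }
}
```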
   
    On HDFS this works fairly well, since an HDFS move is fast and atomic (though even then there are no guarantees for multi-table inserts), but on S3 it is not "optimal". Hive solves this for ACID tables by making the changes visible to readers only when the transaction is committed. When #1849 is available, we will have to do some kind of two-way commit, so that the Hive ACID transaction is committed together with the Iceberg transaction. We are not yet sure what it will look like. Maybe shared locking? Maybe adding a two-way commit to the API? We will find out when we get there.
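    
    Just to make the idea concrete, here is a purely speculative sketch of what such a two-way commit could look like. Only the Iceberg Transaction API calls are real; the Hive transaction and lock types are invented placeholders for whatever #1849 ends up providing:
    
```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;

class TwoWayCommitSketch {
  // Completely made-up stand-ins for a future Hive-side API
  interface HypotheticalHiveTxn { void commit(); }
  interface HypotheticalLock { void acquire(); void release(); }

  void commitBoth(Table table, Iterable<DataFile> files,
                  HypotheticalHiveTxn hiveTxn, HypotheticalLock hiveLock) {
    Transaction icebergTxn = table.newTransaction();
    AppendFiles append = icebergTxn.newAppend();
    files.forEach(append::appendFile);
    append.commit();                  // staged in the transaction, not yet visible

    hiveLock.acquire();               // hypothetical: coordinate with Hive readers
    try {
      icebergTxn.commitTransaction(); // make the Iceberg snapshot visible
      hiveTxn.commit();               // hypothetical: commit the Hive ACID txn too
    } finally {
      hiveLock.release();
    }
  }
}
```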
   
    Until the problem is solved properly, this could be a somewhat limping solution that lets these queries execute, offering guarantees similar to those of the current multi-table inserts over S3 using Hive.
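    
    To spell out the limitation (the table names and file lists below are placeholders): each Iceberg append commit is atomic for its own table, but nothing ties the per-table commits together, so a failure between them leaves a partially applied insert visible:
    
```java
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class NonAtomicMultiTableCommit {
  // dataFiles1/dataFiles2 would come from the tasks' committed outputs
  static void commitSequentially(Table table1, List<DataFile> dataFiles1,
                                 Table table2, List<DataFile> dataFiles2) {
    AppendFiles append1 = table1.newAppend();
    dataFiles1.forEach(append1::appendFile);
    append1.commit();   // table1's new snapshot is immediately visible

    // If the committer dies right here, table1 keeps the new data while
    // table2 never gets it: readers observe a partial multi-table insert.

    AppendFiles append2 = table2.newAppend();
    dataFiles2.forEach(append2::appendFile);
    append2.commit();
  }
}
```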




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org