Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/06/07 12:09:59 UTC

[GitHub] [hive] maheshk114 opened a new pull request #2357: Hive 25205 : Reduce overhead of partition column stat updation during batch loading of partitions.

maheshk114 opened a new pull request #2357:
URL: https://github.com/apache/hive/pull/2357


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pkumarsinha commented on a change in pull request #2357: Hive 25205 : Reduce overhead of adding write notification log during batch loading of partition.

Posted by GitBox <gi...@apache.org>.
pkumarsinha commented on a change in pull request #2357:
URL: https://github.com/apache/hive/pull/2357#discussion_r679197638



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
##########
@@ -3101,10 +3102,25 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // For acid table, add the acid_write event with file list at the time of load itself. But
       // it should be done after partition is created.
 
+      List<WriteNotificationLogRequest> requestList = new ArrayList<>();
       for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
         PartitionDetails partitionDetails = entry.getValue();
         if (isTxnTable && partitionDetails.newFiles != null) {
-          addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId);
+          addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles,
+                  writeId, requestList);
+        }
+      }
+      if (requestList.size() != 0) {
+        WriteNotificationLogBatchRequest rqst = new WriteNotificationLogBatchRequest(tbl.getCatName(), tbl.getDbName(),
+                tbl.getTableName(), requestList);
+        try {
+          get(conf).getSynchronizedMSC().addWriteNotificationLogInBatch(rqst);
+        } catch (TException e) {
+          // For older HMS, if the batch API is not supported, fall back to older API.
+          LOG.info("addWriteNotificationLogInBatch failed with ", e);

Review comment:
       It would help to test once that this call is idempotent on failure: as written, every exception (for example, an SQLException) might be treated as an incompatibility between the HS2 and HMS versions and trigger the fallback to the older API.
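The reviewer's concern is that a failed batch call may be replayed through the older API, so a replay must not change the stored state. A minimal sketch of such a one-off check, assuming a simplified in-memory model (the class, record, and method names below are hypothetical, not Hive APIs): apply the same request twice and compare snapshots. The merge rule mirrors the `existingFiles.contains(...)` check in the diff, with WNL_FILES modelled as a list rather than an escaped, joined string.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory model of the TXN_WRITE_NOTIFICATION_LOG upsert discussed above.
 * Not Hive code: no JDBC, no locking, and WNL_FILES is a list instead of a joined string.
 */
public class WnlIdempotencySketch {

  /** Stands in for the (WNL_DATABASE, WNL_TABLE, WNL_PARTITION, WNL_TXNID) lookup key. */
  public record Key(String db, String table, String partition, long txnId) {}

  private final Map<Key, List<String>> rows = new HashMap<>();

  /** Inserts a new row, or merges files into an existing row unless they are already recorded. */
  public void addWriteNotification(Key key, List<String> files) {
    List<String> existing = rows.get(key);
    if (existing == null) {
      rows.put(key, new ArrayList<>(files));          // "insert" path
    } else if (!existing.containsAll(files)) {
      List<String> merged = new ArrayList<>(files);   // "update" path: new files first,
      merged.addAll(existing);                        // followed by the existing ones
      rows.put(key, merged);
    }
    // Otherwise the files are already present (e.g. a retried request): nothing changes.
  }

  /** Immutable copy of the current state, so two points in time can be compared. */
  public Map<Key, List<String>> snapshot() {
    Map<Key, List<String>> copy = new HashMap<>();
    rows.forEach((k, v) -> copy.put(k, List.copyOf(v)));
    return copy;
  }

  /** Applies the same request twice and reports whether the retry left the state unchanged. */
  public static boolean replayIsIdempotent(WnlIdempotencySketch log, Key key, List<String> files) {
    log.addWriteNotification(key, files);
    Map<Key, List<String>> afterFirst = log.snapshot();
    log.addWriteNotification(key, files);   // simulated retry after a failure
    return afterFirst.equals(log.snapshot());
  }

  public static void main(String[] args) {
    WnlIdempotencySketch log = new WnlIdempotencySketch();
    Key key = new Key("db", "tbl", "p=1", 42L);
    System.out.println(replayIsIdempotent(log, key, List.of("f1", "f2")));  // prints "true"
  }
}
```

Against a real metastore, the equivalent check would presumably compare the TXN_WRITE_NOTIFICATION_LOG rows before and after the retried call.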

##########
File path: hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
##########
@@ -1104,97 +1138,160 @@ private long getNextEventId(Connection con, SQLGenerator sqlGenerator, long numV
     return nextSequenceValue.get();
   }
 
-  private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent acidWriteEvent, Connection dbConn,
-                                 SQLGenerator sqlGenerator, AcidWriteMessage msg) throws MetaException, SQLException {
-    LOG.debug("DbNotificationListener: adding write notification log for : {}", event.getMessage());
+  private void addWriteNotificationLog(List<NotificationEvent> eventBatch, List<AcidWriteEvent> acidWriteEventList,
+                                       Connection dbConn, SQLGenerator sqlGenerator, List<AcidWriteMessage> msgBatch)
+          throws MetaException, SQLException {
+    LOG.debug("DbNotificationListener: adding write notification log for : {}", eventBatch);
     assert ((dbConn != null) && (sqlGenerator != null));
 
-    Statement stmt = null;
-    PreparedStatement pst = null;
-    ResultSet rs = null;
-    String dbName = acidWriteEvent.getDatabase();
-    String tblName = acidWriteEvent.getTable();
-    String partition = acidWriteEvent.getPartition();
-    String tableObj = msg.getTableObjStr();
-    String partitionObj = msg.getPartitionObjStr();
-    String files = ReplChangeManager.joinWithSeparator(msg.getFiles());
+    int numRows;
+    long maxRows = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE);
 
-    try {
-      stmt = dbConn.createStatement();
+    try (Statement stmt = dbConn.createStatement()) {
       String st = sqlGenerator.getDbProduct().getPrepareTxnStmt();
       if (st != null) {
         stmt.execute(st);
       }
+    } catch (Exception e) {
+      LOG.error("Failed to execute query ", e);
+      throw new MetaException(e.getMessage());
+    }
 
-      String s = sqlGenerator.addForUpdateClause("select \"WNL_FILES\", \"WNL_ID\" from" +
-                      " \"TXN_WRITE_NOTIFICATION_LOG\" " +
-                      "where \"WNL_DATABASE\" = ? " +
-                      "and \"WNL_TABLE\" = ? " +  " and \"WNL_PARTITION\" = ? " +
-                      "and \"WNL_TXNID\" = " + Long.toString(acidWriteEvent.getTxnId()));
-      List<String> params = Arrays.asList(dbName, tblName, partition);
-      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
-      LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
-              quoteString(dbName), quoteString(tblName), quoteString(partition));
-      rs = pst.executeQuery();
-      if (!rs.next()) {
-        // if rs is empty then no lock is taken and thus it can not cause deadlock.
-        long nextNLId = getNextNLId(dbConn, sqlGenerator,
-                "org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog", 1L);
-        s = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" " +
-                "(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\", \"WNL_TABLE\", " +
-                "\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", " +
-                "\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES (?,?,?,?,?,?,?,?,?,?)";
-        closeStmt(pst);
-        int currentTime = now();
-        pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
-        pst.setLong(1, nextNLId);
-        pst.setLong(2, acidWriteEvent.getTxnId());
-        pst.setLong(3, acidWriteEvent.getWriteId());
-        pst.setString(4, dbName);
-        pst.setString(5, tblName);
-        pst.setString(6, partition);
-        pst.setString(7, tableObj);
-        pst.setString(8, partitionObj);
-        pst.setString(9, files);
-        pst.setInt(10, currentTime);
-        LOG.info("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">", nextNLId
-                , acidWriteEvent.getTxnId(), acidWriteEvent.getWriteId(), quoteString(dbName), quoteString(tblName),
-                quoteString(partition), quoteString(tableObj), quoteString(partitionObj), quoteString(files), currentTime);
-        pst.execute();
-      } else {
-        String existingFiles = rs.getString(1);
-        if (existingFiles.contains(sqlGenerator.addEscapeCharacters(files))) {
-          // If list of files are already present then no need to update it again. This scenario can come in case of
-          // retry done to the meta store for the same operation.
-          LOG.info("file list " + files + " already present");
-          return;
+    ResultSet rs = null;
+    String select = sqlGenerator.addForUpdateClause("select \"WNL_ID\", \"WNL_FILES\" from" +
+            " \"TXN_WRITE_NOTIFICATION_LOG\" " +
+            "where \"WNL_DATABASE\" = ? " +
+            "and \"WNL_TABLE\" = ? " + " and \"WNL_PARTITION\" = ? " +
+            "and \"WNL_TXNID\" = ? ");
+    List<Integer> insertList = new ArrayList<>();
+    Map<Integer, Pair<Long, String>> updateMap = new HashMap<>();
+    try (PreparedStatement pst = dbConn.prepareStatement(select)) {
+      for (int i = 0; i < acidWriteEventList.size(); i++) {
+        String dbName = acidWriteEventList.get(i).getDatabase();
+        String tblName = acidWriteEventList.get(i).getTable();
+        String partition = acidWriteEventList.get(i).getPartition();
+        Long txnId = acidWriteEventList.get(i).getTxnId();
+
+        LOG.debug("Going to execute query <" + select.replaceAll("\\?", "{}") + ">",
+                quoteString(dbName), quoteString(tblName), quoteString(partition));
+        pst.setString(1, dbName);
+        pst.setString(2, tblName);
+        pst.setString(3, partition);
+        pst.setLong(4, txnId);
+        rs = pst.executeQuery();
+        if (!rs.next()) {
+          insertList.add(i);
+        } else {
+          updateMap.put(i, new Pair<>(rs.getLong(1), rs.getString(2)));
         }
-        long nlId = rs.getLong(2);
-        int currentTime = now();
-        files = ReplChangeManager.joinWithSeparator(Lists.newArrayList(files, existingFiles));
-        s = "update \"TXN_WRITE_NOTIFICATION_LOG\" set \"WNL_TABLE_OBJ\" = ? ," +
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to execute insert ", e);
+      throw new MetaException(e.getMessage());
+    } finally {
+      close(rs);
+    }
+
+    if (insertList.size() != 0) {
+      // if rs is empty then no lock is taken and thus it can not cause deadlock.
+      long nextNLId = getNextNLId(dbConn, sqlGenerator,
+              "org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog", insertList.size());
+
+      String insert = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" " +
+              "(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\", \"WNL_TABLE\", " +
+              "\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", " +
+              "\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES (?,?,?,?,?,?,?,?,?,?)";
+      try (PreparedStatement pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(insert))) {
+        numRows = 0;
+        for (int idx : insertList) {
+          String tableObj = msgBatch.get(idx).getTableObjStr();
+          String partitionObj = msgBatch.get(idx).getPartitionObjStr();
+          String files = ReplChangeManager.joinWithSeparator(msgBatch.get(idx).getFiles());
+          String dbName = acidWriteEventList.get(idx).getDatabase();
+          String tblName = acidWriteEventList.get(idx).getTable();
+          String partition = acidWriteEventList.get(idx).getPartition();
+          int currentTime = now();
+
+          pst.setLong(1, nextNLId++);
+          pst.setLong(2, acidWriteEventList.get(idx).getTxnId());
+          pst.setLong(3, acidWriteEventList.get(idx).getWriteId());
+          pst.setString(4, dbName);
+          pst.setString(5, tblName);
+          pst.setString(6, partition);
+          pst.setString(7, tableObj);
+          pst.setString(8, partitionObj);
+          pst.setString(9, files);
+          pst.setInt(10, currentTime);
+          LOG.debug("Going to execute insert <" + insert.replaceAll("\\?", "{}") + ">", nextNLId
+                  , acidWriteEventList.get(idx).getTxnId(), acidWriteEventList.get(idx).getWriteId()
+                  , quoteString(dbName), quoteString(tblName),
+                  quoteString(partition), quoteString(tableObj), quoteString(partitionObj), quoteString(files), currentTime);
+          pst.addBatch();
+          numRows++;
+          if (numRows == maxRows) {
+            pst.executeBatch();
+            numRows = 0;
+          }
+        }
+
+        if (numRows != 0) {
+          pst.executeBatch();
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to execute insert ", e);
+        throw new MetaException(e.getMessage());
+      }
+    }
+
+    if (updateMap.size() != 0) {
+        String update = "update \"TXN_WRITE_NOTIFICATION_LOG\" set \"WNL_TABLE_OBJ\" = ? ," +
                 " \"WNL_PARTITION_OBJ\" = ? ," +
                 " \"WNL_FILES\" = ? ," +
                 " \"WNL_EVENT_TIME\" = ?" +
                 " where \"WNL_ID\" = ?";
-        closeStmt(pst);
-        pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
-        pst.setString(1, tableObj);
-        pst.setString(2, partitionObj);
-        pst.setString(3, files);
-        pst.setInt(4, currentTime);
-        pst.setLong(5, nlId);
-        LOG.info("Going to execute update <" + s.replaceAll("\\?", "{}") + ">", quoteString(tableObj),
-                quoteString(partitionObj), quoteString(files), currentTime, nlId);
-        pst.executeUpdate();
+      try (PreparedStatement pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(update))) {
+        numRows = 0;
+        for (Map.Entry entry : updateMap.entrySet()) {
+          int idx = (int) entry.getKey();
+          Pair<Long, String> nlIdInfo = (Pair<Long, String>) entry.getValue();
+          String tableObj = msgBatch.get(idx).getTableObjStr();
+          String partitionObj = msgBatch.get(idx).getPartitionObjStr();
+          String files = ReplChangeManager.joinWithSeparator(msgBatch.get(idx).getFiles());
+          String existingFiles = nlIdInfo.second;
+          long nlId = nlIdInfo.first;
+          int currentTime = now();
+
+          if (existingFiles.contains(sqlGenerator.addEscapeCharacters(files))) {
+            // If list of files are already present then no need to update it again. This scenario can come in case of
+            // retry done to the meta store for the same operation.
+            LOG.info("file list " + files + " already present");
+            continue;
+          }
+
+          files = ReplChangeManager.joinWithSeparator(Lists.newArrayList(files, existingFiles));
+
+          pst.setString(1, tableObj);
+          pst.setString(2, partitionObj);
+          pst.setString(3, files);
+          pst.setInt(4, currentTime);
+          pst.setLong(5, nlId);
+          LOG.debug("Going to execute update <" + update.replaceAll("\\?", "{}") + ">",
+                  quoteString(tableObj), quoteString(partitionObj), quoteString(files), currentTime, nlId);
+          pst.addBatch();
+          numRows++;
+          if (numRows == maxRows) {
+            pst.executeBatch();
+            numRows = 0;
+          }
+        }
+
+        if (numRows != 0) {
+          pst.executeBatch();
+        }
+      } catch (Exception e) {

Review comment:
       Should we catch SQLException instead?
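One reading of this question: catching `Exception` also wraps programming errors such as a `NullPointerException` into a metastore error, whereas catching `SQLException` translates only database failures. The sketch below assumes a hypothetical `BatchTarget` interface in place of `PreparedStatement` and a hypothetical `StoreException` in place of Hive's `MetaException`; it reproduces the chunked `addBatch`/`executeBatch` loop from the diff (flush every `maxRows` rows, then flush the remainder) with the narrower catch.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class ChunkedBatchSketch {

  /** Hypothetical stand-in for the two PreparedStatement calls used in the diff. */
  public interface BatchTarget {
    void addBatch(String row) throws SQLException;
    void executeBatch() throws SQLException;
  }

  /** Hypothetical checked exception standing in for Hive's MetaException. */
  public static class StoreException extends Exception {
    public StoreException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /**
   * Adds rows to the target, flushing every maxRows rows and then the remainder.
   * Only SQLException is translated; runtime errors (bugs) propagate unchanged.
   *
   * @return number of executeBatch calls made
   */
  public static int writeInChunks(BatchTarget target, List<String> rows, int maxRows)
      throws StoreException {
    if (maxRows <= 0) {
      throw new IllegalArgumentException("maxRows must be positive");
    }
    int pending = 0;
    int flushes = 0;
    try {
      for (String row : rows) {
        target.addBatch(row);
        if (++pending == maxRows) {
          target.executeBatch();
          pending = 0;
          flushes++;
        }
      }
      if (pending != 0) {
        target.executeBatch();
        flushes++;
      }
    } catch (SQLException e) {
      throw new StoreException("Failed to execute batch: " + e.getMessage(), e);
    }
    return flushes;
  }

  /** Test double that records how many rows each flush contained. */
  public static class RecordingTarget implements BatchTarget {
    public final List<Integer> flushedSizes = new ArrayList<>();
    private int current = 0;

    @Override public void addBatch(String row) { current++; }
    @Override public void executeBatch() { flushedSizes.add(current); current = 0; }
  }

  public static void main(String[] args) throws StoreException {
    RecordingTarget target = new RecordingTarget();
    int flushes = writeInChunks(target, List.of("r1", "r2", "r3", "r4", "r5"), 2);
    System.out.println(flushes + " " + target.flushedSizes);  // prints "3 [2, 2, 1]"
  }
}
```

Keeping the cause on the wrapped exception (rather than only its message) also preserves the original stack trace for whoever logs it.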






[GitHub] [hive] maheshk114 merged pull request #2357: Hive 25205 : Reduce overhead of adding write notification log during batch loading of partition.

Posted by GitBox <gi...@apache.org>.
maheshk114 merged pull request #2357:
URL: https://github.com/apache/hive/pull/2357


   




[GitHub] [hive] maheshk114 commented on a change in pull request #2357: Hive 25205 : Reduce overhead of adding write notification log during batch loading of partition.

Posted by GitBox <gi...@apache.org>.
maheshk114 commented on a change in pull request #2357:
URL: https://github.com/apache/hive/pull/2357#discussion_r681441315



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
##########
@@ -3101,10 +3102,25 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // For acid table, add the acid_write event with file list at the time of load itself. But
       // it should be done after partition is created.
 
+      List<WriteNotificationLogRequest> requestList = new ArrayList<>();
       for (Entry<Path, PartitionDetails> entry : partitionDetailsMap.entrySet()) {
         PartitionDetails partitionDetails = entry.getValue();
         if (isTxnTable && partitionDetails.newFiles != null) {
-          addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles, writeId);
+          addWriteNotificationLog(tbl, partitionDetails.fullSpec, partitionDetails.newFiles,
+                  writeId, requestList);
+        }
+      }
+      if (requestList.size() != 0) {
+        WriteNotificationLogBatchRequest rqst = new WriteNotificationLogBatchRequest(tbl.getCatName(), tbl.getDbName(),
+                tbl.getTableName(), requestList);
+        try {
+          get(conf).getSynchronizedMSC().addWriteNotificationLogInBatch(rqst);
+        } catch (TException e) {
+          // For older HMS, if the batch API is not supported, fall back to older API.
+          LOG.info("addWriteNotificationLogInBatch failed with ", e);

Review comment:
       I have changed the code to fall back to the older API only when the error indicates an unsupported method, and I have also checked the idempotent behaviour.
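The change described above can be sketched as follows, under stated assumptions: `NotificationClient`, `addInBatch`, `addOne`, and `UnsupportedMethodException` are hypothetical stand-ins for the metastore client, `addWriteNotificationLogInBatch`, the older per-partition call, and however the client detects a missing server method. (A Thrift server typically reports an unknown method as a `TApplicationException` of type `UNKNOWN_METHOD`, but the thread does not show which check the PR uses.) Only that error triggers the per-request fallback; any other failure propagates instead of being replayed.

```java
import java.util.List;

/**
 * Hypothetical sketch of "use the batch API, fall back only when the server lacks it".
 * None of these types are Hive or Thrift classes.
 */
public class BatchFallbackSketch {

  /** Stand-in for the client-side signal that the server does not know the batch method. */
  public static class UnsupportedMethodException extends Exception {
    public UnsupportedMethodException(String method) {
      super("Unsupported method: " + method);
    }
  }

  /** Stand-in for the metastore client: a batch call and the older one-request-at-a-time call. */
  public interface NotificationClient {
    void addInBatch(List<String> requests) throws Exception;
    void addOne(String request) throws Exception;
  }

  /**
   * Sends all requests in one batch call. Falls back to per-request calls only when the
   * batch method is unsupported; any other exception propagates to the caller unchanged.
   *
   * @return true if the fallback path was taken
   */
  public static boolean addWithFallback(NotificationClient client, List<String> requests)
      throws Exception {
    try {
      client.addInBatch(requests);
      return false;
    } catch (UnsupportedMethodException e) {
      for (String request : requests) {
        client.addOne(request);   // older server: replay each request with the old API
      }
      return true;
    }
  }

  public static void main(String[] args) throws Exception {
    NotificationClient oldServer = new NotificationClient() {
      @Override public void addInBatch(List<String> requests) throws Exception {
        throw new UnsupportedMethodException("addInBatch");
      }
      @Override public void addOne(String request) {
        System.out.println("fallback: " + request);
      }
    };
    System.out.println(addWithFallback(oldServer, List.of("p=1", "p=2")));
  }
}
```

Because only the unsupported-method case replays requests, a server-side failure such as an SQL error is not silently retried through the older API, which was the risk raised in the first review comment.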





