You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/07/30 14:46:52 UTC

[GitHub] [hive] pvargacl opened a new pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

pvargacl opened a new pull request #1339:
URL: https://github.com/apache/hive/pull/1339


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvargacl commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvargacl commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464476338



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +126,217 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
+
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
+     * @param deltaFiles bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
+    public DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId,
+        List<DeltaFileMetaData> deltaFiles) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = ObjectUtils.defaultIfNull(deltaFiles, new ArrayList<>());
     }
-    long getMinWriteId() {
+
+    public long getMinWriteId() {
       return minWriteId;
     }
-    long getMaxWriteId() {
+
+    public long getMaxWriteId() {
       return maxWriteId;
     }
-    List<Integer> getStmtIds() {
+
+    public List<Integer> getStmtIds() {
       return stmtIds;
     }
-    long getVisibilityTxnId() {
+
+    public long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+
+    public List<DeltaFileMetaData> getDeltaFiles() {
+      return deltaFiles;
+    }
+
+    public List<DeltaFileMetaData> getDeltaFilesForStmtId(final Integer stmtId) {
+      if (stmtIds.size() <= 1 || stmtId == null) {
+        // If it is not a multistatement delta, we do not store the stmtId in the file list
+        return deltaFiles;
+      } else {
+        return deltaFiles.stream().filter(df -> stmtId.equals(df.getStmtId())).collect(Collectors.toList());
+      }
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeLong(minWriteId);
       out.writeLong(maxWriteId);
       out.writeInt(stmtIds.size());
-      for(Integer id : stmtIds) {
+      for (Integer id : stmtIds) {
         out.writeInt(id);
       }
       out.writeLong(visibilityTxnId);
+      out.writeInt(deltaFiles.size());
+      for (DeltaFileMetaData fileMeta : deltaFiles) {
+        fileMeta.write(out);
+      }
     }
+
     @Override
     public void readFields(DataInput in) throws IOException {
       minWriteId = in.readLong();
       maxWriteId = in.readLong();
       stmtIds.clear();
       int numStatements = in.readInt();
-      for(int i = 0; i < numStatements; i++) {
+      for (int i = 0; i < numStatements; i++) {
         stmtIds.add(in.readInt());
       }
       visibilityTxnId = in.readLong();
+
+      deltaFiles.clear();
+      int numFiles = in.readInt();
+      for (int i = 0; i < numFiles; i++) {
+        DeltaFileMetaData file = new DeltaFileMetaData();
+        file.readFields(in);
+        deltaFiles.add(file);
+      }
     }
-    String getName() {
+
+    private String getName() {
       assert stmtIds.isEmpty() : "use getName(int)";
-      return AcidUtils.addVisibilitySuffix(AcidUtils
-          .deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
+      return AcidUtils.addVisibilitySuffix(AcidUtils.deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
     }
-    String getName(int stmtId) {
+
+    private String getName(int stmtId) {
       assert !stmtIds.isEmpty() : "use getName()";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId, stmtId), visibilityTxnId);
     }
+
+    public List<Pair<Path, Integer>> getPaths(Path root) {
+      if (stmtIds.isEmpty()) {
+        return Collections.singletonList(new ImmutablePair<>(new Path(root, getName()), null));
+      } else {
+        // To support multistatement transactions we may have multiple directories corresponding to one DeltaMetaData
+        return getStmtIds().stream()
+            .map(stmtId -> new ImmutablePair<>(new Path(root, getName(stmtId)), stmtId)).collect(Collectors.toList());
+      }
+    }
+
     @Override
     public String toString() {
       return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + "," + visibilityTxnId + ")";
     }
   }
+
+  final class DeltaFileMetaData implements Writable {
+    private static final int HAS_LONG_FILEID_FLAG = 1;
+    private static final int HAS_ATTEMPTID_FLAG = 2;
+    private static final int HAS_STMTID_FLAG = 4;
+
+    private long modTime;
+    private long length;
+    // Optional
+    private Integer attemptId;
+    // Optional
+    private Long fileId;
+    // Optional, if the deltaMeta contains multiple stmtIds, it will contain this files parent's stmtId
+    private Integer stmtId;
+
+    public DeltaFileMetaData() {
+    }
+
+    public DeltaFileMetaData(HadoopShims.HdfsFileStatusWithId fileStatus, Integer stmtId) {
+      modTime = fileStatus.getFileStatus().getModificationTime();
+      length = fileStatus.getFileStatus().getLen();
+      String attempt = AcidUtils.parseAttemptId(fileStatus.getFileStatus().getPath());
+      attemptId = StringUtils.isEmpty(attempt) ? null : Integer.parseInt(attempt);
+      fileId = fileStatus.getFileId();
+      this.stmtId = stmtId;
+    }
+
+    public DeltaFileMetaData(long modTime, long length, @Nullable Integer attemptId, @Nullable Long fileId,
+        @Nullable Integer stmtId) {
+      this.modTime = modTime;
+      this.length = length;
+      this.attemptId = attemptId;
+      this.fileId = fileId;
+      this.stmtId = stmtId;
+    }
+
+    public void clearStmtId() {
+      stmtId = null;
+    }
+
+    public Integer getStmtId() {
+      return stmtId;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      int flags = (fileId != null ? HAS_LONG_FILEID_FLAG : 0) |
+          (attemptId != null ? HAS_ATTEMPTID_FLAG : 0) |
+          (stmtId != null ? HAS_STMTID_FLAG : 0);
+      out.writeByte(flags);
+      out.writeLong(modTime);
+      out.writeLong(length);
+      if (attemptId != null) {
+        out.writeInt(attemptId);
+      }
+      if (fileId != null) {
+        out.writeLong(fileId);
+      }
+      if (stmtId != null) {
+        out.writeInt(stmtId);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      byte flags = in.readByte();
+      boolean hasLongFileId = (HAS_LONG_FILEID_FLAG & flags) != 0,
+          hasAttemptId = (HAS_ATTEMPTID_FLAG & flags) != 0,
+          hasStmtId = (HAS_STMTID_FLAG & flags) != 0;
+      modTime = in.readLong();
+      length = in.readLong();
+      if (hasAttemptId) {
+        attemptId = in.readInt();
+      }
+      if (hasLongFileId) {
+        fileId = in.readLong();
+      }
+      if (hasStmtId) {
+        stmtId = in.readInt();
+      }
+    }
+
+    public Object getFileId(Path deltaDirectory, int bucketId) {
+      if (fileId != null) {
+        return fileId;
+      }
+      // Calculate the synthetic fileid
+      Path realPath = getPath(deltaDirectory, bucketId);
+      return new SyntheticFileId(realPath, length, modTime);
+    }

Review comment:
       I will add the forceSynthetic parameter, since it has a valid use case (https://issues.apache.org/jira/browse/HIVE-20338), but I have a problem with this: boolean allowSynthetic = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID);
   If someone disables this, it will render the LLAP cache useless; even more, your orctailcache will just throw an IllegalCacheConfigurationException.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464311382



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -1641,28 +1645,26 @@ public int compareTo(CompressedOwid other) {
      * Check if the delete delta folder needs to be scanned for a given split's min/max write ids.
      *
      * @param orcSplitMinMaxWriteIds
-     * @param deleteDeltaDir
+     * @param deleteDelta
+     * @param stmtId statementId of the deleteDelta if present
      * @return true when  delete delta dir has to be scanned.
      */
     @VisibleForTesting
     protected static boolean isQualifiedDeleteDeltaForSplit(AcidOutputFormat.Options orcSplitMinMaxWriteIds,
-        Path deleteDeltaDir)
-    {
-      AcidUtils.ParsedDelta deleteDelta = AcidUtils.parsedDelta(deleteDeltaDir, false);
+        AcidInputFormat.DeltaMetaData deleteDelta, Integer stmtId) {

Review comment:
       nit: extra spaces?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463156735



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +123,183 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
+     * @param deltaFileStatuses bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
+    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId,
+        List<HadoopShims.HdfsFileStatusWithId> deltaFileStatuses) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = new ArrayList<>();
+      for(HadoopShims.HdfsFileStatusWithId fileStatus : deltaFileStatuses) {
+        deltaFiles.add(new DeltaFileMetaData(fileStatus));
+      }
     }
+
     long getMinWriteId() {
       return minWriteId;
     }
+
     long getMaxWriteId() {
       return maxWriteId;
     }
+
     List<Integer> getStmtIds() {
       return stmtIds;
     }
+
     long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+
+    public List<DeltaFileMetaData> getDeltaFiles() {
+      return deltaFiles;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeLong(minWriteId);
       out.writeLong(maxWriteId);
       out.writeInt(stmtIds.size());
-      for(Integer id : stmtIds) {
+      for (Integer id : stmtIds) {
         out.writeInt(id);
       }
       out.writeLong(visibilityTxnId);
+      out.writeInt(deltaFiles.size());
+      for (DeltaFileMetaData fileMeta : deltaFiles) {
+        fileMeta.write(out);
+      }
     }
+
     @Override
     public void readFields(DataInput in) throws IOException {
       minWriteId = in.readLong();
       maxWriteId = in.readLong();
       stmtIds.clear();
       int numStatements = in.readInt();
-      for(int i = 0; i < numStatements; i++) {
+      for (int i = 0; i < numStatements; i++) {
         stmtIds.add(in.readInt());
       }
       visibilityTxnId = in.readLong();
+
+      deltaFiles.clear();
+      int numFiles = in.readInt();
+      for(int i = 0; i< numFiles; i++) {
+        DeltaFileMetaData file = new DeltaFileMetaData();
+        file.readFields(in);
+        deltaFiles.add(file);
+      }
     }
-    String getName() {
+    private String getName() {
       assert stmtIds.isEmpty() : "use getName(int)";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
     }
-    String getName(int stmtId) {
+    private String getName(int stmtId) {
       assert !stmtIds.isEmpty() : "use getName()";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId, stmtId), visibilityTxnId);
     }
+
+    public List<Path> getPaths(Path root) {
+      if (stmtIds.isEmpty()) {
+        return Collections.singletonList(new Path(root, getName()));
+      } else {
+        // To support multistatement transactions we may have multiple directories corresponding to one DeltaMetaData
+        return getStmtIds().stream().map(stmtId -> new Path(root, getName(stmtId))).collect(Collectors.toList());
+      }
+    }
     @Override
     public String toString() {
       return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + "," + visibilityTxnId + ")";
     }
   }
+  final class DeltaFileMetaData implements Writable {

Review comment:
       Nit: newline 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463156327



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +123,183 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
     /**

Review comment:
       Nit: newline 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463163160



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -1113,10 +1119,13 @@ else if(statementId != parsedDelta.statementId) {
               && (last.getMinWriteId() == parsedDelta.getMinWriteId())
               && (last.getMaxWriteId() == parsedDelta.getMaxWriteId())) {
         last.getStmtIds().add(parsedDelta.getStatementId());
+        for(HadoopShims.HdfsFileStatusWithId fileStatus : parsedDelta.getFiles()) {

Review comment:
       Nit: missing space between `for` and the opening parenthesis




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463151233



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +123,183 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
+     * @param deltaFileStatuses bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
+    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId,
+        List<HadoopShims.HdfsFileStatusWithId> deltaFileStatuses) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = new ArrayList<>();
+      for(HadoopShims.HdfsFileStatusWithId fileStatus : deltaFileStatuses) {
+        deltaFiles.add(new DeltaFileMetaData(fileStatus));
+      }
     }
+
     long getMinWriteId() {
       return minWriteId;
     }
+
     long getMaxWriteId() {
       return maxWriteId;
     }
+
     List<Integer> getStmtIds() {
       return stmtIds;
     }
+
     long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+
+    public List<DeltaFileMetaData> getDeltaFiles() {
+      return deltaFiles;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeLong(minWriteId);
       out.writeLong(maxWriteId);
       out.writeInt(stmtIds.size());
-      for(Integer id : stmtIds) {
+      for (Integer id : stmtIds) {
         out.writeInt(id);
       }
       out.writeLong(visibilityTxnId);
+      out.writeInt(deltaFiles.size());
+      for (DeltaFileMetaData fileMeta : deltaFiles) {
+        fileMeta.write(out);
+      }
     }
+
     @Override
     public void readFields(DataInput in) throws IOException {
       minWriteId = in.readLong();
       maxWriteId = in.readLong();
       stmtIds.clear();
       int numStatements = in.readInt();
-      for(int i = 0; i < numStatements; i++) {
+      for (int i = 0; i < numStatements; i++) {
         stmtIds.add(in.readInt());
       }
       visibilityTxnId = in.readLong();
+
+      deltaFiles.clear();
+      int numFiles = in.readInt();
+      for(int i = 0; i< numFiles; i++) {
+        DeltaFileMetaData file = new DeltaFileMetaData();
+        file.readFields(in);
+        deltaFiles.add(file);
+      }
     }
-    String getName() {
+    private String getName() {
       assert stmtIds.isEmpty() : "use getName(int)";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
     }
-    String getName(int stmtId) {
+    private String getName(int stmtId) {
       assert !stmtIds.isEmpty() : "use getName()";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId, stmtId), visibilityTxnId);
     }
+
+    public List<Path> getPaths(Path root) {
+      if (stmtIds.isEmpty()) {
+        return Collections.singletonList(new Path(root, getName()));
+      } else {
+        // To support multistatement transactions we may have multiple directories corresponding to one DeltaMetaData
+        return getStmtIds().stream().map(stmtId -> new Path(root, getName(stmtId))).collect(Collectors.toList());
+      }
+    }
     @Override

Review comment:
       Nit: newline




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464309265



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -1574,20 +1577,23 @@ public int compareTo(CompressedOwid other) {
       this.orcSplit = orcSplit;
 
       try {
-        final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
-        if (deleteDeltaDirs.length > 0) {
+        if (orcSplit.getDeltas().size() > 0) {
           AcidOutputFormat.Options orcSplitMinMaxWriteIds =
               AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf);
           int totalDeleteEventCount = 0;
-          for (Path deleteDeltaDir : deleteDeltaDirs) {
-            if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {
-              continue;
-            }
-            Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
-                new OrcRawRecordMerger.Options().isCompacting(false), null);
-            for (Path deleteDeltaFile : deleteDeltaFiles) {
-              try {
-                ReaderData readerData = getOrcTail(deleteDeltaFile, conf, cacheTag);
+          for (AcidInputFormat.DeltaMetaData deltaMetaData : orcSplit.getDeltas()) {
+            // We got one path for each statement in a multiStmt transaction
+            for (Pair<Path,Integer> deleteDeltaDir : deltaMetaData.getPaths(orcSplit.getRootDir())) {

Review comment:
       nit: add a space after the comma: <Path, Integer>




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] szlta merged pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
szlta merged pull request #1339:
URL: https://github.com/apache/hive/pull/1339


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464310228



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +126,217 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
+
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
+     * @param deltaFiles bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
+    public DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId,
+        List<DeltaFileMetaData> deltaFiles) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = ObjectUtils.defaultIfNull(deltaFiles, new ArrayList<>());
     }
-    long getMinWriteId() {
+
+    public long getMinWriteId() {
       return minWriteId;
     }
-    long getMaxWriteId() {
+
+    public long getMaxWriteId() {
       return maxWriteId;
     }
-    List<Integer> getStmtIds() {
+
+    public List<Integer> getStmtIds() {
       return stmtIds;
     }
-    long getVisibilityTxnId() {
+
+    public long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+
+    public List<DeltaFileMetaData> getDeltaFiles() {
+      return deltaFiles;
+    }
+
+    public List<DeltaFileMetaData> getDeltaFilesForStmtId(final Integer stmtId) {
+      if (stmtIds.size() <= 1 || stmtId == null) {
+        // If it is not a multistatement delta, we do not store the stmtId in the file list
+        return deltaFiles;
+      } else {
+        return deltaFiles.stream().filter(df -> stmtId.equals(df.getStmtId())).collect(Collectors.toList());
+      }
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeLong(minWriteId);
       out.writeLong(maxWriteId);
       out.writeInt(stmtIds.size());
-      for(Integer id : stmtIds) {
+      for (Integer id : stmtIds) {
         out.writeInt(id);
       }
       out.writeLong(visibilityTxnId);
+      out.writeInt(deltaFiles.size());
+      for (DeltaFileMetaData fileMeta : deltaFiles) {
+        fileMeta.write(out);
+      }
     }
+
     @Override
     public void readFields(DataInput in) throws IOException {
       minWriteId = in.readLong();
       maxWriteId = in.readLong();
       stmtIds.clear();
       int numStatements = in.readInt();
-      for(int i = 0; i < numStatements; i++) {
+      for (int i = 0; i < numStatements; i++) {
         stmtIds.add(in.readInt());
       }
       visibilityTxnId = in.readLong();
+
+      deltaFiles.clear();
+      int numFiles = in.readInt();
+      for (int i = 0; i < numFiles; i++) {
+        DeltaFileMetaData file = new DeltaFileMetaData();
+        file.readFields(in);
+        deltaFiles.add(file);
+      }
     }
-    String getName() {
+
+    private String getName() {
       assert stmtIds.isEmpty() : "use getName(int)";
-      return AcidUtils.addVisibilitySuffix(AcidUtils
-          .deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
+      return AcidUtils.addVisibilitySuffix(AcidUtils.deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
     }
-    String getName(int stmtId) {
+
+    private String getName(int stmtId) {
       assert !stmtIds.isEmpty() : "use getName()";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId, stmtId), visibilityTxnId);
     }
+
+    public List<Pair<Path, Integer>> getPaths(Path root) {

Review comment:
       Do we need the order? Why not map?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvargacl commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvargacl commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464473125



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -1641,28 +1645,26 @@ public int compareTo(CompressedOwid other) {
      * Check if the delete delta folder needs to be scanned for a given split's min/max write ids.
      *
      * @param orcSplitMinMaxWriteIds
-     * @param deleteDeltaDir
+     * @param deleteDelta
+     * @param stmtId statementId of the deleteDelta if present
      * @return true when  delete delta dir has to be scanned.
      */
     @VisibleForTesting
     protected static boolean isQualifiedDeleteDeltaForSplit(AcidOutputFormat.Options orcSplitMinMaxWriteIds,
-        Path deleteDeltaDir)
-    {
-      AcidUtils.ParsedDelta deleteDelta = AcidUtils.parsedDelta(deleteDeltaDir, false);
+        AcidInputFormat.DeltaMetaData deleteDelta, Integer stmtId) {

Review comment:
       it is the second line of parameters, no extra space here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464308530



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -2493,7 +2514,7 @@ private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOExce
       }
       FileStatus[] dataFiles;
       try {
-        dataFiles = fs.listStatus(new Path[]{baseOrDeltaDir}, originalBucketFilter);
+        dataFiles = fs.listStatus(baseOrDeltaDir , originalBucketFilter);

Review comment:
       nit: extra space




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463171159



##########
File path: ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidInputFormat.java
##########
@@ -83,6 +94,34 @@ public void testDeltaMetaConstructWithState() throws Exception {
     assertThat(deltaMetaData.getStmtIds().get(2), is(99));
   }
 
+  @Test
+  public void testDeltaMetaWithFile() throws Exception {

Review comment:
       Test for all of the different serialization options




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464304647



##########
File path: llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
##########
@@ -250,18 +255,71 @@ public void testGetOrcTailForPath() throws Exception {
     Configuration jobConf = new Configuration();
     Configuration daemonConf = new Configuration();
     CacheTag tag = CacheTag.build("test-table");
-    OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache);
+    OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, null);
     jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
-    OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache);
+    OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, null);
     assertEquals(uncached.getSerializedTail(), cached.getSerializedTail());
     assertEquals(uncached.getFileTail(), cached.getFileTail());
   }
 
+  @Test
+  public void testGetOrcTailForPathWithFileId() throws Exception {
+    DummyMemoryManager mm = new DummyMemoryManager();
+    DummyCachePolicy cp = new DummyCachePolicy();
+    final int MAX_ALLOC = 64;
+    LlapDaemonCacheMetrics metrics = LlapDaemonCacheMetrics.create("", "");
+    BuddyAllocator alloc = new BuddyAllocator(
+        false, false, 8, MAX_ALLOC, 1, 4096, 0, null, mm, metrics, null, true);
+    MetadataCache cache = new MetadataCache(alloc, mm, cp, true, metrics);
+
+    Path path = new Path("../data/files/alltypesorc");
+    Configuration jobConf = new Configuration();
+    Configuration daemonConf = new Configuration();
+    CacheTag tag = CacheTag.build("test-table");
+    FileSystem fs = FileSystem.get(daemonConf);
+    FileStatus fileStatus = fs.getFileStatus(path);
+    OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(fileStatus.getPath(), jobConf, tag, daemonConf, cache, new SyntheticFileId(fileStatus));
+    jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
+    // this should work from the cache, by recalculating the same fileId
+    OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(fileStatus.getPath(), jobConf, tag, daemonConf, cache, null);
+    assertEquals(uncached.getSerializedTail(), cached.getSerializedTail());
+    assertEquals(uncached.getFileTail(), cached.getFileTail());
+  }
+
+  @Test
+  public void testGetOrcTailForPathWithFileIdChange() throws Exception {
+    DummyMemoryManager mm = new DummyMemoryManager();
+    DummyCachePolicy cp = new DummyCachePolicy();
+    final int MAX_ALLOC = 64;
+    LlapDaemonCacheMetrics metrics = LlapDaemonCacheMetrics.create("", "");
+    BuddyAllocator alloc = new BuddyAllocator(
+        false, false, 8, MAX_ALLOC, 1, 4096, 0, null, mm, metrics, null, true);
+    MetadataCache cache = new MetadataCache(alloc, mm, cp, true, metrics);
+
+    Path path = new Path("../data/files/alltypesorc");
+    Configuration jobConf = new Configuration();
+    Configuration daemonConf = new Configuration();
+    CacheTag tag = CacheTag.build("test-table");
+    OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, new SyntheticFileId(path, 100, 100));
+    jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
+    Exception ex = null;
+    try {
+      // this should miss the cache, since the fileKey changed
+      OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, new SyntheticFileId(path, 100, 101));
+    } catch (IOException e) {
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    Assert.assertTrue(ex.getMessage().contains(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname));
+  }
+
+

Review comment:
       nit: too many newlines. If any other fix is needed, please remove them as well




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464312615



##########
File path: ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
##########
@@ -618,7 +618,13 @@ public void testMultipleInserts() throws Exception {
     dumpTableData(Table.ACIDTBL, 1, 1);
     List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
     Assert.assertEquals("Content didn't match after commit rs1", allData, rs1);
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 2");

Review comment:
       This is a valid test, but I think testMultipleInserts is a test for inserts, and this is a test for deletes. Maybe create its own test method named testDeleteOfInserts, like testUpdateOfInserts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464859899



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -1641,28 +1645,26 @@ public int compareTo(CompressedOwid other) {
      * Check if the delete delta folder needs to be scanned for a given split's min/max write ids.
      *
      * @param orcSplitMinMaxWriteIds
-     * @param deleteDeltaDir
+     * @param deleteDelta
+     * @param stmtId statementId of the deleteDelta if present
      * @return true when  delete delta dir has to be scanned.
      */
     @VisibleForTesting
     protected static boolean isQualifiedDeleteDeltaForSplit(AcidOutputFormat.Options orcSplitMinMaxWriteIds,
-        Path deleteDeltaDir)
-    {
-      AcidUtils.ParsedDelta deleteDelta = AcidUtils.parsedDelta(deleteDeltaDir, false);
+        AcidInputFormat.DeltaMetaData deleteDelta, Integer stmtId) {

Review comment:
       :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvargacl commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvargacl commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464483465



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +126,217 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
+
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
+     * @param deltaFiles bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
+    public DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId,
+        List<DeltaFileMetaData> deltaFiles) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = ObjectUtils.defaultIfNull(deltaFiles, new ArrayList<>());
     }
-    long getMinWriteId() {
+
+    public long getMinWriteId() {
       return minWriteId;
     }
-    long getMaxWriteId() {
+
+    public long getMaxWriteId() {
       return maxWriteId;
     }
-    List<Integer> getStmtIds() {
+
+    public List<Integer> getStmtIds() {
       return stmtIds;
     }
-    long getVisibilityTxnId() {
+
+    public long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+
+    public List<DeltaFileMetaData> getDeltaFiles() {
+      return deltaFiles;
+    }
+
+    public List<DeltaFileMetaData> getDeltaFilesForStmtId(final Integer stmtId) {
+      if (stmtIds.size() <= 1 || stmtId == null) {
+        // If it is not a multistatement delta, we do not store the stmtId in the file list
+        return deltaFiles;
+      } else {
+        return deltaFiles.stream().filter(df -> stmtId.equals(df.getStmtId())).collect(Collectors.toList());

Review comment:
       It will be a very small list, so I don't think it matters.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvargacl commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvargacl commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463249932



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -1574,45 +1576,46 @@ public int compareTo(CompressedOwid other) {
       this.orcSplit = orcSplit;
 
       try {
-        final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
-        if (deleteDeltaDirs.length > 0) {
+        if (orcSplit.getDeltas().size() > 0) {
           AcidOutputFormat.Options orcSplitMinMaxWriteIds =
               AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf);
           int totalDeleteEventCount = 0;
-          for (Path deleteDeltaDir : deleteDeltaDirs) {
-            if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {
-              continue;
-            }
-            Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
-                new OrcRawRecordMerger.Options().isCompacting(false), null);
-            for (Path deleteDeltaFile : deleteDeltaFiles) {
-              try {
-                ReaderData readerData = getOrcTail(deleteDeltaFile, conf, cacheTag);
-                OrcTail orcTail = readerData.orcTail;
-                if (orcTail.getFooter().getNumberOfRows() <= 0) {
-                  continue; // just a safe check to ensure that we are not reading empty delete files.
-                }
-                OrcRawRecordMerger.KeyInterval deleteKeyInterval = findDeleteMinMaxKeys(orcTail, deleteDeltaFile);
-                if (!deleteKeyInterval.isIntersects(keyInterval)) {
-                  // If there is no intersection between data and delete delta, do not read delete file
-                  continue;
-                }
-                // Reader can be reused if it was created before for getting orcTail: mostly for non-LLAP cache cases.
-                // For LLAP cases we need to create it here.
-                Reader deleteDeltaReader = readerData.reader != null ? readerData.reader :
-                    OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
-                totalDeleteEventCount += deleteDeltaReader.getNumberOfRows();
-                DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
-                    deleteDeltaFile, readerOptions, bucket, validWriteIdList, isBucketedTable, conf,
-                    keyInterval, orcSplit);
-                DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
-                if (deleteReaderValue.next(deleteRecordKey)) {
-                  sortMerger.put(deleteRecordKey, deleteReaderValue);
-                } else {
-                  deleteReaderValue.close();
+          for (AcidInputFormat.DeltaMetaData deltaMetaData : orcSplit.getDeltas()) {
+            for (Path deleteDeltaDir : deltaMetaData.getPaths(orcSplit.getRootDir())) {
+              if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {
+                LOG.debug("Skipping delete delta dir {}", deleteDeltaDir);
+                continue;
+              }
+              for (AcidInputFormat.DeltaFileMetaData fileMetaData : deltaMetaData.getDeltaFiles()) {
+                Path deleteDeltaFile = fileMetaData.getPath(deleteDeltaDir, bucket);
+                try {
+                  ReaderData readerData = getOrcTail(deleteDeltaFile, conf, cacheTag, fileMetaData.getFileId(deleteDeltaDir, bucket));
+                  OrcTail orcTail = readerData.orcTail;
+                  if (orcTail.getFooter().getNumberOfRows() <= 0) {
+                    continue; // just a safe check to ensure that we are not reading empty delete files.
+                  }
+                  OrcRawRecordMerger.KeyInterval deleteKeyInterval = findDeleteMinMaxKeys(orcTail, deleteDeltaFile);
+                  if (!deleteKeyInterval.isIntersects(keyInterval)) {
+                    // If there is no intersection between data and delete delta, do not read delete file
+                    continue;
+                  }
+                  // Reader can be reused if it was created before for getting orcTail: mostly for non-LLAP cache cases.
+                  // For LLAP cases we need to create it here.
+                  Reader deleteDeltaReader = readerData.reader != null ? readerData.reader : OrcFile
+                      .createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
+                  totalDeleteEventCount += deleteDeltaReader.getNumberOfRows();
+                  DeleteReaderValue deleteReaderValue =
+                      new DeleteReaderValue(deleteDeltaReader, deleteDeltaFile, readerOptions, bucket, validWriteIdList,
+                          isBucketedTable, conf, keyInterval, orcSplit);
+                  DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
+                  if (deleteReaderValue.next(deleteRecordKey)) {
+                    sortMerger.put(deleteRecordKey, deleteReaderValue);
+                  } else {
+                    deleteReaderValue.close();
+                  }
+                } catch (FileNotFoundException fnf) {

Review comment:
       Technically we need this, because the multistatement case is not handled too well. There is one DeltaMetaData for one writeId and the statementIds are collected there. I did not want to disturb this structure, but this way I have one merged fileList for the different folders and it will try each file for each folder. This is far from ideal, but I don't think it is worth the effort to change this before the multistatement feature is developed. But I will change the comment to reflect that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463163936



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -1165,8 +1168,14 @@ private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem
     throws IOException {
     ParsedDelta p = parsedDelta(path, deltaPrefix, fs, dirSnapshot);
     boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
+    List<HdfsFileStatusWithId> files = new ArrayList<>();
+    for (FileStatus fileStatus : dirSnapshot.getFiles()) {

Review comment:
       Nit: maybe do it the Java 8 way?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463166514



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -680,14 +681,15 @@ public void setBaseAndInnerReader(
    * @param path The Orc file path we want to get the OrcTail for
    * @param conf The Configuration to access LLAP
    * @param cacheTag The cacheTag needed to get OrcTail from LLAP IO cache
+   * @param fileKey fileId of the Orc file (either the Long fileId of HDFS or the SyntheticFileId)
    * @return ReaderData object where the orcTail is not null. Reader can be null, but if we had to create
    * one we return that as well for further reuse.
    */
-  private static ReaderData getOrcTail(Path path, Configuration conf, CacheTag cacheTag) throws IOException {
+  private static ReaderData getOrcTail(Path path, Configuration conf, CacheTag cacheTag, Object fileKey) throws IOException {

Review comment:
       Maybe 2 different getOrcTail methods on the LLAP IO interface? @szlta?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463406580



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -1574,45 +1576,46 @@ public int compareTo(CompressedOwid other) {
       this.orcSplit = orcSplit;
 
       try {
-        final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
-        if (deleteDeltaDirs.length > 0) {
+        if (orcSplit.getDeltas().size() > 0) {
           AcidOutputFormat.Options orcSplitMinMaxWriteIds =
               AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf);
           int totalDeleteEventCount = 0;
-          for (Path deleteDeltaDir : deleteDeltaDirs) {
-            if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {
-              continue;
-            }
-            Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
-                new OrcRawRecordMerger.Options().isCompacting(false), null);
-            for (Path deleteDeltaFile : deleteDeltaFiles) {
-              try {
-                ReaderData readerData = getOrcTail(deleteDeltaFile, conf, cacheTag);
-                OrcTail orcTail = readerData.orcTail;
-                if (orcTail.getFooter().getNumberOfRows() <= 0) {
-                  continue; // just a safe check to ensure that we are not reading empty delete files.
-                }
-                OrcRawRecordMerger.KeyInterval deleteKeyInterval = findDeleteMinMaxKeys(orcTail, deleteDeltaFile);
-                if (!deleteKeyInterval.isIntersects(keyInterval)) {
-                  // If there is no intersection between data and delete delta, do not read delete file
-                  continue;
-                }
-                // Reader can be reused if it was created before for getting orcTail: mostly for non-LLAP cache cases.
-                // For LLAP cases we need to create it here.
-                Reader deleteDeltaReader = readerData.reader != null ? readerData.reader :
-                    OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
-                totalDeleteEventCount += deleteDeltaReader.getNumberOfRows();
-                DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
-                    deleteDeltaFile, readerOptions, bucket, validWriteIdList, isBucketedTable, conf,
-                    keyInterval, orcSplit);
-                DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
-                if (deleteReaderValue.next(deleteRecordKey)) {
-                  sortMerger.put(deleteRecordKey, deleteReaderValue);
-                } else {
-                  deleteReaderValue.close();
+          for (AcidInputFormat.DeltaMetaData deltaMetaData : orcSplit.getDeltas()) {
+            for (Path deleteDeltaDir : deltaMetaData.getPaths(orcSplit.getRootDir())) {
+              if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {
+                LOG.debug("Skipping delete delta dir {}", deleteDeltaDir);
+                continue;
+              }
+              for (AcidInputFormat.DeltaFileMetaData fileMetaData : deltaMetaData.getDeltaFiles()) {
+                Path deleteDeltaFile = fileMetaData.getPath(deleteDeltaDir, bucket);
+                try {
+                  ReaderData readerData = getOrcTail(deleteDeltaFile, conf, cacheTag, fileMetaData.getFileId(deleteDeltaDir, bucket));
+                  OrcTail orcTail = readerData.orcTail;
+                  if (orcTail.getFooter().getNumberOfRows() <= 0) {
+                    continue; // just a safe check to ensure that we are not reading empty delete files.
+                  }
+                  OrcRawRecordMerger.KeyInterval deleteKeyInterval = findDeleteMinMaxKeys(orcTail, deleteDeltaFile);
+                  if (!deleteKeyInterval.isIntersects(keyInterval)) {
+                    // If there is no intersection between data and delete delta, do not read delete file
+                    continue;
+                  }
+                  // Reader can be reused if it was created before for getting orcTail: mostly for non-LLAP cache cases.
+                  // For LLAP cases we need to create it here.
+                  Reader deleteDeltaReader = readerData.reader != null ? readerData.reader : OrcFile
+                      .createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
+                  totalDeleteEventCount += deleteDeltaReader.getNumberOfRows();
+                  DeleteReaderValue deleteReaderValue =
+                      new DeleteReaderValue(deleteDeltaReader, deleteDeltaFile, readerOptions, bucket, validWriteIdList,
+                          isBucketedTable, conf, keyInterval, orcSplit);
+                  DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
+                  if (deleteReaderValue.next(deleteRecordKey)) {
+                    sortMerger.put(deleteRecordKey, deleteReaderValue);
+                  } else {
+                    deleteReaderValue.close();
+                  }
+                } catch (FileNotFoundException fnf) {

Review comment:
       Multi-table inserts also use stmtId when inserting data. Not sure if we can insert twice into the same table with a single query...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvargacl commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvargacl commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463237211



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -680,14 +681,15 @@ public void setBaseAndInnerReader(
    * @param path The Orc file path we want to get the OrcTail for
    * @param conf The Configuration to access LLAP
    * @param cacheTag The cacheTag needed to get OrcTail from LLAP IO cache
+   * @param fileKey fileId of the Orc file (either the Long fileId of HDFS or the SyntheticFileId)
    * @return ReaderData object where the orcTail is not null. Reader can be null, but if we had to create
    * one we return that as well for further reuse.
    */
-  private static ReaderData getOrcTail(Path path, Configuration conf, CacheTag cacheTag) throws IOException {
+  private static ReaderData getOrcTail(Path path, Configuration conf, CacheTag cacheTag, Object fileKey) throws IOException {

Review comment:
       I am not sure we will ever want to use it without a fileId.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvargacl commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvargacl commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464472005



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +126,217 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
+
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
+     * @param deltaFiles bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
+    public DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId,
+        List<DeltaFileMetaData> deltaFiles) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = ObjectUtils.defaultIfNull(deltaFiles, new ArrayList<>());
     }
-    long getMinWriteId() {
+
+    public long getMinWriteId() {
       return minWriteId;
     }
-    long getMaxWriteId() {
+
+    public long getMaxWriteId() {
       return maxWriteId;
     }
-    List<Integer> getStmtIds() {
+
+    public List<Integer> getStmtIds() {
       return stmtIds;
     }
-    long getVisibilityTxnId() {
+
+    public long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+
+    public List<DeltaFileMetaData> getDeltaFiles() {
+      return deltaFiles;
+    }
+
+    public List<DeltaFileMetaData> getDeltaFilesForStmtId(final Integer stmtId) {
+      if (stmtIds.size() <= 1 || stmtId == null) {
+        // If it is not a multistatement delta, we do not store the stmtId in the file list
+        return deltaFiles;
+      } else {
+        return deltaFiles.stream().filter(df -> stmtId.equals(df.getStmtId())).collect(Collectors.toList());
+      }
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeLong(minWriteId);
       out.writeLong(maxWriteId);
       out.writeInt(stmtIds.size());
-      for(Integer id : stmtIds) {
+      for (Integer id : stmtIds) {
         out.writeInt(id);
       }
       out.writeLong(visibilityTxnId);
+      out.writeInt(deltaFiles.size());
+      for (DeltaFileMetaData fileMeta : deltaFiles) {
+        fileMeta.write(out);
+      }
     }
+
     @Override
     public void readFields(DataInput in) throws IOException {
       minWriteId = in.readLong();
       maxWriteId = in.readLong();
       stmtIds.clear();
       int numStatements = in.readInt();
-      for(int i = 0; i < numStatements; i++) {
+      for (int i = 0; i < numStatements; i++) {
         stmtIds.add(in.readInt());
       }
       visibilityTxnId = in.readLong();
+
+      deltaFiles.clear();
+      int numFiles = in.readInt();
+      for (int i = 0; i < numFiles; i++) {
+        DeltaFileMetaData file = new DeltaFileMetaData();
+        file.readFields(in);
+        deltaFiles.add(file);
+      }
     }
-    String getName() {
+
+    private String getName() {
       assert stmtIds.isEmpty() : "use getName(int)";
-      return AcidUtils.addVisibilitySuffix(AcidUtils
-          .deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
+      return AcidUtils.addVisibilitySuffix(AcidUtils.deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
     }
-    String getName(int stmtId) {
+
+    private String getName(int stmtId) {
       assert !stmtIds.isEmpty() : "use getName()";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId, stmtId), visibilityTxnId);
     }
+
+    public List<Pair<Path, Integer>> getPaths(Path root) {

Review comment:
       I think the List is much more straightforward; it will keep the stmtId order.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464306341



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +126,217 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
+
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
+     * @param deltaFiles bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
+    public DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId,
+        List<DeltaFileMetaData> deltaFiles) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = ObjectUtils.defaultIfNull(deltaFiles, new ArrayList<>());
     }
-    long getMinWriteId() {
+
+    public long getMinWriteId() {
       return minWriteId;
     }
-    long getMaxWriteId() {
+
+    public long getMaxWriteId() {
       return maxWriteId;
     }
-    List<Integer> getStmtIds() {
+
+    public List<Integer> getStmtIds() {
       return stmtIds;
     }
-    long getVisibilityTxnId() {
+
+    public long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+
+    public List<DeltaFileMetaData> getDeltaFiles() {
+      return deltaFiles;
+    }
+
+    public List<DeltaFileMetaData> getDeltaFilesForStmtId(final Integer stmtId) {
+      if (stmtIds.size() <= 1 || stmtId == null) {
+        // If it is not a multistatement delta, we do not store the stmtId in the file list
+        return deltaFiles;
+      } else {
+        return deltaFiles.stream().filter(df -> stmtId.equals(df.getStmtId())).collect(Collectors.toList());

Review comment:
       Question: How often do we call this? Is it ok to calculate this every time, or would it be better to store it in a way that is already filtered, like a map?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463169065



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -1574,45 +1576,46 @@ public int compareTo(CompressedOwid other) {
       this.orcSplit = orcSplit;
 
       try {
-        final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
-        if (deleteDeltaDirs.length > 0) {
+        if (orcSplit.getDeltas().size() > 0) {
           AcidOutputFormat.Options orcSplitMinMaxWriteIds =
               AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf);
           int totalDeleteEventCount = 0;
-          for (Path deleteDeltaDir : deleteDeltaDirs) {
-            if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {
-              continue;
-            }
-            Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
-                new OrcRawRecordMerger.Options().isCompacting(false), null);
-            for (Path deleteDeltaFile : deleteDeltaFiles) {
-              try {
-                ReaderData readerData = getOrcTail(deleteDeltaFile, conf, cacheTag);
-                OrcTail orcTail = readerData.orcTail;
-                if (orcTail.getFooter().getNumberOfRows() <= 0) {
-                  continue; // just a safe check to ensure that we are not reading empty delete files.
-                }
-                OrcRawRecordMerger.KeyInterval deleteKeyInterval = findDeleteMinMaxKeys(orcTail, deleteDeltaFile);
-                if (!deleteKeyInterval.isIntersects(keyInterval)) {
-                  // If there is no intersection between data and delete delta, do not read delete file
-                  continue;
-                }
-                // Reader can be reused if it was created before for getting orcTail: mostly for non-LLAP cache cases.
-                // For LLAP cases we need to create it here.
-                Reader deleteDeltaReader = readerData.reader != null ? readerData.reader :
-                    OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
-                totalDeleteEventCount += deleteDeltaReader.getNumberOfRows();
-                DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
-                    deleteDeltaFile, readerOptions, bucket, validWriteIdList, isBucketedTable, conf,
-                    keyInterval, orcSplit);
-                DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
-                if (deleteReaderValue.next(deleteRecordKey)) {
-                  sortMerger.put(deleteRecordKey, deleteReaderValue);
-                } else {
-                  deleteReaderValue.close();
+          for (AcidInputFormat.DeltaMetaData deltaMetaData : orcSplit.getDeltas()) {
+            for (Path deleteDeltaDir : deltaMetaData.getPaths(orcSplit.getRootDir())) {
+              if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {
+                LOG.debug("Skipping delete delta dir {}", deleteDeltaDir);
+                continue;
+              }
+              for (AcidInputFormat.DeltaFileMetaData fileMetaData : deltaMetaData.getDeltaFiles()) {
+                Path deleteDeltaFile = fileMetaData.getPath(deleteDeltaDir, bucket);
+                try {
+                  ReaderData readerData = getOrcTail(deleteDeltaFile, conf, cacheTag, fileMetaData.getFileId(deleteDeltaDir, bucket));
+                  OrcTail orcTail = readerData.orcTail;
+                  if (orcTail.getFooter().getNumberOfRows() <= 0) {
+                    continue; // just a safe check to ensure that we are not reading empty delete files.
+                  }
+                  OrcRawRecordMerger.KeyInterval deleteKeyInterval = findDeleteMinMaxKeys(orcTail, deleteDeltaFile);
+                  if (!deleteKeyInterval.isIntersects(keyInterval)) {
+                    // If there is no intersection between data and delete delta, do not read delete file
+                    continue;
+                  }
+                  // Reader can be reused if it was created before for getting orcTail: mostly for non-LLAP cache cases.
+                  // For LLAP cases we need to create it here.
+                  Reader deleteDeltaReader = readerData.reader != null ? readerData.reader : OrcFile
+                      .createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
+                  totalDeleteEventCount += deleteDeltaReader.getNumberOfRows();
+                  DeleteReaderValue deleteReaderValue =
+                      new DeleteReaderValue(deleteDeltaReader, deleteDeltaFile, readerOptions, bucket, validWriteIdList,
+                          isBucketedTable, conf, keyInterval, orcSplit);
+                  DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
+                  if (deleteReaderValue.next(deleteRecordKey)) {
+                    sortMerger.put(deleteRecordKey, deleteReaderValue);
+                  } else {
+                    deleteReaderValue.close();
+                  }
+                } catch (FileNotFoundException fnf) {

Review comment:
       Do we still need this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] szlta commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
szlta commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r464303366



##########
File path: llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
##########
@@ -250,18 +255,71 @@ public void testGetOrcTailForPath() throws Exception {
     Configuration jobConf = new Configuration();
     Configuration daemonConf = new Configuration();
     CacheTag tag = CacheTag.build("test-table");
-    OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache);
+    OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, null);
     jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
-    OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache);
+    OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, null);
     assertEquals(uncached.getSerializedTail(), cached.getSerializedTail());
     assertEquals(uncached.getFileTail(), cached.getFileTail());
   }
 
+  @Test
+  public void testGetOrcTailForPathWithFileId() throws Exception {
+    DummyMemoryManager mm = new DummyMemoryManager();
+    DummyCachePolicy cp = new DummyCachePolicy();
+    final int MAX_ALLOC = 64;
+    LlapDaemonCacheMetrics metrics = LlapDaemonCacheMetrics.create("", "");
+    BuddyAllocator alloc = new BuddyAllocator(
+        false, false, 8, MAX_ALLOC, 1, 4096, 0, null, mm, metrics, null, true);
+    MetadataCache cache = new MetadataCache(alloc, mm, cp, true, metrics);
+
+    Path path = new Path("../data/files/alltypesorc");
+    Configuration jobConf = new Configuration();
+    Configuration daemonConf = new Configuration();
+    CacheTag tag = CacheTag.build("test-table");
+    FileSystem fs = FileSystem.get(daemonConf);
+    FileStatus fileStatus = fs.getFileStatus(path);
+    OrcTail uncached = OrcEncodedDataReader.getOrcTailForPath(fileStatus.getPath(), jobConf, tag, daemonConf, cache, new SyntheticFileId(fileStatus));
+    jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
+    // this should work from the cache, by recalculating the same fileId
+    OrcTail cached = OrcEncodedDataReader.getOrcTailForPath(fileStatus.getPath(), jobConf, tag, daemonConf, cache, null);
+    assertEquals(uncached.getSerializedTail(), cached.getSerializedTail());
+    assertEquals(uncached.getFileTail(), cached.getFileTail());
+  }
+
+  @Test
+  public void testGetOrcTailForPathWithFileIdChange() throws Exception {
+    DummyMemoryManager mm = new DummyMemoryManager();
+    DummyCachePolicy cp = new DummyCachePolicy();
+    final int MAX_ALLOC = 64;
+    LlapDaemonCacheMetrics metrics = LlapDaemonCacheMetrics.create("", "");
+    BuddyAllocator alloc = new BuddyAllocator(
+        false, false, 8, MAX_ALLOC, 1, 4096, 0, null, mm, metrics, null, true);
+    MetadataCache cache = new MetadataCache(alloc, mm, cp, true, metrics);
+
+    Path path = new Path("../data/files/alltypesorc");
+    Configuration jobConf = new Configuration();
+    Configuration daemonConf = new Configuration();
+    CacheTag tag = CacheTag.build("test-table");
+    OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, new SyntheticFileId(path, 100, 100));
+    jobConf.set(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname, "true");
+    Exception ex = null;
+    try {
+      // this should miss the cache, since the fileKey changed
+      OrcEncodedDataReader.getOrcTailForPath(path, jobConf, tag, daemonConf, cache, new SyntheticFileId(path, 100, 101));

Review comment:
       You can add a _fail_ call here, as it should always jump from line 308 to the catch clause.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -680,14 +681,15 @@ public void setBaseAndInnerReader(
    * @param path The Orc file path we want to get the OrcTail for
    * @param conf The Configuration to access LLAP
    * @param cacheTag The cacheTag needed to get OrcTail from LLAP IO cache
+   * @param fileKey fileId of the Orc file (either the Long fileId of HDFS or the SyntheticFileId)
    * @return ReaderData object where the orcTail is not null. Reader can be null, but if we had to create
    * one we return that as well for further reuse.
    */
-  private static ReaderData getOrcTail(Path path, Configuration conf, CacheTag cacheTag) throws IOException {
+  private static ReaderData getOrcTail(Path path, Configuration conf, CacheTag cacheTag, Object fileKey) throws IOException {

Review comment:
       I'm fine with leaving just this one endpoint, with fileKey being optionally null.
   Please do emphasize the optionality of the fileKey arg in the javadoc, i.e. it will be generated if not given, etc.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +126,217 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
+
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
+     * @param deltaFiles bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
+    public DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId,
+        List<DeltaFileMetaData> deltaFiles) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = ObjectUtils.defaultIfNull(deltaFiles, new ArrayList<>());
     }
-    long getMinWriteId() {
+
+    public long getMinWriteId() {
       return minWriteId;
     }
-    long getMaxWriteId() {
+
+    public long getMaxWriteId() {
       return maxWriteId;
     }
-    List<Integer> getStmtIds() {
+
+    public List<Integer> getStmtIds() {
       return stmtIds;
     }
-    long getVisibilityTxnId() {
+
+    public long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+
+    public List<DeltaFileMetaData> getDeltaFiles() {
+      return deltaFiles;
+    }
+
+    public List<DeltaFileMetaData> getDeltaFilesForStmtId(final Integer stmtId) {
+      if (stmtIds.size() <= 1 || stmtId == null) {
+        // If it is not a multistatement delta, we do not store the stmtId in the file list
+        return deltaFiles;
+      } else {
+        return deltaFiles.stream().filter(df -> stmtId.equals(df.getStmtId())).collect(Collectors.toList());
+      }
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeLong(minWriteId);
       out.writeLong(maxWriteId);
       out.writeInt(stmtIds.size());
-      for(Integer id : stmtIds) {
+      for (Integer id : stmtIds) {
         out.writeInt(id);
       }
       out.writeLong(visibilityTxnId);
+      out.writeInt(deltaFiles.size());
+      for (DeltaFileMetaData fileMeta : deltaFiles) {
+        fileMeta.write(out);
+      }
     }
+
     @Override
     public void readFields(DataInput in) throws IOException {
       minWriteId = in.readLong();
       maxWriteId = in.readLong();
       stmtIds.clear();
       int numStatements = in.readInt();
-      for(int i = 0; i < numStatements; i++) {
+      for (int i = 0; i < numStatements; i++) {
         stmtIds.add(in.readInt());
       }
       visibilityTxnId = in.readLong();
+
+      deltaFiles.clear();
+      int numFiles = in.readInt();
+      for (int i = 0; i < numFiles; i++) {
+        DeltaFileMetaData file = new DeltaFileMetaData();
+        file.readFields(in);
+        deltaFiles.add(file);
+      }
     }
-    String getName() {
+
+    private String getName() {
       assert stmtIds.isEmpty() : "use getName(int)";
-      return AcidUtils.addVisibilitySuffix(AcidUtils
-          .deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
+      return AcidUtils.addVisibilitySuffix(AcidUtils.deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
     }
-    String getName(int stmtId) {
+
+    private String getName(int stmtId) {
       assert !stmtIds.isEmpty() : "use getName()";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId, stmtId), visibilityTxnId);
     }
+
+    public List<Pair<Path, Integer>> getPaths(Path root) {
+      if (stmtIds.isEmpty()) {
+        return Collections.singletonList(new ImmutablePair<>(new Path(root, getName()), null));
+      } else {
+        // To support multistatement transactions we may have multiple directories corresponding to one DeltaMetaData
+        return getStmtIds().stream()
+            .map(stmtId -> new ImmutablePair<>(new Path(root, getName(stmtId)), stmtId)).collect(Collectors.toList());
+      }
+    }
+
     @Override
     public String toString() {
       return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + "," + visibilityTxnId + ")";
     }
   }
+
+  final class DeltaFileMetaData implements Writable {
+    private static final int HAS_LONG_FILEID_FLAG = 1;
+    private static final int HAS_ATTEMPTID_FLAG = 2;
+    private static final int HAS_STMTID_FLAG = 4;
+
+    private long modTime;
+    private long length;
+    // Optional
+    private Integer attemptId;
+    // Optional
+    private Long fileId;
+    // Optional, if the deltaMeta contains multiple stmtIds, it will contain this files parent's stmtId
+    private Integer stmtId;
+
+    public DeltaFileMetaData() {
+    }
+
+    public DeltaFileMetaData(HadoopShims.HdfsFileStatusWithId fileStatus, Integer stmtId) {
+      modTime = fileStatus.getFileStatus().getModificationTime();
+      length = fileStatus.getFileStatus().getLen();
+      String attempt = AcidUtils.parseAttemptId(fileStatus.getFileStatus().getPath());
+      attemptId = StringUtils.isEmpty(attempt) ? null : Integer.parseInt(attempt);
+      fileId = fileStatus.getFileId();
+      this.stmtId = stmtId;
+    }
+
+    public DeltaFileMetaData(long modTime, long length, @Nullable Integer attemptId, @Nullable Long fileId,
+        @Nullable Integer stmtId) {
+      this.modTime = modTime;
+      this.length = length;
+      this.attemptId = attemptId;
+      this.fileId = fileId;
+      this.stmtId = stmtId;
+    }
+
+    public void clearStmtId() {
+      stmtId = null;
+    }
+
+    public Integer getStmtId() {
+      return stmtId;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      int flags = (fileId != null ? HAS_LONG_FILEID_FLAG : 0) |
+          (attemptId != null ? HAS_ATTEMPTID_FLAG : 0) |
+          (stmtId != null ? HAS_STMTID_FLAG : 0);
+      out.writeByte(flags);
+      out.writeLong(modTime);
+      out.writeLong(length);
+      if (attemptId != null) {
+        out.writeInt(attemptId);
+      }
+      if (fileId != null) {
+        out.writeLong(fileId);
+      }
+      if (stmtId != null) {
+        out.writeInt(stmtId);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      byte flags = in.readByte();
+      boolean hasLongFileId = (HAS_LONG_FILEID_FLAG & flags) != 0,
+          hasAttemptId = (HAS_ATTEMPTID_FLAG & flags) != 0,
+          hasStmtId = (HAS_STMTID_FLAG & flags) != 0;
+      modTime = in.readLong();
+      length = in.readLong();
+      if (hasAttemptId) {
+        attemptId = in.readInt();
+      }
+      if (hasLongFileId) {
+        fileId = in.readLong();
+      }
+      if (hasStmtId) {
+        stmtId = in.readInt();
+      }
+    }
+
+    public Object getFileId(Path deltaDirectory, int bucketId) {
+      if (fileId != null) {
+        return fileId;
+      }
+      // Calculate the synthetic fileid
+      Path realPath = getPath(deltaDirectory, bucketId);
+      return new SyntheticFileId(realPath, length, modTime);
+    }

Review comment:
       The generation of fileId may be more complicated based on what configuration is given. Please refer to https://github.com/apache/hive/blob/master/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java#L496-L517




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463150602



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +123,183 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
+     * @param deltaFileStatuses bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
+    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId,
+        List<HadoopShims.HdfsFileStatusWithId> deltaFileStatuses) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = new ArrayList<>();
+      for(HadoopShims.HdfsFileStatusWithId fileStatus : deltaFileStatuses) {

Review comment:
       Nit: maybe a space after the for keyword? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463159912



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
##########
@@ -118,70 +123,183 @@
      */
     private long visibilityTxnId;
 
+    private List<DeltaFileMetaData> deltaFiles;
+
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>(), 0);
+      this(0, 0, new ArrayList<>(), 0, new ArrayList<>());
     }
     /**
+     * @param minWriteId min writeId of the delta directory
+     * @param maxWriteId max writeId of the delta directory
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
      * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
+     * @param deltaFileStatuses bucketFiles in the directory
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
+    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId,
+        List<HadoopShims.HdfsFileStatusWithId> deltaFileStatuses) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
       this.visibilityTxnId = visibilityTxnId;
+      this.deltaFiles = new ArrayList<>();
+      for(HadoopShims.HdfsFileStatusWithId fileStatus : deltaFileStatuses) {
+        deltaFiles.add(new DeltaFileMetaData(fileStatus));
+      }
     }
+
     long getMinWriteId() {
       return minWriteId;
     }
+
     long getMaxWriteId() {
       return maxWriteId;
     }
+
     List<Integer> getStmtIds() {
       return stmtIds;
     }
+
     long getVisibilityTxnId() {
       return visibilityTxnId;
     }
+
+    public List<DeltaFileMetaData> getDeltaFiles() {
+      return deltaFiles;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeLong(minWriteId);
       out.writeLong(maxWriteId);
       out.writeInt(stmtIds.size());
-      for(Integer id : stmtIds) {
+      for (Integer id : stmtIds) {
         out.writeInt(id);
       }
       out.writeLong(visibilityTxnId);
+      out.writeInt(deltaFiles.size());
+      for (DeltaFileMetaData fileMeta : deltaFiles) {
+        fileMeta.write(out);
+      }
     }
+
     @Override
     public void readFields(DataInput in) throws IOException {
       minWriteId = in.readLong();
       maxWriteId = in.readLong();
       stmtIds.clear();
       int numStatements = in.readInt();
-      for(int i = 0; i < numStatements; i++) {
+      for (int i = 0; i < numStatements; i++) {
         stmtIds.add(in.readInt());
       }
       visibilityTxnId = in.readLong();
+
+      deltaFiles.clear();
+      int numFiles = in.readInt();
+      for(int i = 0; i< numFiles; i++) {
+        DeltaFileMetaData file = new DeltaFileMetaData();
+        file.readFields(in);
+        deltaFiles.add(file);
+      }
     }
-    String getName() {
+    private String getName() {
       assert stmtIds.isEmpty() : "use getName(int)";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId), visibilityTxnId);
     }
-    String getName(int stmtId) {
+    private String getName(int stmtId) {
       assert !stmtIds.isEmpty() : "use getName()";
       return AcidUtils.addVisibilitySuffix(AcidUtils
           .deleteDeltaSubdir(minWriteId, maxWriteId, stmtId), visibilityTxnId);
     }
+
+    public List<Path> getPaths(Path root) {
+      if (stmtIds.isEmpty()) {
+        return Collections.singletonList(new Path(root, getName()));
+      } else {
+        // To support multistatement transactions we may have multiple directories corresponding to one DeltaMetaData
+        return getStmtIds().stream().map(stmtId -> new Path(root, getName(stmtId))).collect(Collectors.toList());
+      }
+    }
     @Override
     public String toString() {
       return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + "," + visibilityTxnId + ")";
     }
   }
+  final class DeltaFileMetaData implements Writable {
+    private static final int HAS_LONG_FILEID_FLAG = 1;
+    private static final int HAS_ATTEMPTID_FLAG = 2;
+
+    private long modTime;
+    private long length;
+    // Optional
+    private Integer attemptId;
+    // Optional
+    private Long fileId;
+
+    public DeltaFileMetaData() {
+    }
+
+    public DeltaFileMetaData(HadoopShims.HdfsFileStatusWithId fileStatus) {
+      modTime = fileStatus.getFileStatus().getModificationTime();
+      length = fileStatus.getFileStatus().getLen();
+      String attempt = AcidUtils.parseAttemptId(fileStatus.getFileStatus().getPath());
+      attemptId = StringUtils.isEmpty(attempt) ? null : Integer.parseInt(attempt);
+      fileId = fileStatus.getFileId();
+    }
+
+    public DeltaFileMetaData(long modTime, long length, @Nullable Integer attemptId, @Nullable Long fileId) {
+      this.modTime = modTime;
+      this.length = length;
+      this.attemptId = attemptId;
+      this.fileId = fileId;
+    }
+
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      int flags = (fileId != null ? HAS_LONG_FILEID_FLAG : 0) |
+          (attemptId != null ? HAS_ATTEMPTID_FLAG : 0);
+      out.writeByte(flags);
+      out.writeLong(modTime);
+      out.writeLong(length);
+      if (attemptId != null) {
+        out.writeInt(attemptId);
+      }
+      if (fileId != null) {
+        out.writeLong(fileId);
+      }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      byte flags = in.readByte();
+      boolean hasLongFileId = (HAS_LONG_FILEID_FLAG & flags) != 0,
+          hasAttemptId = (HAS_ATTEMPTID_FLAG & flags) != 0;
+      modTime = in.readLong();
+      length = in.readLong();
+      if (hasAttemptId) {
+        attemptId = in.readInt();
+      }
+      if (hasLongFileId) {
+        fileId = in.readLong();
+      }
+    }
+
+    public Object getFileId(Path deltaDirectory, int bucketId) {
+      if (fileId != null) {
+        return fileId;
+      }
+      // Calculate the synthetic fileid
+      Path realPath = getPath(deltaDirectory, bucketId);
+      return new SyntheticFileId(realPath, length, modTime);
+    }
+    public Path getPath(Path deltaDirectory, int bucketId) {

Review comment:
       Nit: add a blank line before this method, to separate it from `getFileId` above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvary commented on a change in pull request #1339: HIVE-23956: Delete delta fileIds should be pushed execution

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #1339:
URL: https://github.com/apache/hive/pull/1339#discussion_r463168246



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
##########
@@ -1574,45 +1576,46 @@ public int compareTo(CompressedOwid other) {
       this.orcSplit = orcSplit;
 
       try {
-        final Path[] deleteDeltaDirs = getDeleteDeltaDirsFromSplit(orcSplit);
-        if (deleteDeltaDirs.length > 0) {
+        if (orcSplit.getDeltas().size() > 0) {
           AcidOutputFormat.Options orcSplitMinMaxWriteIds =
               AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf);
           int totalDeleteEventCount = 0;
-          for (Path deleteDeltaDir : deleteDeltaDirs) {
-            if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {
-              continue;
-            }
-            Path[] deleteDeltaFiles = OrcRawRecordMerger.getDeltaFiles(deleteDeltaDir, bucket,
-                new OrcRawRecordMerger.Options().isCompacting(false), null);
-            for (Path deleteDeltaFile : deleteDeltaFiles) {
-              try {
-                ReaderData readerData = getOrcTail(deleteDeltaFile, conf, cacheTag);
-                OrcTail orcTail = readerData.orcTail;
-                if (orcTail.getFooter().getNumberOfRows() <= 0) {
-                  continue; // just a safe check to ensure that we are not reading empty delete files.
-                }
-                OrcRawRecordMerger.KeyInterval deleteKeyInterval = findDeleteMinMaxKeys(orcTail, deleteDeltaFile);
-                if (!deleteKeyInterval.isIntersects(keyInterval)) {
-                  // If there is no intersection between data and delete delta, do not read delete file
-                  continue;
-                }
-                // Reader can be reused if it was created before for getting orcTail: mostly for non-LLAP cache cases.
-                // For LLAP cases we need to create it here.
-                Reader deleteDeltaReader = readerData.reader != null ? readerData.reader :
-                    OrcFile.createReader(deleteDeltaFile, OrcFile.readerOptions(conf));
-                totalDeleteEventCount += deleteDeltaReader.getNumberOfRows();
-                DeleteReaderValue deleteReaderValue = new DeleteReaderValue(deleteDeltaReader,
-                    deleteDeltaFile, readerOptions, bucket, validWriteIdList, isBucketedTable, conf,
-                    keyInterval, orcSplit);
-                DeleteRecordKey deleteRecordKey = new DeleteRecordKey();
-                if (deleteReaderValue.next(deleteRecordKey)) {
-                  sortMerger.put(deleteRecordKey, deleteReaderValue);
-                } else {
-                  deleteReaderValue.close();
+          for (AcidInputFormat.DeltaMetaData deltaMetaData : orcSplit.getDeltas()) {
+            for (Path deleteDeltaDir : deltaMetaData.getPaths(orcSplit.getRootDir())) {
+              if (!isQualifiedDeleteDeltaForSplit(orcSplitMinMaxWriteIds, deleteDeltaDir)) {

Review comment:
       `isQualifiedDeleteDeltaForSplit` basically re-parses the delta dir. Can we avoid this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org