Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/10/21 20:42:49 UTC

[GitHub] [gobblin] vikrambohra commented on a diff in pull request #3589: [GOBBLIN-1732] Search for dummy file in writer directory

vikrambohra commented on code in PR #3589:
URL: https://github.com/apache/gobblin/pull/3589#discussion_r1002169405


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java:
##########
@@ -154,25 +156,28 @@ private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
   private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
     Map<Path, Metrics> newFiles = new HashMap<>();
     FileSystem fs = FileSystem.get(conf);
-    for (final String pathString : state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) {
-      Path path = new Path(pathString);
-      //
-      PriorityQueue<FileStatus> fileStatuses =
-          new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
-      if (fs.exists(path)) {
-        fileStatuses.add(fs.getFileStatus(path));
-      }
-      // Only register files
-      while (!fileStatuses.isEmpty()) {
-        FileStatus fileStatus = fileStatuses.poll();
-        if (fileStatus.isDirectory()) {
-          fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
-        } else {
-          Path filePath = fileStatus.getPath();
-          newFiles.put(filePath, null);
-          // Only one concrete file from the path is needed
-          return newFiles;
-        }
+    if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {
+      return newFiles;
+    }
+    String baseDatasetString = state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR);

Review Comment:
   Is it safe to assume that DATA_PUBLISHER_DATASET_DIR holds a single path rather than a comma-separated list of paths, as the current implementation (which reads it with getPropAsList) assumes?
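For reference, the removed loop can be read as: for each comma-separated entry in DATA_PUBLISHER_DATASET_DIR, walk the directory newest-first and return the first visible regular file, stopping at the first entry that yields one. Below is a minimal sketch of that behaviour. It uses java.nio.file instead of Hadoop's FileSystem so it runs without a cluster. The class and method names (DummyFileFinder, parseDatasetDirs, findNewestVisibleFile, findDummyFile) are hypothetical. The split-and-trim rule is an assumption about how getPropAsList treats the value, and the hidden-file rule (skip names starting with "." or "_") is an assumption about what HIDDEN_FILES_FILTER excludes.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.stream.Stream;

public class DummyFileFinder {

  // Hypothetical helper (assumed to match getPropAsList): split on commas,
  // trim each entry and drop empty ones, so " /a, /b ,,/c " gives [/a, /b, /c].
  static List<String> parseDatasetDirs(String value) {
    List<String> dirs = new ArrayList<>();
    if (value == null) {
      return dirs;
    }
    for (String part : value.split(",")) {
      String trimmed = part.trim();
      if (!trimmed.isEmpty()) {
        dirs.add(trimmed);
      }
    }
    return dirs;
  }

  // Assumed hidden-file rule: names starting with '.' or '_' are skipped.
  static boolean isVisible(Path p) {
    String name = p.getFileName().toString();
    return !name.startsWith(".") && !name.startsWith("_");
  }

  // Modification time, treating unreadable entries as the oldest possible.
  static FileTime mtime(Path p) {
    try {
      return Files.getLastModifiedTime(p);
    } catch (IOException e) {
      return FileTime.fromMillis(0L);
    }
  }

  // Best-first walk: always expand the most recently modified entry next and
  // return the first regular file reached, like the removed PriorityQueue loop.
  static Optional<Path> findNewestVisibleFile(Path root) throws IOException {
    PriorityQueue<Path> queue = new PriorityQueue<>((x, y) -> mtime(y).compareTo(mtime(x)));
    if (Files.exists(root)) {
      queue.add(root);
    }
    while (!queue.isEmpty()) {
      Path current = queue.poll();
      if (Files.isDirectory(current)) {
        try (Stream<Path> children = Files.list(current)) {
          children.filter(DummyFileFinder::isVisible).forEach(queue::add);
        }
      } else {
        return Optional.of(current);
      }
    }
    return Optional.empty();
  }

  // Only one concrete file is needed: stop at the first configured directory
  // that contains a visible file.
  static Optional<Path> findDummyFile(String datasetDirProp) throws IOException {
    for (String dir : parseDatasetDirs(datasetDirProp)) {
      Optional<Path> found = findNewestVisibleFile(Paths.get(dir));
      if (found.isPresent()) {
        return found;
      }
    }
    return Optional.empty();
  }
}
```

If the property may hold several directories, a single-path version would only need to drop parseDatasetDirs and call findNewestVisibleFile on the one configured directory.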



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java:
##########
@@ -154,25 +156,28 @@ private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
   private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
     Map<Path, Metrics> newFiles = new HashMap<>();
     FileSystem fs = FileSystem.get(conf);
-    for (final String pathString : state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) {
-      Path path = new Path(pathString);
-      //
-      PriorityQueue<FileStatus> fileStatuses =
-          new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
-      if (fs.exists(path)) {
-        fileStatuses.add(fs.getFileStatus(path));
-      }
-      // Only register files
-      while (!fileStatuses.isEmpty()) {
-        FileStatus fileStatus = fileStatuses.poll();
-        if (fileStatus.isDirectory()) {
-          fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
-        } else {
-          Path filePath = fileStatus.getPath();
-          newFiles.put(filePath, null);
-          // Only one concrete file from the path is needed
-          return newFiles;
-        }
+    if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {
+      return newFiles;
+    }
+    String baseDatasetString = state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR);
+    Path searchPath = new Path(baseDatasetString);
+    if (state.contains(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX)) {
+      searchPath = new Path(searchPath, state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX));

Review Comment:
   Will the WRITER_PARTITION_PREFIX always be hourly?
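One way to make the question concrete: if the search path is built entirely from configuration instead of assuming an hourly layout, the same code covers other granularities. Below is a minimal sketch, assuming the writer output is laid out as <dataset dir>/<WRITER_PARTITION_PREFIX>/<time partition> and that the time partition follows a date pattern. PartitionSearchPath, buildSearchPath and the "yyyy/MM/dd/HH" and "yyyy/MM/dd" patterns are hypothetical illustrations, not Gobblin API.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionSearchPath {

  // Join two path segments with a single '/' between them.
  static String join(String parent, String child) {
    return parent.endsWith("/") ? parent + child : parent + "/" + child;
  }

  // Hypothetical helper: build <baseDir>/<prefix>/<formatted time partition>.
  // Both the prefix and the date pattern are inputs, so an hourly layout is
  // just one possible configuration rather than a hard-coded assumption.
  static String buildSearchPath(String baseDir, String prefix, String partitionPattern,
      ZonedDateTime time) {
    String path = baseDir;
    if (prefix != null && !prefix.isEmpty()) {
      path = join(path, prefix);
    }
    if (partitionPattern != null && !partitionPattern.isEmpty()) {
      path = join(path, DateTimeFormatter.ofPattern(partitionPattern).format(time));
    }
    return path;
  }
}
```

For example, for 2022-10-21T20:42:49Z, base "/data/events", prefix "hourly" and pattern "yyyy/MM/dd/HH" give /data/events/hourly/2022/10/21/20, while prefix "daily" with "yyyy/MM/dd" gives /data/events/daily/2022/10/21.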



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org