Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/10/21 20:51:30 UTC

[GitHub] [gobblin] jack-moseley commented on a diff in pull request #3589: [GOBBLIN-1732] Search for dummy file in writer directory

jack-moseley commented on code in PR #3589:
URL: https://github.com/apache/gobblin/pull/3589#discussion_r1002178271


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java:
##########
@@ -154,25 +156,28 @@ private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
   private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
     Map<Path, Metrics> newFiles = new HashMap<>();
     FileSystem fs = FileSystem.get(conf);
-    for (final String pathString : state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) {
-      Path path = new Path(pathString);
-      //
-      PriorityQueue<FileStatus> fileStatuses =
-          new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
-      if (fs.exists(path)) {
-        fileStatuses.add(fs.getFileStatus(path));
-      }
-      // Only register files
-      while (!fileStatuses.isEmpty()) {
-        FileStatus fileStatus = fileStatuses.poll();
-        if (fileStatus.isDirectory()) {
-          fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
-        } else {
-          Path filePath = fileStatus.getPath();
-          newFiles.put(filePath, null);
-          // Only one concrete file from the path is needed
-          return newFiles;
-        }
+    if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {
+      return newFiles;
+    }
+    String baseDatasetString = state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR);
+    Path searchPath = new Path(baseDatasetString);
+    if (state.contains(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX)) {
+      searchPath = new Path(searchPath, state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX));

Review Comment:
   Yes, for streaming this property is set, e.g. `writer.partition.prefix=hourly`.
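
   To illustrate how the new lines in the hunk above resolve the search path, here is a minimal standalone sketch. It uses `java.util.Properties` and plain strings in place of Gobblin's `State` and Hadoop's `Path`. The class name, the method name, and the dataset directory value are hypothetical, and the key string `data.publisher.dataset.dir` is an assumption about the value of `ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR`; only `writer.partition.prefix=hourly` comes from this thread.

```java
import java.util.Optional;
import java.util.Properties;

public class SearchPathSketch {
  // Assumed key string for ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR (not confirmed in this thread).
  static final String DATASET_DIR_KEY = "data.publisher.dataset.dir";
  // Key as quoted in the review comment.
  static final String PARTITION_PREFIX_KEY = "writer.partition.prefix";

  // Mirrors the control flow of the new diff lines using plain strings instead of Hadoop Path.
  static Optional<String> resolveSearchPath(Properties state) {
    String base = state.getProperty(DATASET_DIR_KEY);
    if (base == null) {
      // No dataset dir configured: nothing to search.
      return Optional.empty();
    }
    String prefix = state.getProperty(PARTITION_PREFIX_KEY);
    if (prefix == null) {
      // No partition prefix: search from the dataset dir itself.
      return Optional.of(base);
    }
    // Append the prefix as a child directory, avoiding a doubled slash.
    String separator = base.endsWith("/") ? "" : "/";
    return Optional.of(base + separator + prefix);
  }

  public static void main(String[] args) {
    Properties state = new Properties();
    state.setProperty(DATASET_DIR_KEY, "/data/tracking/MyEvent"); // hypothetical dataset dir
    state.setProperty(PARTITION_PREFIX_KEY, "hourly");
    System.out.println(resolveSearchPath(state).orElse("<none>"));
  }
}
```

   With the streaming setting above, the search would start under the `hourly` subdirectory of the dataset dir rather than at the dataset root.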


