You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/21 23:37:06 UTC

[gobblin] branch master updated: Search for dummy file in writer directory (#3589)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 13ed3d429 Search for dummy file in writer directory (#3589)
13ed3d429 is described below

commit 13ed3d42962003991507ad7e095b93b7d4c107da
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Fri Oct 21 16:37:00 2022 -0700

    Search for dummy file in writer directory (#3589)
---
 .../iceberg/publisher/GobblinMCEPublisher.java     | 43 ++++++++++++----------
 .../iceberg/publisher/GobblinMCEPublisherTest.java |  3 ++
 2 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 53611f8c1..47616529f 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -39,6 +39,8 @@ import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.filters.HiddenFilter;
 import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -154,25 +156,28 @@ public class GobblinMCEPublisher extends DataPublisher {
   private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
     Map<Path, Metrics> newFiles = new HashMap<>();
     FileSystem fs = FileSystem.get(conf);
-    for (final String pathString : state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) {
-      Path path = new Path(pathString);
-      //
-      PriorityQueue<FileStatus> fileStatuses =
-          new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
-      if (fs.exists(path)) {
-        fileStatuses.add(fs.getFileStatus(path));
-      }
-      // Only register files
-      while (!fileStatuses.isEmpty()) {
-        FileStatus fileStatus = fileStatuses.poll();
-        if (fileStatus.isDirectory()) {
-          fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
-        } else {
-          Path filePath = fileStatus.getPath();
-          newFiles.put(filePath, null);
-          // Only one concrete file from the path is needed
-          return newFiles;
-        }
+    if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {
+      return newFiles;
+    }
+    String baseDatasetString = state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR);
+    Path searchPath = new Path(baseDatasetString);
+    if (state.contains(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX)) {
+      searchPath = new Path(searchPath, state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX));
+    }
+    PriorityQueue<FileStatus> fileStatuses = new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
+    if (fs.exists(searchPath)) {
+      fileStatuses.add(fs.getFileStatus(searchPath));
+    }
+    // Only register files
+    while (!fileStatuses.isEmpty()) {
+      FileStatus fileStatus = fileStatuses.poll();
+      if (fileStatus.isDirectory()) {
+        fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
+      } else {
+        Path filePath = fileStatus.getPath();
+        newFiles.put(filePath, null);
+        // Only one concrete file from the path is needed
+        return newFiles;
       }
     }
     return newFiles;
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
index e4f034546..faa59dd80 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
@@ -53,6 +53,8 @@ import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.writer.FsDataWriterBuilder;
 import org.apache.gobblin.writer.GobblinOrcWriter;
 import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -280,6 +282,7 @@ public class GobblinMCEPublisherTest {
     state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, datasetDir.toString());
     state.setProp(AbstractJob.JOB_ID, "testFlow");
     state.setProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA, _avroPartitionSchema);
+    state.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, "hourly");
   }
 
   private void setGMCEPublisherStateForAvroFile(WorkUnitState state) {