You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/10/21 23:37:06 UTC
[gobblin] branch master updated: Search for dummy file in writer directory (#3589)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 13ed3d429 Search for dummy file in writer directory (#3589)
13ed3d429 is described below
commit 13ed3d42962003991507ad7e095b93b7d4c107da
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Fri Oct 21 16:37:00 2022 -0700
Search for dummy file in writer directory (#3589)
---
.../iceberg/publisher/GobblinMCEPublisher.java | 43 ++++++++++++----------
.../iceberg/publisher/GobblinMCEPublisherTest.java | 3 ++
2 files changed, 27 insertions(+), 19 deletions(-)
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 53611f8c1..47616529f 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -39,6 +39,8 @@ import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.filters.HiddenFilter;
import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -154,25 +156,28 @@ public class GobblinMCEPublisher extends DataPublisher {
private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
Map<Path, Metrics> newFiles = new HashMap<>();
FileSystem fs = FileSystem.get(conf);
- for (final String pathString : state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) {
- Path path = new Path(pathString);
- //
- PriorityQueue<FileStatus> fileStatuses =
- new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
- if (fs.exists(path)) {
- fileStatuses.add(fs.getFileStatus(path));
- }
- // Only register files
- while (!fileStatuses.isEmpty()) {
- FileStatus fileStatus = fileStatuses.poll();
- if (fileStatus.isDirectory()) {
- fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
- } else {
- Path filePath = fileStatus.getPath();
- newFiles.put(filePath, null);
- // Only one concrete file from the path is needed
- return newFiles;
- }
+ if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {
+ return newFiles;
+ }
+ String baseDatasetString = state.getProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR);
+ Path searchPath = new Path(baseDatasetString);
+ if (state.contains(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX)) {
+ searchPath = new Path(searchPath, state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX));
+ }
+ PriorityQueue<FileStatus> fileStatuses = new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), x.getModificationTime()));
+ if (fs.exists(searchPath)) {
+ fileStatuses.add(fs.getFileStatus(searchPath));
+ }
+ // Only register files
+ while (!fileStatuses.isEmpty()) {
+ FileStatus fileStatus = fileStatuses.poll();
+ if (fileStatus.isDirectory()) {
+ fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_FILTER)));
+ } else {
+ Path filePath = fileStatus.getPath();
+ newFiles.put(filePath, null);
+ // Only one concrete file from the path is needed
+ return newFiles;
}
}
return newFiles;
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
index e4f034546..faa59dd80 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
@@ -53,6 +53,8 @@ import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.writer.FsDataWriterBuilder;
import org.apache.gobblin.writer.GobblinOrcWriter;
import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -280,6 +282,7 @@ public class GobblinMCEPublisherTest {
state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, datasetDir.toString());
state.setProp(AbstractJob.JOB_ID, "testFlow");
state.setProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA, _avroPartitionSchema);
+ state.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, "hourly");
}
private void setGMCEPublisherStateForAvroFile(WorkUnitState state) {