You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/05/21 21:49:56 UTC
[incubator-hudi] branch master updated: Spark Stage retry handling
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 145034c Spark Stage retry handling
145034c is described below
commit 145034c5faebefe51b6f1557f156bc19c787d671
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Fri Mar 8 15:05:33 2019 -0800
Spark Stage retry handling
---
.../com/uber/hoodie/CompactionAdminClient.java | 7 +-
.../java/com/uber/hoodie/HoodieWriteClient.java | 57 ++++--
.../com/uber/hoodie/config/HoodieWriteConfig.java | 93 +++++----
.../uber/hoodie/func/BulkInsertMapFunction.java | 10 +-
.../hoodie/func/CopyOnWriteLazyInsertIterable.java | 32 ++-
.../hoodie/func/MergeOnReadLazyInsertIterable.java | 8 +-
.../java/com/uber/hoodie/io/ConsistencyCheck.java | 112 -----------
.../com/uber/hoodie/io/HoodieAppendHandle.java | 17 +-
.../com/uber/hoodie/io/HoodieCreateHandle.java | 26 +--
.../java/com/uber/hoodie/io/HoodieIOHandle.java | 85 +++++---
.../java/com/uber/hoodie/io/HoodieMergeHandle.java | 41 ++--
.../io/compact/HoodieRealtimeTableCompactor.java | 2 +-
.../hoodie/io/storage/HoodieParquetWriter.java | 1 +
.../uber/hoodie/table/HoodieCopyOnWriteTable.java | 125 +++---------
.../uber/hoodie/table/HoodieMergeOnReadTable.java | 11 +-
.../java/com/uber/hoodie/table/HoodieTable.java | 148 ++++++++++++--
.../src/test/java/com/uber/hoodie/TestCleaner.java | 34 ++--
.../java/com/uber/hoodie/TestConsistencyGuard.java | 108 +++++++++++
.../TestHoodieClientOnCopyOnWriteStorage.java | 100 +++++-----
.../java/com/uber/hoodie/TestHoodieReadClient.java | 3 +-
.../uber/hoodie/common/HoodieClientTestUtils.java | 4 +-
.../uber/hoodie/func/TestUpdateMapFunction.java | 115 +++++++----
.../com/uber/hoodie/io/TestConsistencyCheck.java | 91 ---------
.../uber/hoodie/io/TestHoodieCommitArchiveLog.java | 9 +-
.../uber/hoodie/table/TestCopyOnWriteTable.java | 125 ++++++------
.../io/storage/HoodieWrapperFileSystem.java | 67 +++++--
.../io/storage/SizeAwareFSDataOutputStream.java | 21 +-
.../com/uber/hoodie/common/model/FileSlice.java | 6 +-
.../uber/hoodie/common/model/HoodieLogFile.java | 48 ++++-
.../uber/hoodie/common/model/HoodieWriteStat.java | 5 +-
.../hoodie/common/table/HoodieTableMetaClient.java | 17 ++
.../hoodie/common/table/log/HoodieLogFormat.java | 50 ++++-
.../common/table/log/HoodieLogFormatWriter.java | 25 ++-
.../uber/hoodie/common/util/ConsistencyGuard.java | 89 +++++++++
.../java/com/uber/hoodie/common/util/FSUtils.java | 182 ++++++++++++-----
.../common/util/FailSafeConsistencyGuard.java | 200 +++++++++++++++++++
.../hoodie/common/util/NoOpConsistencyGuard.java | 46 +++++
.../com/uber/hoodie/common/util/ParquetUtils.java | 2 -
.../uber/hoodie/common/model/HoodieTestUtils.java | 55 ++++--
.../hoodie/common/model/TestHoodieWriteStat.java | 15 +-
.../common/table/log/HoodieLogFormatTest.java | 65 +++++++
.../table/view/HoodieTableFileSystemViewTest.java | 216 +++++++++++----------
.../table/view/IncrementalFSViewSyncTest.java | 6 +-
.../hoodie/common/util/CompactionTestUtils.java | 4 +-
.../hoodie/common/util/TestCompactionUtils.java | 18 +-
.../com/uber/hoodie/common/util/TestFSUtils.java | 69 +++++--
.../hadoop/realtime/HoodieRealtimeFileSplit.java | 10 +
.../uber/hoodie/hadoop/InputFormatTestUtil.java | 10 +-
.../realtime/HoodieRealtimeRecordReaderTest.java | 6 +-
.../test/java/com/uber/hoodie/hive/TestUtil.java | 3 +-
hoodie-timeline-service/pom.xml | 2 +-
.../utilities/schema/RowBasedSchemaProvider.java | 2 +-
.../hoodie/utilities/TestHoodieSnapshotCopier.java | 20 +-
53 files changed, 1660 insertions(+), 963 deletions(-)
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
index 547a0a4..0333328 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
@@ -29,6 +29,7 @@ import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
@@ -245,7 +246,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
"Expect new log version to be sane");
HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(),
FSUtils.makeLogFileName(lf.getFileId(), "." + FSUtils.getFileExtensionFromLog(lf.getPath()),
- compactionInstant, lf.getLogVersion() - maxVersion)));
+ compactionInstant, lf.getLogVersion() - maxVersion, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
return Pair.of(lf, newLogFile);
}).collect(Collectors.toList());
}
@@ -436,7 +437,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
.filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get();
List<HoodieLogFile> logFilesToRepair =
merged.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(compactionInstant))
- .sorted(HoodieLogFile.getBaseInstantAndLogVersionComparator().reversed())
+ .sorted(HoodieLogFile.getLogFileComparator())
.collect(Collectors.toList());
FileSlice fileSliceForCompaction =
fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime())
@@ -451,7 +452,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
for (HoodieLogFile toRepair : logFilesToRepair) {
int version = maxUsedVersion + 1;
HoodieLogFile newLf = new HoodieLogFile(new Path(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
- logExtn, operation.getBaseInstantTime(), version)));
+ logExtn, operation.getBaseInstantTime(), version, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
result.add(Pair.of(toRepair, newLf));
maxUsedVersion = version;
}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 92bac7a..5a918e4 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -72,6 +72,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -333,9 +334,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
String commitTime, HoodieTable<T> table,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
+ final int parallelism = config.getBulkInsertShuffleParallelism();
if (bulkInsertPartitioner.isDefined()) {
repartitionedRecords = bulkInsertPartitioner.get()
- .repartitionRecords(dedupedRecords, config.getBulkInsertShuffleParallelism());
+ .repartitionRecords(dedupedRecords, parallelism);
} else {
// Now, sort the records and line them up nicely for loading.
repartitionedRecords = dedupedRecords.sortBy(record -> {
@@ -343,10 +345,16 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
- }, true, config.getBulkInsertShuffleParallelism());
+ }, true, parallelism);
}
+
+ //generate new file ID prefixes for each output partition
+ final List<String> fileIDPrefixes = IntStream.range(0, parallelism)
+ .mapToObj(i -> FSUtils.createNewFileIdPfx())
+ .collect(Collectors.toList());
+
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
- .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table), true)
+ .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table, fileIDPrefixes), true)
.flatMap(writeStatuses -> writeStatuses.iterator());
return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
@@ -498,20 +506,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
updateMetadataAndRollingStats(actionType, metadata, stats);
// Finalize write
- final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
- try {
- table.finalizeWrite(jsc, stats);
- if (finalizeCtx != null) {
- Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
- durationInMs.ifPresent(duration -> {
- logger.info("Finalize write elapsed time (milliseconds): " + duration);
- metrics.updateFinalizeWriteMetrics(duration, stats.size());
- });
- }
- } catch (HoodieIOException ioe) {
- throw new HoodieCommitException(
- "Failed to complete commit " + commitTime + " due to finalize errors.", ioe);
- }
+ finalizeWrite(table, commitTime, stats);
// add in extra metadata
if (extraMetadata.isPresent()) {
@@ -1270,7 +1265,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
String compactionCommitTime, boolean autoCommit, Optional<Map<String, String>> extraMetadata) {
if (autoCommit) {
HoodieCommitMetadata metadata =
- doCompactionCommit(compactedStatuses, table.getMetaClient(), compactionCommitTime, extraMetadata);
+ doCompactionCommit(table, compactedStatuses, compactionCommitTime, extraMetadata);
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
@@ -1288,6 +1283,23 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
}
+ private void finalizeWrite(HoodieTable<T> table, String instantTime, List<HoodieWriteStat> stats) {
+ try {
+ final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+ table.finalizeWrite(jsc, instantTime, stats);
+ if (finalizeCtx != null) {
+ Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
+ durationInMs.ifPresent(duration -> {
+ logger.info("Finalize write elapsed time (milliseconds): " + duration);
+ metrics.updateFinalizeWriteMetrics(duration, stats.size());
+ });
+ }
+ } catch (HoodieIOException ioe) {
+ throw new HoodieCommitException(
+ "Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+ }
+ }
+
/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
*
@@ -1301,8 +1313,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}
- private HoodieCommitMetadata doCompactionCommit(JavaRDD<WriteStatus> writeStatuses,
- HoodieTableMetaClient metaClient, String compactionCommitTime, Optional<Map<String, String>> extraMetadata) {
+ private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<WriteStatus> writeStatuses,
+ String compactionCommitTime, Optional<Map<String, String>> extraMetadata) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat)
.collect();
@@ -1311,6 +1324,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
+ // Finalize write
+ List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
+ finalizeWrite(table, compactionCommitTime, stats);
+
// Copy extraMetadata
extraMetadata.ifPresent(m -> {
m.entrySet().stream().forEach(e -> {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index e05a435..3c69fa9 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -62,19 +62,26 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
- private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE =
- "hoodie.copyonwrite.use" + ".temp.folder.for.create";
- private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE = "false";
- private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE =
- "hoodie.copyonwrite.use" + ".temp.folder.for.merge";
- private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "false";
private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
- private static final String CONSISTENCY_CHECK_ENABLED = "hoodie.consistency.check.enabled";
+ private static final String CONSISTENCY_CHECK_ENABLED_PROP = "hoodie.consistency.check.enabled";
private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "false";
+ // time between successive attempts to ensure written data's metadata is consistent on storage
+ private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
+ "hoodie.consistency.check.initial_interval_ms";
+ private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+
+ // max interval time
+ private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
+ private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
+
+ // maximum number of checks, for consistency of written data. Will wait upto 256 Secs
+ private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
+ private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
// We keep track of original config and rewritten config
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
@@ -148,29 +155,28 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
}
- public boolean shouldUseTempFolderForCopyOnWriteForCreate() {
- return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE));
+ public int getFinalizeWriteParallelism() {
+ return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
}
- public boolean shouldUseTempFolderForCopyOnWriteForMerge() {
- return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE));
+ public boolean isConsistencyCheckEnabled() {
+ return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP));
}
- public boolean shouldUseTempFolderForCopyOnWrite() {
- return shouldUseTempFolderForCopyOnWriteForCreate()
- || shouldUseTempFolderForCopyOnWriteForMerge();
+ public boolean isEmbeddedTimelineServerEnabled() {
+ return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
}
- public int getFinalizeWriteParallelism() {
- return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
+ public int getMaxConsistencyChecks() {
+ return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECKS_PROP));
}
- public boolean isConsistencyCheckEnabled() {
- return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED));
+ public int getInitialConsistencyCheckIntervalMs() {
+ return Integer.parseInt(props.getProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
}
- public boolean isEmbeddedTimelineServerEnabled() {
- return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
+ public int getMaxConsistencyCheckIntervalMs() {
+ return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
}
/**
@@ -588,20 +594,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
- public Builder withUseTempFolderCopyOnWriteForCreate(
- boolean shouldUseTempFolderCopyOnWriteForCreate) {
- props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE,
- String.valueOf(shouldUseTempFolderCopyOnWriteForCreate));
- return this;
- }
-
- public Builder withUseTempFolderCopyOnWriteForMerge(
- boolean shouldUseTempFolderCopyOnWriteForMerge) {
- props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE,
- String.valueOf(shouldUseTempFolderCopyOnWriteForMerge));
- return this;
- }
-
public Builder withFileSystemViewConfig(FileSystemViewStorageConfig viewStorageConfig) {
props.putAll(viewStorageConfig.getProps());
isViewConfigSet = true;
@@ -614,7 +606,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
}
public Builder withConsistencyCheckEnabled(boolean enabled) {
- props.setProperty(CONSISTENCY_CHECK_ENABLED, String.valueOf(enabled));
+ props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP, String.valueOf(enabled));
return this;
}
@@ -623,6 +615,21 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withInitialConsistencyCheckIntervalMs(int initialIntevalMs) {
+ props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(initialIntevalMs));
+ return this;
+ }
+
+ public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) {
+ props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(maxIntervalMs));
+ return this;
+ }
+
+ public Builder withMaxConsistencyChecks(int maxConsistencyChecks) {
+ props.setProperty(MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(maxConsistencyChecks));
+ return this;
+ }
+
public HoodieWriteConfig build() {
// Check for mandatory properties
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM,
@@ -643,18 +650,18 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
- setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE),
- HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE,
- DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE);
- setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE),
- HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE,
- DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
- setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED),
- CONSISTENCY_CHECK_ENABLED, DEFAULT_CONSISTENCY_CHECK_ENABLED);
+ setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP),
+ CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED);
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
+ setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
+ INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS));
+ setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
+ MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS));
+ setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECKS_PROP),
+ MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS));
// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet,
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java
index e530e86..0f716f2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java
@@ -35,17 +35,19 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload> implements
private String commitTime;
private HoodieWriteConfig config;
private HoodieTable<T> hoodieTable;
+ private List<String> fileIDPrefixes;
public BulkInsertMapFunction(String commitTime, HoodieWriteConfig config,
- HoodieTable<T> hoodieTable) {
+ HoodieTable<T> hoodieTable, List<String> fileIDPrefixes) {
this.commitTime = commitTime;
this.config = config;
this.hoodieTable = hoodieTable;
+ this.fileIDPrefixes = fileIDPrefixes;
}
@Override
- public Iterator<List<WriteStatus>> call(Integer partition,
- Iterator<HoodieRecord<T>> sortedRecordItr) throws Exception {
- return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable);
+ public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
+ return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable,
+ fileIDPrefixes.get(partition));
}
}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
index 30b353b..72dd0bb 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
@@ -27,16 +27,12 @@ import com.uber.hoodie.io.HoodieCreateHandle;
import com.uber.hoodie.io.HoodieIOHandle;
import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.spark.TaskContext;
/**
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new
@@ -48,15 +44,17 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
protected final HoodieWriteConfig hoodieConfig;
protected final String commitTime;
protected final HoodieTable<T> hoodieTable;
- protected Set<String> partitionsCleaned;
+ protected final String idPrefix;
+ protected int numFilesWritten;
public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
- String commitTime, HoodieTable<T> hoodieTable) {
+ String commitTime, HoodieTable<T> hoodieTable, String idPrefix) {
super(sortedRecordItr);
- this.partitionsCleaned = new HashSet<>();
this.hoodieConfig = config;
this.commitTime = commitTime;
this.hoodieTable = hoodieTable;
+ this.idPrefix = idPrefix;
+ this.numFilesWritten = 0;
}
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
@@ -113,7 +111,10 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
@Override
protected void end() {
+ }
+ protected String getNextFileId(String idPfx) {
+ return String.format("%s-%d", idPfx, numFilesWritten++);
}
protected CopyOnWriteInsertHandler getInsertHandler() {
@@ -133,20 +134,11 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
@Override
protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
final HoodieRecord insertPayload = payload.record;
- // clean up any partial failures
- if (!partitionsCleaned.contains(insertPayload.getPartitionPath())) {
- // This insert task could fail multiple times, but Spark will faithfully retry with
- // the same data again. Thus, before we open any files under a given partition, we
- // first delete any files in the same partitionPath written by same Spark partition
- HoodieIOHandle.cleanupTmpFilesFromCurrentCommit(hoodieConfig, commitTime, insertPayload.getPartitionPath(),
- TaskContext.getPartitionId(), hoodieTable);
- partitionsCleaned.add(insertPayload.getPartitionPath());
- }
// lazily initialize the handle, for the first time
if (handle == null) {
- handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), UUID
- .randomUUID().toString());
+ handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(),
+ getNextFileId(idPrefix));
}
if (handle.canWrite(payload.record)) {
@@ -156,8 +148,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
// handle is full.
statuses.add(handle.close());
// Need to handle the rejected payload & open new handle
- handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), UUID
- .randomUUID().toString());
+ handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(),
+ getNextFileId(idPrefix));
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java
index b4eea0f..1da4529 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java
@@ -34,8 +34,8 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
CopyOnWriteLazyInsertIterable<T> {
public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
- String commitTime, HoodieTable<T> hoodieTable) {
- super(sortedRecordItr, config, commitTime, hoodieTable);
+ String commitTime, HoodieTable<T> hoodieTable, String idPfx) {
+ super(sortedRecordItr, config, commitTime, hoodieTable, idPfx);
}
@Override
@@ -51,7 +51,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
List<WriteStatus> statuses = new ArrayList<>();
// lazily initialize the handle, for the first time
if (handle == null) {
- handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable);
+ handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
}
if (handle.canWrite(insertPayload)) {
// write the payload, if the handle has capacity
@@ -61,7 +61,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
handle.close();
statuses.add(handle.getWriteStatus());
// Need to handle the rejected payload & open new handle
- handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable);
+ handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java b/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java
deleted file mode 100644
index 0171e07..0000000
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.uber.hoodie.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.uber.hoodie.common.SerializableConfiguration;
-import com.uber.hoodie.common.util.FSUtils;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaSparkContext;
-
-/**
- * Checks if all the written paths have their metadata consistent on storage and thus be listable to
- * queries. This is important for cloud, stores like AWS S3 which are eventually consistent with
- * their metadata. Without such checks, we may proceed to commit the written data, without the
- * written data being made available to queries. In cases like incremental pull this can lead to
- * downstream readers failing to ever see some data.
- */
-public class ConsistencyCheck implements Serializable {
-
- private static final transient Logger log = LogManager.getLogger(ConsistencyCheck.class);
-
- private String basePath;
-
- private List<String> relPaths;
-
- private transient JavaSparkContext jsc;
-
- private SerializableConfiguration hadoopConf;
-
- private int parallelism;
-
- public ConsistencyCheck(String basePath, List<String> relPaths, JavaSparkContext jsc,
- int parallelism) {
- this.basePath = basePath;
- this.relPaths = relPaths;
- this.jsc = jsc;
- this.hadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration());
- this.parallelism = parallelism;
- }
-
- @VisibleForTesting
- void sleepSafe(long waitMs) {
- try {
- Thread.sleep(waitMs);
- } catch (InterruptedException e) {
- // ignore & continue next attempt
- }
- }
-
- /**
- * Repeatedly lists the filesystem on the paths, with exponential backoff and marks paths found as
- * passing the check.
- *
- * @return list of (relative) paths failing the check
- */
- public List<String> check(int maxAttempts, long initalDelayMs) {
- long waitMs = initalDelayMs;
- int attempt = 0;
-
- List<String> remainingPaths = new ArrayList<>(relPaths);
- while (attempt++ < maxAttempts) {
- remainingPaths = jsc.parallelize(remainingPaths, parallelism)
- .groupBy(p -> new Path(basePath, p).getParent()) // list by partition
- .map(pair -> {
- FileSystem fs = FSUtils.getFs(basePath, hadoopConf.get());
- // list the partition path and obtain all file paths present
- Set<String> fileNames = Arrays.stream(fs.listStatus(pair._1()))
- .map(s -> s.getPath().getName())
- .collect(Collectors.toSet());
-
- // only return paths that can't be found
- return StreamSupport.stream(pair._2().spliterator(), false)
- .filter(p -> !fileNames.contains(new Path(basePath, p).getName()))
- .collect(Collectors.toList());
- })
- .flatMap(List::iterator).collect();
- if (remainingPaths.size() == 0) {
- break; // we are done.
- }
-
- log.info("Consistency check, waiting for " + waitMs + " ms , after attempt :" + attempt);
- sleepSafe(waitMs);
- waitMs = waitMs * 2; // double check interval every attempt
- }
-
- return remainingPaths;
- }
-}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index 1f5e95f..c33e5ee 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -32,6 +32,7 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -40,12 +41,10 @@ import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -96,14 +95,14 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String fileId, Iterator<HoodieRecord<T>> recordItr) {
- super(config, commitTime, hoodieTable);
+ super(config, commitTime, fileId, hoodieTable);
writeStatus.setStat(new HoodieDeltaWriteStat());
this.fileId = fileId;
this.recordItr = recordItr;
}
- public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable) {
- this(config, commitTime, hoodieTable, UUID.randomUUID().toString(), null);
+ public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, String fileId) {
+ this(config, commitTime, hoodieTable, fileId, null);
}
private void init(HoodieRecord record) {
@@ -270,12 +269,16 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
throws IOException, InterruptedException {
+ Optional<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();
+
return HoodieLogFormat.newWriterBuilder()
.onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(
- fileSlice.get().getLogFiles().map(logFile -> logFile.getLogVersion())
- .max(Comparator.naturalOrder()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
+ latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
+ .withLogWriteToken(
+ latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
+ .withRolloverLogWriteToken(writeToken)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
index 734b7c0..f090291 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
@@ -45,7 +45,6 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
private final HoodieStorageWriter<IndexedRecord> storageWriter;
private final Path path;
- private Path tempPath = null;
private long recordsWritten = 0;
private long insertRecordsWritten = 0;
private long recordsDeleted = 0;
@@ -54,26 +53,22 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId) {
- super(config, commitTime, hoodieTable);
+ super(config, commitTime, fileId, hoodieTable);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
- final int sparkPartitionId = TaskContext.getPartitionId();
- this.path = makeNewPath(partitionPath, sparkPartitionId, writeStatus.getFileId());
- if (config.shouldUseTempFolderForCopyOnWriteForCreate()) {
- this.tempPath = makeTempPath(partitionPath, sparkPartitionId, writeStatus.getFileId(),
- TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
- }
+ this.path = makeNewPath(partitionPath);
try {
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(TaskContext.getPartitionId());
+ createMarkerFile(partitionPath);
this.storageWriter = HoodieStorageWriterFactory
- .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema);
+ .getStorageWriter(commitTime, path, hoodieTable, config, writerSchema);
} catch (IOException e) {
throw new HoodieInsertException(
- "Failed to initialize HoodieStorageWriter for path " + getStorageWriterPath(), e);
+ "Failed to initialize HoodieStorageWriter for path " + path, e);
}
logger.info("New InsertHandle for partition :" + partitionPath + " with fileId " + fileId);
}
@@ -138,7 +133,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
}
} catch (IOException io) {
throw new HoodieInsertException(
- "Failed to insert records for path " + getStorageWriterPath(), io);
+ "Failed to insert records for path " + path, io);
}
}
@@ -165,8 +160,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
stat.setNumInserts(insertRecordsWritten);
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(writeStatus.getFileId());
- stat.setPaths(new Path(config.getBasePath()), path, tempPath);
- long fileSizeInBytes = FSUtils.getFileSize(fs, getStorageWriterPath());
+ stat.setPath(new Path(config.getBasePath()), path);
+ long fileSizeInBytes = FSUtils.getFileSize(fs, path);
stat.setTotalWriteBytes(fileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
@@ -180,9 +175,4 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
}
}
-
- private Path getStorageWriterPath() {
- // Use tempPath for storage writer if possible
- return (this.tempPath == null) ? this.path : this.tempPath;
- }
}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
index abcd247..1602d72 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
@@ -17,14 +17,17 @@
package com.uber.hoodie.io;
import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.HoodieTimer;
+import com.uber.hoodie.common.util.NoOpConsistencyGuard;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
@@ -32,16 +35,19 @@ import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.spark.TaskContext;
+
public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
private static Logger logger = LogManager.getLogger(HoodieIOHandle.class);
protected final String commitTime;
+ protected final String fileId;
+ protected final String writeToken;
protected final HoodieWriteConfig config;
protected final FileSystem fs;
protected final HoodieTable<T> hoodieTable;
@@ -50,10 +56,13 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
protected HoodieTimer timer;
protected final WriteStatus writeStatus;
- public HoodieIOHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable) {
+ public HoodieIOHandle(HoodieWriteConfig config, String commitTime, String fileId,
+ HoodieTable<T> hoodieTable) {
this.commitTime = commitTime;
+ this.fileId = fileId;
+ this.writeToken = makeSparkWriteToken();
this.config = config;
- this.fs = hoodieTable.getMetaClient().getFs();
+ this.fs = getFileSystem(hoodieTable, config);
this.hoodieTable = hoodieTable;
this.originalSchema = new Schema.Parser().parse(config.getSchema());
this.writerSchema = createHoodieWriteSchema(originalSchema);
@@ -63,33 +72,26 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
config.getWriteStatusFailureFraction());
}
+ private static FileSystem getFileSystem(HoodieTable hoodieTable, HoodieWriteConfig config) {
+ return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), config.isConsistencyCheckEnabled()
+ ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
+ config.getMaxConsistencyChecks(), config.getInitialConsistencyCheckIntervalMs(),
+ config.getMaxConsistencyCheckIntervalMs()) : new NoOpConsistencyGuard());
+ }
+
/**
- * Deletes any new tmp files written during the current commit, into the partition
+ * Generate a write token based on the currently running spark task and its place in the spark dag.
*/
- public static void cleanupTmpFilesFromCurrentCommit(HoodieWriteConfig config, String commitTime,
- String partitionPath, int taskPartitionId, HoodieTable hoodieTable) {
- FileSystem fs = hoodieTable.getMetaClient().getFs();
- try {
- FileStatus[] prevFailedFiles = fs.globStatus(new Path(String
- .format("%s/%s/%s", config.getBasePath(), partitionPath,
- FSUtils.maskWithoutFileId(commitTime, taskPartitionId))));
- if (prevFailedFiles != null) {
- logger.info(
- "Deleting " + prevFailedFiles.length + " files generated by previous failed attempts.");
- for (FileStatus status : prevFailedFiles) {
- fs.delete(status.getPath(), false);
- }
- }
- } catch (IOException e) {
- throw new HoodieIOException("Failed to cleanup Temp files from commit " + commitTime, e);
- }
+ private static String makeSparkWriteToken() {
+ return FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
+ TaskContext.get().taskAttemptId());
}
public static Schema createHoodieWriteSchema(Schema originalSchema) {
return HoodieAvroUtils.addMetadataFields(originalSchema);
}
- public Path makeNewPath(String partitionPath, int taskPartitionId, String fileName) {
+ public Path makeNewPath(String partitionPath) {
Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
try {
fs.mkdirs(path); // create a new partition as needed.
@@ -97,16 +99,37 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
throw new HoodieIOException("Failed to make dir " + path, e);
}
- return new Path(path.toString(),
- FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName));
+ return new Path(path.toString(), FSUtils.makeDataFileName(commitTime, writeToken, fileId));
}
- public Path makeTempPath(String partitionPath, int taskPartitionId, String fileName, int stageId,
- long taskAttemptId) {
- Path path = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
- return new Path(path.toString(),
- FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId,
- taskAttemptId));
+ /**
+ * Creates an empty marker file corresponding to the storage writer path; fails if the marker already exists.
+ * @param partitionPath Partition path
+ */
+ protected void createMarkerFile(String partitionPath) {
+ Path markerPath = makeNewMarkerPath(partitionPath);
+ try {
+ logger.info("Creating Marker Path=" + markerPath);
+ fs.create(markerPath, false).close();
+ } catch (IOException e) {
+ throw new HoodieException("Failed to create marker file " + markerPath, e);
+ }
+ }
+
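+ /**
+ * The marker path will be <base-path>/.hoodie/.temp/<instant_ts>/<partition-path>/<marker-file>, e.g. partition 2019/04/25
+ * @param partitionPath Partition path of the data file; its directory under the marker folder is created if needed
+ * @return Path of the marker file derived from this handle's commit time, write token and fileId
+ */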
+ private Path makeNewMarkerPath(String partitionPath) {
+ Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime));
+ Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
+ try {
+ fs.mkdirs(path); // create a new partition as needed.
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to make dir " + path, e);
+ }
+ return new Path(path.toString(), FSUtils.makeMarkerFile(commitTime, writeToken, fileId));
}
public Schema getWriterSchema() {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index ae98eb5..e77c156 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -57,7 +57,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
private HoodieStorageWriter<IndexedRecord> storageWriter;
private Path newFilePath;
private Path oldFilePath;
- private Path tempPath = null;
private long recordsWritten = 0;
private long recordsDeleted = 0;
private long updatedRecordsWritten = 0;
@@ -66,7 +65,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String fileId) {
- super(config, commitTime, hoodieTable);
+ super(config, commitTime, fileId, hoodieTable);
String partitionPath = init(fileId, recordItr);
init(fileId, partitionPath,
hoodieTable.getROFileSystemView().getLatestDataFile(partitionPath, fileId).get());
@@ -77,7 +76,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
*/
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String fileId, HoodieDataFile dataFileToBeMerged) {
- super(config, commitTime, hoodieTable);
+ super(config, commitTime, fileId, hoodieTable);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get())
@@ -101,30 +100,25 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
oldFilePath = new Path(
config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + FSUtils
- .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString();
+ .makeDataFileName(commitTime, writeToken, fileId)).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
- if (config.shouldUseTempFolderForCopyOnWriteForMerge()) {
- this.tempPath = makeTempPath(partitionPath, TaskContext.getPartitionId(), fileId,
- TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
- }
-
- // handle cases of partial failures, for update task
- if (fs.exists(newFilePath)) {
- fs.delete(newFilePath, false);
- }
logger.info(String
.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
- getStorageWriterPath().toString()));
+ newFilePath.toString()));
// file name is same for all records, in this bunch
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
writeStatus.getStat().setPartitionPath(partitionPath);
writeStatus.getStat().setFileId(fileId);
- writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath);
+ writeStatus.getStat().setPath(new Path(config.getBasePath()), newFilePath);
+
+ // Create Marker file
+ createMarkerFile(partitionPath);
+
// Create the writer for writing the new version file
storageWriter = HoodieStorageWriterFactory
- .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema);
+ .getStorageWriter(commitTime, newFilePath, hoodieTable, config, writerSchema);
} catch (IOException io) {
logger.error("Error in update task at commit " + commitTime, io);
writeStatus.setGlobalError(io);
@@ -231,17 +225,17 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
if (copyOldRecord) {
// this should work as it is, since this is an existing record
String errMsg = "Failed to merge old record into new file for key " + key + " from old file "
- + getOldFilePath() + " to new file " + getStorageWriterPath();
+ + getOldFilePath() + " to new file " + newFilePath;
try {
storageWriter.writeAvro(key, oldRecord);
} catch (ClassCastException e) {
logger.error("Schema mismatch when rewriting old record " + oldRecord + " from file "
- + getOldFilePath() + " to file " + getStorageWriterPath() + " with writerSchema " + writerSchema
+ + getOldFilePath() + " to file " + newFilePath + " with writerSchema " + writerSchema
.toString(true));
throw new HoodieUpsertException(errMsg, e);
} catch (IOException e) {
logger.error("Failed to merge old record into new file for key " + key + " from old file "
- + getOldFilePath() + " to new file " + getStorageWriterPath(), e);
+ + getOldFilePath() + " to new file " + newFilePath, e);
throw new HoodieUpsertException(errMsg, e);
}
recordsWritten++;
@@ -270,7 +264,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
storageWriter.close();
}
- long fileSizeInBytes = FSUtils.getFileSize(fs, getStorageWriterPath());
+ long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
writeStatus.getStat().setTotalWriteBytes(fileSizeInBytes);
writeStatus.getStat().setFileSizeInBytes(fileSizeInBytes);
writeStatus.getStat().setNumWrites(recordsWritten);
@@ -291,13 +285,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
return oldFilePath;
}
- private Path getStorageWriterPath() {
- // Use tempPath for storage writer if possible
- return (this.tempPath == null) ? this.newFilePath : this.tempPath;
- }
-
@Override
public WriteStatus getWriteStatus() {
return writeStatus;
}
-}
\ No newline at end of file
+}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index 86aff63..024bd3a 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -201,7 +201,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
.map(
s -> {
List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile
- .getBaseInstantAndLogVersionComparator().reversed()).collect(Collectors.toList());
+ .getLogFileComparator()).collect(Collectors.toList());
totalLogFiles.add((long) logFiles.size());
totalFileSlices.add(1L);
// Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java
index bc8b491..45327e6 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java
@@ -17,6 +17,7 @@
package com.uber.hoodie.io.storage;
import com.uber.hoodie.avro.HoodieAvroWriteSupport;
+import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.util.FSUtils;
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index e968e93..879e6e2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -28,8 +28,6 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
-import com.uber.hoodie.common.model.HoodieWriteStat;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -52,7 +50,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -234,14 +231,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged);
}
- public Iterator<List<WriteStatus>> handleInsert(String commitTime,
+ public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
logger.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
- return new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, this);
+ return new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, this, idPfx);
}
public Iterator<List<WriteStatus>> handleInsert(String commitTime, String partitionPath, String fileId,
@@ -261,9 +258,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
BucketType btype = binfo.bucketType;
try {
if (btype.equals(BucketType.INSERT)) {
- return handleInsert(commitTime, recordItr);
+ return handleInsert(commitTime, binfo.fileIdPrefix, recordItr);
} else if (btype.equals(BucketType.UPDATE)) {
- return handleUpdate(commitTime, binfo.fileLoc, recordItr);
+ return handleUpdate(commitTime, binfo.fileIdPrefix, recordItr);
} else {
throw new HoodieUpsertException(
"Unknown bucketType " + btype + " for partition :" + partition);
@@ -376,9 +373,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
.withDeletedFileResults(filesToDeletedStatus).build();
}).collect();
- // clean temporary data files
- cleanTemporaryDataFiles(jsc);
-
// Delete Inflight instant if enabled
deleteInflightInstant(deleteInstants, activeTimeline,
new HoodieInstant(true, actionType, commit));
@@ -391,96 +385,25 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
* @param activeTimeline Hoodie active timeline
* @param instantToBeDeleted Instant to be deleted
*/
- protected static void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
+ protected void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
HoodieInstant instantToBeDeleted) {
// Remove the rolled back inflight commits
if (deleteInstant) {
- activeTimeline.deleteInflight(instantToBeDeleted);
- logger.info("Deleted inflight commit " + instantToBeDeleted);
- } else {
- logger.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
- }
- }
-
- /**
- * Finalize the written data files
- *
- * @param stats List of HoodieWriteStats
- * @return number of files finalized
- */
- @Override
- @SuppressWarnings("unchecked")
- public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
- throws HoodieIOException {
-
- super.finalizeWrite(jsc, stats);
-
- if (config.shouldUseTempFolderForCopyOnWrite()) {
- // This is to rename each data file from temporary path to its final location
- jsc.parallelize(stats, config.getFinalizeWriteParallelism())
- .foreach(writeStat -> {
- final FileSystem fs = getMetaClient().getFs();
- final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());
-
- if (writeStat.getTempPath() != null) {
- final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath());
- boolean success;
- try {
- logger.info("Renaming temporary file: " + tempPath + " to " + finalPath);
- success = fs.rename(tempPath, finalPath);
- } catch (IOException e) {
- throw new HoodieIOException(
- "Failed to rename file: " + tempPath + " to " + finalPath);
- }
-
- if (!success) {
- throw new HoodieIOException(
- "Failed to rename file: " + tempPath + " to " + finalPath);
- }
- }
- });
-
- // clean temporary data files
- cleanTemporaryDataFiles(jsc);
- }
- }
-
- /**
- * Clean temporary data files that are produced from previous failed commit or retried spark
- * stages.
- */
- private void cleanTemporaryDataFiles(JavaSparkContext jsc) {
- if (!config.shouldUseTempFolderForCopyOnWrite()) {
- return;
- }
-
- final FileSystem fs = getMetaClient().getFs();
- final Path temporaryFolder = new Path(config.getBasePath(),
- HoodieTableMetaClient.TEMPFOLDER_NAME);
- try {
- if (!fs.exists(temporaryFolder)) {
- logger.info("Temporary folder does not exist: " + temporaryFolder);
- return;
- }
- List<FileStatus> fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder));
- List<Tuple2<String, Boolean>> results = jsc
- .parallelize(fileStatusesList, config.getFinalizeWriteParallelism()).map(fileStatus -> {
- FileSystem fs1 = getMetaClient().getFs();
- boolean success = fs1.delete(fileStatus.getPath(), false);
- logger
- .info("Deleting file in temporary folder" + fileStatus.getPath() + "\t" + success);
- return new Tuple2<>(fileStatus.getPath().toString(), success);
- }).collect();
-
- for (Tuple2<String, Boolean> result : results) {
- if (!result._2()) {
- logger.info("Failed to delete file: " + result._1());
- throw new HoodieIOException("Failed to delete file in temporary folder: " + result._1());
+ try {
+ //TODO: Cleanup Hoodie 1.0 rollback to simply call super.cleanFailedWrites with consistency check disabled
+ // and empty WriteStat list.
+ Path markerDir = new Path(metaClient.getMarkerFolderPath(instantToBeDeleted.getTimestamp()));
+ logger.info("Removing marker directory=" + markerDir);
+ if (metaClient.getFs().exists(markerDir)) {
+ metaClient.getFs().delete(markerDir, true);
}
+ activeTimeline.deleteInflight(instantToBeDeleted);
+ logger.info("Deleted inflight commit " + instantToBeDeleted);
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
}
- } catch (IOException e) {
- throw new HoodieIOException(
- "Failed to clean data files in temporary folder: " + temporaryFolder);
+ } else {
+ logger.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
}
}
@@ -624,13 +547,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
class BucketInfo implements Serializable {
BucketType bucketType;
- String fileLoc;
+ String fileIdPrefix;
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BucketInfo {");
sb.append("bucketType=").append(bucketType).append(", ");
- sb.append("fileLoc=").append(fileLoc);
+ sb.append("fileIdPrefix=").append(fileIdPrefix);
sb.append('}');
return sb.toString();
}
@@ -697,12 +620,12 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
- private int addUpdateBucket(String fileLoc) {
+ private int addUpdateBucket(String fileIdHint) {
int bucket = totalBuckets;
- updateLocationToBucket.put(fileLoc, bucket);
+ updateLocationToBucket.put(fileIdHint, bucket);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.UPDATE;
- bucketInfo.fileLoc = fileLoc;
+ bucketInfo.fileIdPrefix = fileIdHint;
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
return bucket;
@@ -764,6 +687,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.INSERT;
+ bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
}
@@ -784,7 +708,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
-
/**
* Returns a list of small files in the given partition path
*/
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 402fc8c..98248b9 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -121,13 +121,13 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
}
@Override
- public Iterator<List<WriteStatus>> handleInsert(String commitTime,
+ public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
if (index.canIndexLogFiles()) {
- return new MergeOnReadLazyInsertIterable<>(recordItr, config, commitTime, this);
+ return new MergeOnReadLazyInsertIterable<>(recordItr, config, commitTime, this, idPfx);
} else {
- return super.handleInsert(commitTime, recordItr);
+ return super.handleInsert(commitTime, idPfx, recordItr);
}
}
@@ -325,10 +325,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
}
@Override
- public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
+ public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
throws HoodieIOException {
// delegate to base class for MOR tables
- super.finalizeWrite(jsc, stats);
+ super.finalizeWrite(jsc, instantTs, stats);
}
@Override
@@ -362,6 +362,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
super(profile);
}
+ @Override
protected List<SmallFile> getSmallFiles(String partitionPath) {
// smallFiles only for partitionPath
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
index 6f7dd30..bff61c1 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
@@ -34,19 +34,30 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.FileSystemViewManager;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.AvroUtils;
+import com.uber.hoodie.common.util.ConsistencyGuard;
+import com.uber.hoodie.common.util.ConsistencyGuard.FileVisibility;
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
+import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieSavepointException;
import com.uber.hoodie.index.HoodieIndex;
-import com.uber.hoodie.io.ConsistencyCheck;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -56,10 +67,7 @@ import org.apache.spark.api.java.JavaSparkContext;
*/
public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable {
- // time between successive attempts to ensure written data's metadata is consistent on storage
- private static long INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
- // maximum number of checks, for consistency of written data. Will wait upto 256 Secs
- private static int MAX_CONSISTENCY_CHECKS = 7;
+ private static Logger logger = LogManager.getLogger(HoodieTable.class);
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
@@ -279,20 +287,126 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* @param stats List of HoodieWriteStats
* @throws HoodieIOException if some paths can't be finalized on storage
*/
- public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
+ public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
throws HoodieIOException {
- if (config.isConsistencyCheckEnabled()) {
- List<String> pathsToCheck = stats.stream()
- .map(stat -> stat.getTempPath() != null
- ? stat.getTempPath() : stat.getPath())
- .collect(Collectors.toList());
-
- List<String> failingPaths = new ConsistencyCheck(config.getBasePath(), pathsToCheck, jsc,
- config.getFinalizeWriteParallelism())
- .check(MAX_CONSISTENCY_CHECKS, INITIAL_CONSISTENCY_CHECK_INTERVAL_MS);
- if (failingPaths.size() > 0) {
- throw new HoodieIOException("Could not verify consistency of paths : " + failingPaths);
+ cleanFailedWrites(jsc, instantTs, stats, config.isConsistencyCheckEnabled());
+ }
+
+  /**
+   * Reconciles WriteStats with marker files to detect and delete partially written data files left behind by
+   * failed Spark task attempts whose retries succeeded, then removes the marker directory of the instant.
+   *
+   * @param jsc       Spark Context used to run the deletes and consistency checks in parallel
+   * @param instantTs Instant Timestamp whose marker directory is reconciled
+   * @param stats     Hoodie Write Stats of the successful attempts; their parquet files are kept
+   * @param consistencyCheckEnabled  if true, wait for files to appear before deleting and to disappear after
+   * @throws HoodieIOException if listing or deleting files fails, or a consistency wait does not succeed
+   */
+  protected void cleanFailedWrites(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats,
+      boolean consistencyCheckEnabled) throws HoodieIOException {
+    try {
+      // Reconcile marker and data files with WriteStats so that partially written data-files due to failed
+      // (but succeeded on retry) tasks are removed.
+      String basePath = getMetaClient().getBasePath();
+      FileSystem fs = getMetaClient().getFs();
+      Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
+
+      if (!fs.exists(markerDir)) {
+        // Happens when all writes are appends
+        return;
+      }
+
+      List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString());
+      List<String> validDataPaths = stats.stream().map(w -> String.format("%s/%s", basePath, w.getPath()))
+          .filter(p -> p.endsWith(".parquet")).collect(Collectors.toList());
+      // Whatever remains has a marker but no WriteStat: partially created files that need to be cleaned up.
+      invalidDataPaths.removeAll(validDataPaths);
+
+      Map<String, List<Pair<String, String>>> groupByPartition = invalidDataPaths.stream()
+          .map(dp -> Pair.of(new Path(dp).getParent().toString(), dp))
+          .collect(Collectors.groupingBy(Pair::getKey));
+
+      if (!groupByPartition.isEmpty()) {
+        logger.warn("Invalid data paths=" + invalidDataPaths);
+        // Ensure all files in the delete list are present. This is mandatory for an eventually consistent FS.
+        // Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit
+        if (consistencyCheckEnabled) {
+          // Throws HoodieIOException if any file to be deleted is still not visible.
+          waitForAllFiles(jsc, groupByPartition, FileVisibility.APPEAR);
+        }
+
+        // Now delete partially written files
+        jsc.parallelize(new ArrayList<>(groupByPartition.values()), config.getFinalizeWriteParallelism())
+            .map(partitionWithFileList -> {
+              final FileSystem fileSystem = metaClient.getFs();
+              logger.info("Deleting invalid data files=" + partitionWithFileList);
+              if (partitionWithFileList.isEmpty()) {
+                return true;
+              }
+              // Delete
+              partitionWithFileList.stream().map(Pair::getValue).forEach(file -> {
+                try {
+                  fileSystem.delete(new Path(file), false);
+                } catch (IOException e) {
+                  throw new HoodieIOException(e.getMessage(), e);
+                }
+              });
+
+              return true;
+            }).collect();
+
+        // Now ensure the deleted files disappear
+        if (consistencyCheckEnabled) {
+          // Throws HoodieIOException if any deleted file is still visible.
+          waitForAllFiles(jsc, groupByPartition, FileVisibility.DISAPPEAR);
+        }
       }
+      // Now delete the marker directory
+      if (fs.exists(markerDir)) {
+        // Defensive re-check; the append-only case (no marker dir) already returned early above.
+        logger.info("Removing marker directory=" + markerDir);
+        fs.delete(markerDir, true);
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Waits until every file in {@code groupByPartition} reaches {@code visibility}, else throws HoodieIOException.
+   * @param jsc   JavaSparkContext used to run one consistency check per partition
+   * @param groupByPartition   Files grouped by partition: partition path -> (partition path, file path) pairs
+   * @param visibility Appear/Disappear
+   */
+  private void waitForAllFiles(JavaSparkContext jsc, Map<String, List<Pair<String, String>>> groupByPartition,
+      FileVisibility visibility) {
+    // NOTE(review): groupingBy map entries are not guaranteed to be Serializable; confirm the Spark serializer copes.
+    boolean checkPassed =
+        jsc.parallelize(new ArrayList<>(groupByPartition.entrySet()), config.getFinalizeWriteParallelism())
+            .map(partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(),
+                partitionWithFileList.getValue().stream(), visibility))
+            .collect().stream().allMatch(x -> x);
+    if (!checkPassed) {
+      throw new HoodieIOException("Consistency check failed to ensure all files " + visibility);
+    }
+  }
+
+  private boolean waitForCondition(String partitionPath, Stream<Pair<String, String>> partitionFilePaths,
+      FileVisibility visibility) {
+    final FileSystem fileSystem = metaClient.getFs();
+    List<String> fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList());
+    try {
+      getFailSafeConsistencyGuard(fileSystem).waitTill(partitionPath, fileList, visibility);
+    } catch (IOException | TimeoutException ioe) {
+      logger.error("Got exception while waiting for files to " + visibility + " in " + partitionPath, ioe);
+      return false;
     }
+    return true;
+  }
+
+  private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
+    return new FailSafeConsistencyGuard(fileSystem, config.getMaxConsistencyChecks(),
+        config.getInitialConsistencyCheckIntervalMs(),
+        config.getMaxConsistencyCheckIntervalMs());
   }
}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
index 987a1d2..3d60c3e 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
@@ -66,6 +66,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -620,18 +621,17 @@ public class TestCleaner extends TestHoodieClientBase {
* Test Cleaning functionality of table.rollback() API.
*/
@Test
- public void testCleanTemporaryDataFilesOnRollback() throws IOException {
+ public void testCleanMarkerDataFilesOnRollback() throws IOException {
HoodieTestUtils.createCommitFiles(basePath, "000");
- List<String> tempFiles = createTempFiles("000", 10);
- assertEquals("Some temp files are created.", 10, tempFiles.size());
- assertEquals("Some temp files are created.", tempFiles.size(), getTotalTempFiles());
-
- HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
- .withUseTempFolderCopyOnWriteForCreate(true)
- .withUseTempFolderCopyOnWriteForMerge(false).build();
- HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config
- .getBasePath(), true),
- config, jsc);
+ List<String> markerFiles = createMarkerFiles("000", 10);
+ assertEquals("Some marker files are created.", 10, markerFiles.size());
+ assertEquals("Some marker files are created.", markerFiles.size(), getTotalTempFiles());
+
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
+ HoodieTable table = HoodieTable.getHoodieTable(
+ new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
+ jsc);
+
table.rollback(jsc, "000", true);
assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
}
@@ -901,10 +901,10 @@ public class TestCleaner extends TestHoodieClientBase {
* @return generated files
* @throws IOException in case of error
*/
- private List<String> createTempFiles(String commitTime, int numFiles) throws IOException {
+ private List<String> createMarkerFiles(String commitTime, int numFiles) throws IOException {
List<String> files = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
- files.add(HoodieTestUtils.createNewDataFile(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, commitTime));
+ files.add(HoodieTestUtils.createNewMarkerFile(basePath, "2019/03/29", commitTime));
}
return files;
}
@@ -915,7 +915,13 @@ public class TestCleaner extends TestHoodieClientBase {
* @throws IOException in case of error
*/
private int getTotalTempFiles() throws IOException {
- return fs.listStatus(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)).length;
+ RemoteIterator itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
+ int count = 0;
+ while (itr.hasNext()) {
+ count++;
+ itr.next();
+ }
+ return count;
}
private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java b/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
new file mode 100644
index 0000000..3330575
--- /dev/null
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie;
+
+import com.uber.hoodie.common.HoodieClientTestUtils;
+import com.uber.hoodie.common.util.ConsistencyGuard;
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestConsistencyGuard {
+  // Rule-managed folder: JUnit deletes it after each test, so test runs do not leak temp directories.
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+  private String basePath;
+  private FileSystem fs;
+
+  @Before
+  public void setup() throws IOException {
+    basePath = testFolder.getRoot().getAbsolutePath();
+    fs = FSUtils.getFs(basePath, new Configuration());
+    if (fs instanceof LocalFileSystem) {
+      LocalFileSystem lfs = (LocalFileSystem) fs;
+      // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
+      // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
+      // So, for the tests, we enforce checksum verification to circumvent the problem
+      lfs.setVerifyChecksum(true);
+    }
+  }
+
+  @Test
+  public void testCheckPassingAppearAndDisAppear() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3");
+
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 1, 1000, 1000);
+    passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
+    passing.waitTillFileAppears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"));
+    passing.waitTillAllFilesAppear(basePath + "/partition/path",
+        Arrays.asList(basePath + "/partition/path/f1_1-0-1_000.parquet",
+            basePath + "/partition/path/f2_1-0-1_000.parquet"));
+
+    fs.delete(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"), false);
+    fs.delete(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"), false);
+    passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
+    passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"));
+    passing.waitTillAllFilesDisappear(basePath + "/partition/path",
+        Arrays.asList(basePath + "/partition/path/f1_1-0-1_000.parquet",
+            basePath + "/partition/path/f2_1-0-1_000.parquet"));
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void testCheckFailingAppear() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard guard = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    guard.waitTillAllFilesAppear(basePath + "/partition/path",
+        Arrays.asList(basePath + "/partition/path/f1_1-0-2_000.parquet",
+            basePath + "/partition/path/f2_1-0-2_000.parquet"));
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void testCheckFailingAppears() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard guard = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    guard.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void testCheckFailingDisappear() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard guard = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    guard.waitTillAllFilesDisappear(basePath + "/partition/path",
+        Arrays.asList(basePath + "/partition/path/f1_1-0-1_000.parquet",
+            basePath + "/partition/path/f2_1-0-2_000.parquet"));
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void testCheckFailingDisappears() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard guard = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    guard.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
+  }
+}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
index 9ce1070..73ce6f0 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
@@ -16,6 +16,7 @@
package com.uber.hoodie;
+import static com.uber.hoodie.common.table.HoodieTableMetaClient.MARKER_EXTN;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -38,6 +39,7 @@ import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -245,19 +247,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
}
/**
- * Test Upsert API using temporary folders.
- */
- @Test
- public void testUpsertsWithFinalizeWrite() throws Exception {
- HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
- .withUseTempFolderCopyOnWriteForCreate(true)
- .withUseTempFolderCopyOnWriteForMerge(true)
- .build();
- testUpsertsInternal(hoodieWriteConfig,
- HoodieWriteClient::upsert, false);
- }
-
- /**
* Test UpsertPrepped API
*/
@Test
@@ -267,19 +256,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
}
/**
- * Test UpsertPrepped API using temporary folders.
- */
- @Test
- public void testUpsertsPreppedWithFinalizeWrite() throws Exception {
- HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
- .withUseTempFolderCopyOnWriteForCreate(true)
- .withUseTempFolderCopyOnWriteForMerge(true)
- .build();
- testUpsertsInternal(hoodieWriteConfig,
- HoodieWriteClient::upsertPreppedRecords, true);
- }
-
- /**
* Test one of HoodieWriteClient upsert(Prepped) APIs
*
* @param hoodieWriteConfig Write Config
@@ -385,7 +361,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertEquals("Just 1 file needs to be added.", 1, statuses.size());
String file1 = statuses.get(0).getFileId();
assertEquals("file should contain 100 records", ParquetUtils.readRowKeysFromParquet(jsc.hadoopConfiguration(),
- new Path(basePath, testPartitionPath + "/" + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(), 100);
+ new Path(basePath, statuses.get(0).getStat().getPath())).size(), 100);
// Update + Inserts such that they just expand file1
String commitTime2 = "002";
@@ -403,7 +379,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertEquals("Just 1 file needs to be updated.", 1, statuses.size());
assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit());
- Path newFile = new Path(basePath, testPartitionPath + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
+ Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals("file should contain 140 records",
ParquetUtils.readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), 140);
@@ -499,7 +475,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertEquals("Just 1 file needs to be added.", 1, statuses.size());
String file1 = statuses.get(0).getFileId();
assertEquals("file should contain 100 records", ParquetUtils.readRowKeysFromParquet(jsc.hadoopConfiguration(),
- new Path(basePath, testPartitionPath + "/" + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(), 100);
+ new Path(basePath, statuses.get(0).getStat().getPath())).size(), 100);
// Second, set of Inserts should just expand file1
String commitTime2 = "002";
@@ -513,7 +489,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertEquals("Just 1 file needs to be updated.", 1, statuses.size());
assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit());
- Path newFile = new Path(basePath, testPartitionPath + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
+ Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals("file should contain 140 records",
ParquetUtils.readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), 140);
@@ -678,22 +654,59 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
*/
@Test
public void testConsistencyCheckDuringFinalize() throws Exception {
- HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
- HoodieWriteClient client = getHoodieWriteClient(cfg);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ String commitTime = "000";
+ HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
+ HoodieWriteClient client = getHoodieWriteClient(cfg);
+ Pair<Path, JavaRDD<WriteStatus>> result = testConsistencyCheck(metaClient, commitTime);
+ // Delete orphan marker and commit should succeed
+ metaClient.getFs().delete(result.getKey(), false);
+ assertTrue("Commit should succeed", client.commit(commitTime, result.getRight()));
+ assertTrue("After explicit commit, commit file should be created",
+ HoodieTestUtils.doesCommitExist(basePath, commitTime));
+ // Marker directory must be removed
+ assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(commitTime))));
+ }
+
+ @Test
+ public void testRollbackAfterConsistencyCheckFailure() throws Exception {
String commitTime = "000";
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
+ HoodieWriteClient client = getHoodieWriteClient(cfg);
+ testConsistencyCheck(metaClient, commitTime);
+
+ // Rollback of this commit should succeed
+ client.rollback(commitTime);
+ assertFalse("After explicit rollback, commit file should not be present",
+ HoodieTestUtils.doesCommitExist(basePath, commitTime));
+ // Marker directory must be removed after rollback
+ assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(commitTime))));
+ }
+
+ private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String commitTime)
+ throws Exception {
+ HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withMaxConsistencyCheckIntervalMs(1)
+ .withInitialConsistencyCheckIntervalMs(1).build();
+ HoodieWriteClient client = getHoodieWriteClient(cfg);
+
client.startCommitWithTime(commitTime);
- JavaRDD<HoodieRecord> writeRecords = jsc
- .parallelize(dataGen.generateInserts(commitTime, 200), 1);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(commitTime, 200), 1);
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, commitTime);
-
- // move one of the files & commit should fail
- WriteStatus status = result.take(1).get(0);
- Path origPath = new Path(basePath + "/" + status.getStat().getPath());
- Path hidePath = new Path(basePath + "/" + status.getStat().getPath() + "_hide");
- metaClient.getFs().rename(origPath, hidePath);
+ result.collect();
+
+ // Create a dummy marker file to simulate the case that a marker file was created without data file.
+ // This should fail the commit
+ String partitionPath = Arrays.stream(fs.globStatus(new Path(String.format("%s/*/*/*/*",
+ metaClient.getMarkerFolderPath(commitTime))),
+ path -> path.toString().endsWith(MARKER_EXTN))).limit(1)
+ .map(status -> status.getPath().getParent().toString()).collect(Collectors.toList()).get(0);
+ Path markerFilePath = new Path(String.format("%s/%s", partitionPath,
+ FSUtils.makeMarkerFile(commitTime, "1-0-1", UUID.randomUUID().toString())));
+ metaClient.getFs().create(markerFilePath);
+ logger.info("Created a dummy marker path=" + markerFilePath);
try {
client.commit(commitTime, result);
@@ -701,12 +714,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
} catch (HoodieCommitException cme) {
assertTrue(cme.getCause() instanceof HoodieIOException);
}
-
- // Re-introduce & commit should succeed
- metaClient.getFs().rename(hidePath, origPath);
- assertTrue("Commit should succeed", client.commit(commitTime, result));
- assertTrue("After explicit commit, commit file should be created",
- HoodieTestUtils.doesCommitExist(basePath, commitTime));
+ return Pair.of(markerFilePath, result);
}
/**
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java
index 8070be7..43dce29 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.api.java.JavaRDD;
+import org.junit.Assert;
import org.junit.Test;
import scala.Option;
@@ -107,7 +108,7 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
filteredRDD = readClient.filterExists(recordsRDD);
List<HoodieRecord> result = filteredRDD.collect();
// Check results
- assertTrue(result.size() == 25);
+ Assert.assertEquals(25, result.size());
}
/**
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
index 2507dca..e77653e 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
@@ -105,7 +105,7 @@ public class HoodieClientTestUtils {
throws Exception {
String parentPath = String.format("%s/%s", basePath, partitionPath);
new File(parentPath).mkdirs();
- String path = String.format("%s/%s", parentPath, FSUtils.makeDataFileName(commitTime, 0, fileId));
+ String path = String.format("%s/%s", parentPath, FSUtils.makeDataFileName(commitTime, "1-0-1", fileId));
new File(path).createNewFile();
new RandomAccessFile(path, "rw").setLength(length);
}
@@ -236,7 +236,7 @@ public class HoodieClientTestUtils {
Thread.sleep(1000);
String commitTime = HoodieTestUtils.makeNewCommitTime();
String fileId = UUID.randomUUID().toString();
- String filename = FSUtils.makeDataFileName(commitTime, 1, fileId);
+ String filename = FSUtils.makeDataFileName(commitTime, "1-0-1", fileId);
HoodieTestUtils.createCommitFiles(basePath, commitTime);
return HoodieClientTestUtils
.writeParquetFile(basePath, partitionPath, filename, records, schema, filter, createCommitTime);
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java
index 20a14ea..d3c78a8 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java
@@ -25,24 +25,32 @@ import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieTestUtils;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.io.HoodieCreateHandle;
+import com.uber.hoodie.io.HoodieMergeHandle;
import com.uber.hoodie.table.HoodieCopyOnWriteTable;
import java.io.File;
+import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Arrays;
import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroReadSupport;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-public class TestUpdateMapFunction {
+public class TestUpdateMapFunction implements Serializable {
private String basePath = null;
private transient JavaSparkContext jsc = null;
@@ -71,51 +79,73 @@ public class TestUpdateMapFunction {
@Test
public void testSchemaEvolutionOnUpdate() throws Exception {
// Create a bunch of records with a old version of schema
- HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt");
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(HoodieTestUtils.getDefaultHadoopConf(), basePath);
- HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+ final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt");
+ System.out.println("JSC =" + jsc);
+ final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
- String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
- + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
- String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
- + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
- String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
- + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
- List<HoodieRecord> records = new ArrayList<>();
- TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
- records.add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
- TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
- records.add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
- TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3);
- records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
- Iterator<List<WriteStatus>> insertResult = table.handleInsert("100", records.iterator());
- Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100"));
+ final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
+ String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+ + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+ String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
+ + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+ String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+ + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+ List<HoodieRecord> insertRecords = new ArrayList<>();
+ TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
+ insertRecords
+ .add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
+ TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
+ insertRecords
+ .add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
+ TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3);
+ insertRecords
+ .add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
+
+ HoodieCreateHandle createHandle = new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(),
+ "f1-0", insertRecords.iterator());
+ createHandle.write();
+ WriteStatus insertResult = createHandle.close();
+ return insertResult;
+ }).collect();
+
+ final Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100"));
FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()).create(commitFile);
// Now try an update with an evolved schema
// Evolved schema does not have guarantee on preserving the original field ordering
- config = makeHoodieClientConfig("/exampleEvolvedSchema.txt");
- metaClient = new HoodieTableMetaClient(HoodieTestUtils.getDefaultHadoopConf(), basePath);
- String fileId = insertResult.next().get(0).getFileId();
- System.out.println(fileId);
+ final HoodieWriteConfig config2 = makeHoodieClientConfig("/exampleEvolvedSchema.txt");
+ final Schema schema = Schema.parse(config2.getSchema());
+ final WriteStatus insertResult = statuses.get(0);
+ String fileId = insertResult.getFileId();
- table = new HoodieCopyOnWriteTable(config, jsc);
- // New content with values for the newly added field
- recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
- + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
- records = new ArrayList<>();
- rowChange1 = new TestRawTripPayload(recordStr1);
- HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()),
- rowChange1);
- record1.setCurrentLocation(new HoodieRecordLocation("100", fileId));
- records.add(record1);
+ final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config2, jsc);
+ Assert.assertEquals(1, jsc.parallelize(Arrays.asList(1)).map(x -> {
+ // New content with values for the newly added field
+ String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+ + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
+ List<HoodieRecord> updateRecords = new ArrayList<>();
+ TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
+ HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()),
+ rowChange1);
+ record1.setCurrentLocation(new HoodieRecordLocation("100", fileId));
+ updateRecords.add(record1);
- try {
- table.handleUpdate("101", fileId, records.iterator());
- } catch (ClassCastException e) {
- fail("UpdateFunction could not read records written with exampleSchema.txt using the "
- + "exampleEvolvedSchema.txt");
- }
+ try {
+ HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2, updateRecords.iterator(), fileId);
+ Configuration conf = new Configuration();
+ AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchema());
+ List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
+ new Path(config2.getBasePath() + "/" + insertResult.getStat().getPath()));
+ for (GenericRecord rec : oldRecords) {
+ mergeHandle.write(rec);
+ }
+ mergeHandle.close();
+ } catch (ClassCastException e) {
+ fail("UpdateFunction could not read records written with exampleSchema.txt using the "
+ + "exampleEvolvedSchema.txt");
+ }
+ return 1;
+ }).collect().size());
}
private HoodieWriteConfig makeHoodieClientConfig(String schema) throws Exception {
@@ -123,5 +153,4 @@ public class TestUpdateMapFunction {
String schemaStr = IOUtils.toString(getClass().getResourceAsStream(schema), "UTF-8");
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr).build();
}
-
-}
+}
\ No newline at end of file
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java
deleted file mode 100644
index 9a1c4d5..0000000
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.uber.hoodie.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyList;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.uber.hoodie.common.HoodieClientTestUtils;
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class TestConsistencyCheck {
-
- private String basePath;
- private JavaSparkContext jsc;
-
- @Before
- public void setup() throws IOException {
- jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("ConsistencyCheckTest"));
- TemporaryFolder testFolder = new TemporaryFolder();
- testFolder.create();
- basePath = testFolder.getRoot().getAbsolutePath();
- }
-
- @After
- public void teardown() {
- if (jsc != null) {
- jsc.stop();
- }
- File testFolderPath = new File(basePath);
- if (testFolderPath.exists()) {
- testFolderPath.delete();
- }
- }
-
- @Test
- public void testExponentialBackoff() throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
- JavaSparkContext jscSpy = spy(jsc);
-
- ConsistencyCheck failing = new ConsistencyCheck(basePath,
- Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f2_0_000.parquet"),
- jscSpy, 2);
- long startMs = System.currentTimeMillis();
- assertEquals(1, failing.check(5, 10).size());
- assertTrue((System.currentTimeMillis() - startMs) > (10 + 20 + 40 + 80));
- verify(jscSpy, times(5)).parallelize(anyList(), anyInt());
- }
-
- @Test
- public void testCheckPassingAndFailing() throws Exception {
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2");
- HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3");
-
- ConsistencyCheck passing = new ConsistencyCheck(basePath,
- Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f2_0_000.parquet"),
- jsc, 2);
- assertEquals(0, passing.check(1, 1000).size());
-
- ConsistencyCheck failing = new ConsistencyCheck(basePath,
- Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f4_0_000.parquet"),
- jsc, 2);
- assertEquals(1, failing.check(1, 1000).size());
- }
-}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
index 2d6f957..45c200b 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
@@ -74,13 +74,14 @@ public class TestHoodieCommitArchiveLog {
@AfterClass
public static void cleanUp() throws Exception {
+ // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
+ // same JVM
+ FileSystem.closeAll();
+
if (hdfsTestService != null) {
hdfsTestService.stop();
dfsCluster.shutdown();
}
- // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
- // same JVM
- FileSystem.closeAll();
}
@BeforeClass
@@ -245,7 +246,7 @@ public class TestHoodieCommitArchiveLog {
//read the file
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(dfs,
- new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")),
+ new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1_1-0-1")),
HoodieArchivedMetaEntry.getClassSchema());
int archivedRecordsCount = 0;
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
index 714f0e5..822ec93 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
@@ -17,7 +17,6 @@
package com.uber.hoodie.table;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -36,6 +35,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -44,18 +44,18 @@ import com.uber.hoodie.table.HoodieCopyOnWriteTable.UpsertPartitioner;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -85,20 +85,24 @@ public class TestCopyOnWriteTable {
public void testMakeNewPath() throws Exception {
String fileName = UUID.randomUUID().toString();
String partitionPath = "2016/05/04";
- int unitNumber = (int) (Math.random() * 10);
- HoodieRecord record = mock(HoodieRecord.class);
- when(record.getPartitionPath()).thenReturn(partitionPath);
String commitTime = HoodieTestUtils.makeNewCommitTime();
HoodieWriteConfig config = makeHoodieClientConfig();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
- HoodieCreateHandle io = new HoodieCreateHandle(config, commitTime, table, partitionPath,
- UUID.randomUUID().toString());
- Path newPath = io.makeNewPath(record.getPartitionPath(), unitNumber, fileName);
- assertTrue(newPath.toString().equals(
- this.basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(commitTime, unitNumber, fileName)));
+ Pair<Path, String> newPathWithWriteToken = jsc.parallelize(Arrays.asList(1)).map(x -> {
+ HoodieRecord record = mock(HoodieRecord.class);
+ when(record.getPartitionPath()).thenReturn(partitionPath);
+ String writeToken = FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
+ TaskContext.get().taskAttemptId());
+ HoodieCreateHandle io = new HoodieCreateHandle(config, commitTime, table, partitionPath, fileName);
+ return Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken);
+ }).collect().get(0);
+
+ Assert.assertEquals(newPathWithWriteToken.getKey().toString(),
+ this.basePath + "/" + partitionPath + "/"
+ + FSUtils.makeDataFileName(commitTime, newPathWithWriteToken.getRight(), fileName));
}
private HoodieWriteConfig makeHoodieClientConfig() throws Exception {
@@ -141,7 +145,11 @@ public class TestCopyOnWriteTable {
records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
// Insert new records
- HoodieClientTestUtils.collectStatuses(table.handleInsert(firstCommitTime, records.iterator()));
+ final HoodieCopyOnWriteTable cowTable = table;
+ jsc.parallelize(Arrays.asList(1)).map(x -> {
+ return cowTable.handleInsert(firstCommitTime, FSUtils.createNewFileIdPfx(), records.iterator());
+ }).map(x -> HoodieClientTestUtils.collectStatuses(x)).collect();
+
// We should have a parquet file generated (TODO: better control # files after we revise
// AvroParquetIO)
File parquetFile = null;
@@ -190,10 +198,12 @@ public class TestCopyOnWriteTable {
Thread.sleep(1000);
String newCommitTime = HoodieTestUtils.makeNewCommitTime();
metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- table = new HoodieCopyOnWriteTable(config, jsc);
- Iterator<List<WriteStatus>> iter = table
- .handleUpdate(newCommitTime, updatedRecord1.getCurrentLocation().getFileId(),
- updatedRecords.iterator());
+ final HoodieCopyOnWriteTable newTable = new HoodieCopyOnWriteTable(config, jsc);
+ List<WriteStatus> statuses =
+ jsc.parallelize(Arrays.asList(1)).map(x -> {
+ return newTable.handleUpdate(newCommitTime, updatedRecord1.getCurrentLocation().getFileId(),
+ updatedRecords.iterator());
+ }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
// Check the updated file
File updatedParquetFile = null;
@@ -231,7 +241,6 @@ public class TestCopyOnWriteTable {
}
updatedReader.close();
// Also check the numRecordsWritten
- List<WriteStatus> statuses = HoodieClientTestUtils.collectStatuses(iter);
WriteStatus writeStatus = statuses.get(0);
assertTrue("Should be only one file generated", statuses.size() == 1);
assertEquals(4, writeStatus.getStat().getNumWrites());//3 rewritten records + 1 new record
@@ -277,8 +286,10 @@ public class TestCopyOnWriteTable {
records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
// Insert new records
- List<WriteStatus> writeStatuses = HoodieClientTestUtils
- .collectStatuses(table.handleInsert(firstCommitTime, records.iterator()));
+ List<WriteStatus> writeStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
+ return table.handleInsert(firstCommitTime, FSUtils.createNewFileIdPfx(), records.iterator());
+ }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
+
Map<String, String> allWriteStatusMergedMetadataMap = MetadataMergeWriteStatus
.mergeMetadataForWriteStatuses(writeStatuses);
assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
@@ -288,41 +299,6 @@ public class TestCopyOnWriteTable {
}
@Test
- public void testInsertWithPartialFailures() throws Exception {
- HoodieWriteConfig config = makeHoodieClientConfig();
- String commitTime = HoodieTestUtils.makeNewCommitTime();
- FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
-
- // Write a few records, and get atleast one file
- // 10 records for partition 1, 1 record for partition 2.
- List<HoodieRecord> records = newHoodieRecords(10, "2016-01-31T03:16:41.415Z");
- records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z"));
-
- // Simulate crash after first file
- List<WriteStatus> statuses = HoodieClientTestUtils
- .collectStatuses(table.handleInsert(commitTime, records.iterator()));
- WriteStatus status = statuses.get(0);
- Path partialFile = new Path(String.format("%s/%s/%s", basePath, status.getPartitionPath(),
- FSUtils.makeDataFileName(commitTime, 0, status.getFileId())));
- assertTrue(fs.exists(partialFile));
-
- // When we retry
- records = newHoodieRecords(10, "2016-01-31T03:16:41.415Z");
- records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z"));
-
- statuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator()));
- status = statuses.get(0);
-
- Path retriedFIle = new Path(String.format("%s/%s/%s", basePath, status.getPartitionPath(),
- FSUtils.makeDataFileName(commitTime, 0, status.getFileId())));
- assertTrue(fs.exists(retriedFIle));
- assertFalse(fs.exists(partialFile));
- }
-
-
- @Test
public void testInsertRecords() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfig();
String commitTime = HoodieTestUtils.makeNewCommitTime();
@@ -335,8 +311,10 @@ public class TestCopyOnWriteTable {
records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z"));
// Insert new records
- List<WriteStatus> returnedStatuses = HoodieClientTestUtils
- .collectStatuses(table.handleInsert(commitTime, records.iterator()));
+ final List<HoodieRecord> recs2 = records;
+ List<WriteStatus> returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
+ return table.handleInsert(commitTime, FSUtils.createNewFileIdPfx(), recs2.iterator());
+ }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
// TODO: check the actual files and make sure 11 records, total were written.
assertEquals(2, returnedStatuses.size());
@@ -354,7 +332,11 @@ public class TestCopyOnWriteTable {
records.addAll(newHoodieRecords(1, "2016-02-02T03:16:41.415Z"));
// Insert new records
- returnedStatuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator()));
+ final List<HoodieRecord> recs3 = records;
+
+ returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
+ return table.handleInsert(commitTime, FSUtils.createNewFileIdPfx(), recs3.iterator());
+ }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
assertEquals(3, returnedStatuses.size());
assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath());
@@ -388,7 +370,9 @@ public class TestCopyOnWriteTable {
}
// Insert new records
- HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator()));
+ jsc.parallelize(Arrays.asList(1))
+ .map(i -> table.handleInsert(commitTime, FSUtils.createNewFileIdPfx(), records.iterator()))
+ .map(x -> HoodieClientTestUtils.collectStatuses(x)).collect();
// Check the updated file
int counts = 0;
@@ -487,19 +471,26 @@ public class TestCopyOnWriteTable {
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(
HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+ final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
String commitTime = "000";
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
// Perform inserts of 100 records to test CreateHandle and BufferedExecutor
- List<HoodieRecord> inserts = dataGenerator.generateInsertsWithHoodieAvroPayload(commitTime, 100);
- Iterator<List<WriteStatus>> ws = table.handleInsert(commitTime, inserts.iterator());
- WriteStatus writeStatus = ws.next().get(0);
+ final List<HoodieRecord> inserts = dataGenerator.generateInsertsWithHoodieAvroPayload(commitTime, 100);
+ final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
+ return table.handleInsert(commitTime, UUID.randomUUID().toString(), inserts.iterator());
+ }).map(x -> (List<WriteStatus>)HoodieClientTestUtils.collectStatuses(x)).collect();
+
+ WriteStatus writeStatus = ws.get(0).get(0);
String fileId = writeStatus.getFileId();
metadata.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
- table = new HoodieCopyOnWriteTable(config, jsc);
- // Perform update of 100 records to test MergeHandle and BufferedExecutor
- table.handleUpdate("001", fileId,
- dataGenerator.generateUpdatesWithHoodieAvroPayload(commitTime, writeStatus.getWrittenRecords()).iterator());
+ final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config, jsc);
+
+ final List<HoodieRecord> updates =
+ dataGenerator.generateUpdatesWithHoodieAvroPayload(commitTime, writeStatus.getWrittenRecords());
+
+ jsc.parallelize(Arrays.asList(1)).map(x -> {
+ return table2.handleUpdate("001", fileId, updates.iterator());
+ }).map(x -> (List<WriteStatus>)HoodieClientTestUtils.collectStatuses(x)).collect();
}
@After
@@ -511,4 +502,4 @@ public class TestCopyOnWriteTable {
jsc.stop();
}
}
-}
+}
\ No newline at end of file
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
similarity index 92%
rename from hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java
rename to hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
index f18901d..389cf6d 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
@@ -14,10 +14,12 @@
* limitations under the License.
*/
-package com.uber.hoodie.io.storage;
+package com.uber.hoodie.common.io.storage;
import com.uber.hoodie.common.storage.StorageSchemes;
+import com.uber.hoodie.common.util.ConsistencyGuard;
import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.NoOpConsistencyGuard;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.net.URI;
@@ -27,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
@@ -64,6 +67,16 @@ public class HoodieWrapperFileSystem extends FileSystem {
ConcurrentHashMap<>();
private FileSystem fileSystem;
private URI uri;
+ private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();
+
+ public HoodieWrapperFileSystem() {
+ }
+
+ public HoodieWrapperFileSystem(FileSystem fileSystem, ConsistencyGuard consistencyGuard) {
+ this.fileSystem = fileSystem;
+ this.uri = fileSystem.getUri();
+ this.consistencyGuard = consistencyGuard;
+ }
public static Path convertToHoodiePath(Path file, Configuration conf) {
try {
@@ -139,8 +152,8 @@ public class HoodieWrapperFileSystem extends FileSystem {
return fsDataOutputStream;
}
- SizeAwareFSDataOutputStream os = new SizeAwareFSDataOutputStream(
- fsDataOutputStream, () -> openStreams.remove(path.getName()));
+ SizeAwareFSDataOutputStream os = new SizeAwareFSDataOutputStream(path,
+ fsDataOutputStream, consistencyGuard, () -> openStreams.remove(path.getName()));
openStreams.put(path.getName(), os);
return os;
}
@@ -157,66 +170,66 @@ public class HoodieWrapperFileSystem extends FileSystem {
@Override
public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
- return fileSystem.create(convertToDefaultPath(f), progress);
+ return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), progress));
}
@Override
public FSDataOutputStream create(Path f, short replication) throws IOException {
- return fileSystem.create(convertToDefaultPath(f), replication);
+ return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication));
}
@Override
public FSDataOutputStream create(Path f, short replication, Progressable progress)
throws IOException {
- return fileSystem.create(convertToDefaultPath(f), replication, progress);
+ return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication, progress));
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
- return fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize);
+ return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize));
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
throws IOException {
- return fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, progress);
+ return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, progress));
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
- return fileSystem
- .create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, progress);
+ return wrapOutputStream(f, fileSystem
+ .create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, progress));
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags,
int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
- return fileSystem
+ return wrapOutputStream(f, fileSystem
.create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize,
- progress);
+ progress));
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags,
int bufferSize, short replication, long blockSize, Progressable progress,
Options.ChecksumOpt checksumOpt) throws IOException {
- return fileSystem
+ return wrapOutputStream(f, fileSystem
.create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize,
- progress, checksumOpt);
+ progress, checksumOpt));
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
long blockSize) throws IOException {
- return fileSystem
- .create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize);
+ return wrapOutputStream(f, fileSystem
+ .create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize));
}
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException {
- return fileSystem.append(convertToDefaultPath(f), bufferSize, progress);
+ return wrapOutputStream(f, fileSystem.append(convertToDefaultPath(f), bufferSize, progress));
}
@Override
@@ -226,7 +239,16 @@ public class HoodieWrapperFileSystem extends FileSystem {
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
- return fileSystem.delete(convertToDefaultPath(f), recursive);
+ boolean success = fileSystem.delete(convertToDefaultPath(f), recursive);
+
+ if (success) {
+ try {
+ consistencyGuard.waitTillFileDisappears(f);
+ } catch (TimeoutException e) {
+ return false;
+ }
+ }
+ return success;
}
@Override
@@ -251,6 +273,11 @@ public class HoodieWrapperFileSystem extends FileSystem {
@Override
public FileStatus getFileStatus(Path f) throws IOException {
+ try {
+ consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+ } catch (TimeoutException e) {
+ // pass
+ }
return fileSystem.getFileStatus(convertToDefaultPath(f));
}
@@ -353,12 +380,12 @@ public class HoodieWrapperFileSystem extends FileSystem {
@Override
public FSDataOutputStream append(Path f) throws IOException {
- return fileSystem.append(convertToDefaultPath(f));
+ return wrapOutputStream(f, fileSystem.append(convertToDefaultPath(f)));
}
@Override
public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
- return fileSystem.append(convertToDefaultPath(f), bufferSize);
+ return wrapOutputStream(f, fileSystem.append(convertToDefaultPath(f), bufferSize));
}
@Override
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/SizeAwareFSDataOutputStream.java b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/SizeAwareFSDataOutputStream.java
similarity index 71%
rename from hoodie-client/src/main/java/com/uber/hoodie/io/storage/SizeAwareFSDataOutputStream.java
rename to hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/SizeAwareFSDataOutputStream.java
index 3f966d6..ac31933 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/SizeAwareFSDataOutputStream.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/SizeAwareFSDataOutputStream.java
@@ -14,11 +14,15 @@
* limitations under the License.
*/
-package com.uber.hoodie.io.storage;
+package com.uber.hoodie.common.io.storage;
+import com.uber.hoodie.common.util.ConsistencyGuard;
+import com.uber.hoodie.exception.HoodieException;
import java.io.IOException;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
/**
* Wrapper over <code>FSDataOutputStream</code> to keep track of the size of the written bytes. This
@@ -30,11 +34,17 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
private final Runnable closeCallback;
// Keep track of the bytes written
private final AtomicLong bytesWritten = new AtomicLong(0L);
+ // Path
+ private final Path path;
+ // Consistency guard
+ private final ConsistencyGuard consistencyGuard;
- public SizeAwareFSDataOutputStream(FSDataOutputStream out, Runnable closeCallback)
- throws IOException {
+ public SizeAwareFSDataOutputStream(Path path, FSDataOutputStream out,
+ ConsistencyGuard consistencyGuard, Runnable closeCallback) throws IOException {
super(out);
+ this.path = path;
this.closeCallback = closeCallback;
+ this.consistencyGuard = consistencyGuard;
}
@Override
@@ -52,6 +62,11 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
@Override
public void close() throws IOException {
super.close();
+ try {
+ consistencyGuard.waitTillFileAppears(path);
+ } catch (TimeoutException e) {
+ throw new HoodieException(e);
+ }
closeCallback.run();
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java
index 9393c83..06de961 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java
@@ -59,7 +59,7 @@ public class FileSlice implements Serializable {
this.fileGroupId = fileGroupId;
this.baseInstantTime = baseInstantTime;
this.dataFile = null;
- this.logFiles = new TreeSet<>(HoodieLogFile.getBaseInstantAndLogVersionComparator());
+ this.logFiles = new TreeSet<>(HoodieLogFile.getReverseLogFileComparator());
}
public void setDataFile(HoodieDataFile dataFile) {
@@ -94,6 +94,10 @@ public class FileSlice implements Serializable {
return Optional.ofNullable(dataFile);
}
+ public Optional<HoodieLogFile> getLatestLogFile() {
+ return logFiles.stream().findFirst();
+ }
+
/**
* Returns true if there is no data file and no log files. Happens as part of pending compaction
* @return
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
index 0820b57..fa11252 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
@@ -72,6 +72,10 @@ public class HoodieLogFile implements Serializable {
return FSUtils.getFileVersionFromLog(getPath());
}
+ public String getLogWriteToken() {
+ return FSUtils.getWriteTokenFromLogPath(getPath());
+ }
+
public String getFileExtension() {
return FSUtils.getFileExtensionFromLog(getPath());
}
@@ -96,7 +100,11 @@ public class HoodieLogFile implements Serializable {
return fileStatus;
}
- public HoodieLogFile rollOver(FileSystem fs) throws IOException {
+ public void setFileStatus(FileStatus fileStatus) {
+ this.fileStatus = fileStatus;
+ }
+
+ public HoodieLogFile rollOver(FileSystem fs, String logWriteToken) throws IOException {
String fileId = getFileId();
String baseCommitTime = getBaseCommitTime();
Path path = getPath();
@@ -105,28 +113,50 @@ public class HoodieLogFile implements Serializable {
.computeNextLogVersion(fs, path.getParent(), fileId,
extension, baseCommitTime);
return new HoodieLogFile(new Path(path.getParent(),
- FSUtils.makeLogFileName(fileId, extension, baseCommitTime, newVersion)));
+ FSUtils.makeLogFileName(fileId, extension, baseCommitTime, newVersion, logWriteToken)));
+ }
+
+ public static Comparator<HoodieLogFile> getLogFileComparator() {
+ return new LogFileComparator();
}
- public static Comparator<HoodieLogFile> getBaseInstantAndLogVersionComparator() {
- return new BaseInstantAndLogVersionComparator();
+ public static Comparator<HoodieLogFile> getReverseLogFileComparator() {
+ return new LogFileComparator().reversed();
}
/**
* Comparator to order log-files
*/
- private static class BaseInstantAndLogVersionComparator implements Comparator<HoodieLogFile>, Serializable {
+ public static class LogFileComparator implements Comparator<HoodieLogFile>, Serializable {
+
+ private transient Comparator<String> writeTokenComparator;
+
+ private Comparator<String> getWriteTokenComparator() {
+ if (null == writeTokenComparator) {
+ // field is transient: build lazily so it is recreated after this comparator is deserialized
+ writeTokenComparator = Comparator.nullsFirst(Comparator.<String>naturalOrder());
+ }
+ return writeTokenComparator;
+ }
@Override
public int compare(HoodieLogFile o1, HoodieLogFile o2) {
String baseInstantTime1 = o1.getBaseCommitTime();
String baseInstantTime2 = o2.getBaseCommitTime();
+
if (baseInstantTime1.equals(baseInstantTime2)) {
- // reverse the order by log-version when base-commit is same
- return new Integer(o2.getLogVersion()).compareTo(o1.getLogVersion());
+
+ if (o1.getLogVersion() == o2.getLogVersion()) {
+ // Same base-commit and log-version: order by write token; legacy names without a token (null) sort first
+ return getWriteTokenComparator().compare(o1.getLogWriteToken(), o2.getLogWriteToken());
+ }
+
+ // compare by log-version when base-commit is same
+ return Integer.compare(o1.getLogVersion(), o2.getLogVersion());
}
- // reverse the order by base-commits
- return baseInstantTime2.compareTo(baseInstantTime1);
+
+ // compare by base-commits
+ return baseInstantTime1.compareTo(baseInstantTime2);
}
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java
index e74606c..1a59016 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java
@@ -310,11 +310,8 @@ public class HoodieWriteStat implements Serializable {
/**
- * Set path and tempPath relative to the given basePath.
+ * Set path relative to the given basePath.
*/
- public void setPaths(Path basePath, Path path, Path tempPath) {
+ public void setPath(Path basePath, Path path) {
this.path = path.toString().replace(basePath + "/", "");
- if (tempPath != null) {
- this.tempPath = tempPath.toString().replace(basePath + "/", "");
- }
}
@Override
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
index 7570786..e76f775 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
@@ -61,6 +61,7 @@ public class HoodieTableMetaClient implements Serializable {
public static String METAFOLDER_NAME = ".hoodie";
public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
public static String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux";
+ public static final String MARKER_EXTN = ".marker";
private String basePath;
private transient FileSystem fs;
@@ -143,6 +144,22 @@ public class HoodieTableMetaClient implements Serializable {
}
/**
+ * @return Temp Folder path
+ */
+ public String getTempFolderPath() {
+ return basePath + File.separator + TEMPFOLDER_NAME;
+ }
+
+ /**
+ * Returns Marker folder path
+ * @param instantTs Instant Timestamp
+ * @return
+ */
+ public String getMarkerFolderPath(String instantTs) {
+ return String.format("%s%s%s", getTempFolderPath(), File.separator, instantTs);
+ }
+
+ /**
* @return Auxiliary Meta path
*/
public String getMetaAuxiliaryPath() {
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
index 93e5ac7..abc2c61 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
@@ -19,9 +19,11 @@ package com.uber.hoodie.common.table.log;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.collection.Pair;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
+import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -48,6 +50,8 @@ public interface HoodieLogFormat {
*/
int currentVersion = 1;
+ String UNKNOWN_WRITE_TOKEN = "1-0-1";
+
/**
* Writer interface to allow appending block to this file format
*/
@@ -106,6 +110,10 @@ public interface HoodieLogFormat {
private Integer logVersion;
// Location of the directory containing the log
private Path parentPath;
+ // Log File Write Token
+ private String logWriteToken;
+ // Rollover Log file write token
+ private String rolloverLogWriteToken;
public WriterBuilder withBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
@@ -117,6 +125,16 @@ public interface HoodieLogFormat {
return this;
}
+ public WriterBuilder withLogWriteToken(String writeToken) {
+ this.logWriteToken = writeToken;
+ return this;
+ }
+
+ public WriterBuilder withRolloverLogWriteToken(String rolloverLogWriteToken) {
+ this.rolloverLogWriteToken = rolloverLogWriteToken;
+ return this;
+ }
+
public WriterBuilder withFs(FileSystem fs) {
this.fs = fs;
return this;
@@ -169,17 +187,37 @@ public interface HoodieLogFormat {
if (parentPath == null) {
throw new IllegalArgumentException("Log file parent location is not specified");
}
+
+ if (rolloverLogWriteToken == null) {
+ rolloverLogWriteToken = UNKNOWN_WRITE_TOKEN;
+ }
+
if (logVersion == null) {
log.info("Computing the next log version for " + logFileId + " in " + parentPath);
- logVersion =
- FSUtils.getCurrentLogVersion(fs, parentPath, logFileId, fileExtension, commitTime);
+ Optional<Pair<Integer, String>> versionAndWriteToken =
+ FSUtils.getLatestLogVersion(fs, parentPath, logFileId, fileExtension, commitTime);
+ if (versionAndWriteToken.isPresent()) {
+ logVersion = versionAndWriteToken.get().getKey();
+ logWriteToken = versionAndWriteToken.get().getValue();
+ } else {
+ logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
+ // this is the case where there is no existing log-file.
+ // Use rollover write token as write token to create new log file with tokens
+ logWriteToken = rolloverLogWriteToken;
+ }
log.info(
"Computed the next log version for " + logFileId + " in " + parentPath + " as "
- + logVersion);
+ + logVersion + " with write-token " + logWriteToken);
+ }
+
+ if (logWriteToken == null) {
+ // This is the case where we have existing log-file with old format. rollover to avoid any conflicts
+ logVersion += 1;
+ logWriteToken = rolloverLogWriteToken;
}
Path logPath = new Path(parentPath,
- FSUtils.makeLogFileName(logFileId, fileExtension, commitTime, logVersion));
+ FSUtils.makeLogFileName(logFileId, fileExtension, commitTime, logVersion, logWriteToken));
log.info("HoodieLogFile on path " + logPath);
HoodieLogFile logFile = new HoodieLogFile(logPath);
@@ -192,9 +230,9 @@ public interface HoodieLogFormat {
if (sizeThreshold == null) {
sizeThreshold = DEFAULT_SIZE_THRESHOLD;
}
- return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold);
+ return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold, logWriteToken,
+ rolloverLogWriteToken);
}
-
}
static WriterBuilder newWriterBuilder() {
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
index 28b2501..614f622 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
@@ -48,6 +48,8 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private final long sizeThreshold;
private final Integer bufferSize;
private final Short replication;
+ private final String logWriteToken;
+ private final String rolloverLogWriteToken;
private FSDataOutputStream output;
private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";
@@ -59,14 +61,15 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
* @param sizeThreshold
*/
HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize,
- Short replication, Long sizeThreshold)
+ Short replication, Long sizeThreshold, String logWriteToken, String rolloverLogWriteToken)
throws IOException, InterruptedException {
this.fs = fs;
this.logFile = logFile;
this.sizeThreshold = sizeThreshold;
this.bufferSize = bufferSize;
this.replication = replication;
-
+ this.logWriteToken = logWriteToken;
+ this.rolloverLogWriteToken = rolloverLogWriteToken;
Path path = logFile.getPath();
if (fs.exists(path)) {
boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
@@ -87,7 +90,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
}
}
if (!isAppendSupported) {
- this.logFile = logFile.rollOver(fs);
- log.info("Append not supported.. Rolling over to " + logFile);
+ this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
+ log.info("Append not supported.. Rolling over to " + this.logFile);
createNewFile();
}
@@ -180,10 +183,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
// file).
log.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold
+ ". Rolling over to the next version");
- HoodieLogFile newLogFile = logFile.rollOver(fs);
+ HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
// close this writer and return the new writer
close();
- return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold);
+ return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold, logWriteToken,
+ rolloverLogWriteToken);
}
return this;
}
@@ -231,10 +235,15 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
// last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325
log.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
// Rollover the current log file (since cannot get a stream handle) and create new one
- this.logFile = logFile.rollOver(fs);
+ this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
+ createNewFile();
+ } else if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
+ log.warn("Another task executor writing to the same log file(" + logFile + "). Rolling over");
+ // Another writer is still creating this file: roll over to a new log file instead of recovering the lease
+ this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
createNewFile();
- } else if ((e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) || e.getClassName()
- .contentEquals(RecoveryInProgressException.class.getName())) && (fs instanceof DistributedFileSystem)) {
+ } else if (e.getClassName().contentEquals(RecoveryInProgressException.class.getName())
+ && (fs instanceof DistributedFileSystem)) {
// this happens when either another task executor writing to this file died or
// data node is going down. Note that we can only try to recover lease for a DistributedFileSystem.
// ViewFileSystem unfortunately does not support this operation
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuard.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuard.java
new file mode 100644
index 0000000..825c597
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuard.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.common.util;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Waits until file creations/deletions become visible in file-system listings.
+ */
+public interface ConsistencyGuard {
+
+ /**
+ * Target visibility state: file shows up in (APPEAR) or is gone from (DISAPPEAR) listings.
+ */
+ enum FileVisibility {
+ APPEAR,
+ DISAPPEAR,
+ }
+
+ /**
+ * Wait for file to be listable based on configurable timeout
+ * @param filePath path of the file expected to appear
+ * @throws IOException when having trouble listing the path
+ * @throws TimeoutException when retries exhausted
+ */
+ void waitTillFileAppears(Path filePath) throws IOException, TimeoutException;
+
+ /**
+ * Wait for file to no longer be listable based on configurable timeout
+ * @param filePath path of the file expected to disappear
+ * @throws IOException when having trouble listing the path
+ * @throws TimeoutException when retries exhausted
+ */
+ void waitTillFileDisappears(Path filePath) throws IOException, TimeoutException;
+
+ /**
+ * Wait till all passed files belonging to a directory show up in its listing
+ */
+ void waitTillAllFilesAppear(String dirPath, List<String> files) throws IOException, TimeoutException;
+
+ /**
+ * Wait till all passed files belonging to a directory disappear from its listing
+ */
+ void waitTillAllFilesDisappear(String dirPath, List<String> files) throws IOException, TimeoutException;
+
+
+ /**
+ * Wait till the target visibility is reached for all files by delegating to the matching wait method
+ * @param dirPath Directory Path
+ * @param files Files
+ * @param targetVisibility Target visibility (APPEAR or DISAPPEAR)
+ * @throws IOException propagated from the delegated wait method
+ * @throws TimeoutException propagated from the delegated wait method
+ */
+ default void waitTill(String dirPath, List<String> files, FileVisibility targetVisibility)
+ throws IOException, TimeoutException {
+ switch (targetVisibility) {
+ case APPEAR: {
+ waitTillAllFilesAppear(dirPath, files);
+ break;
+ }
+ case DISAPPEAR: {
+ waitTillAllFilesDisappear(dirPath, files);
+ break;
+ }
+ default: // not reachable with the current APPEAR/DISAPPEAR constants
+ throw new IllegalStateException("Unknown File Visibility");
+ }
+ }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
index b014710..a97cb51 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
@@ -16,21 +16,28 @@
package com.uber.hoodie.common.util;
+import static com.uber.hoodie.common.table.HoodieTableMetaClient.MARKER_EXTN;
+
import com.google.common.base.Preconditions;
+import com.uber.hoodie.common.model.HoodieFileFormat;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.InvalidHoodiePathException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -52,13 +59,15 @@ public class FSUtils {
private static final Logger LOG = LogManager.getLogger(FSUtils.class);
// Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1
- private static final Pattern LOG_FILE_PATTERN = Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)");
+ private static final Pattern LOG_FILE_PATTERN =
+ Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)))?");
private static final String LOG_FILE_PREFIX = ".";
private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
private static final long MIN_CLEAN_TO_KEEP = 10;
private static final long MIN_ROLLBACK_TO_KEEP = 10;
private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";
+
public static Configuration prepareHadoopConf(Configuration conf) {
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
@@ -76,7 +85,6 @@ public class FSUtils {
return conf;
}
-
public static FileSystem getFs(String path, Configuration conf) {
FileSystem fs;
conf = prepareHadoopConf(conf);
@@ -92,26 +100,36 @@ public class FSUtils {
return fs;
}
- public static String makeDataFileName(String commitTime, int taskPartitionId, String fileId) {
- return String.format("%s_%d_%s.parquet", fileId, taskPartitionId, commitTime);
+ /**
+ * A write token uniquely identifies an attempt at one of the IOHandle operations (Merge/Create/Append)
+ */
+ public static String makeWriteToken(int taskPartitionId, int stageId, long taskAttemptId) {
+ return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId);
}
- public static String makeTempDataFileName(String partitionPath, String commitTime,
- int taskPartitionId, String fileId, int stageId, long taskAttemptId) {
- return String.format("%s_%s_%d_%s_%d_%d.parquet", partitionPath.replace("/", "-"), fileId,
- taskPartitionId, commitTime, stageId, taskAttemptId);
+
+ public static String makeDataFileName(String commitTime, String writeToken, String fileId) {
+ return String.format("%s_%s_%s.parquet", fileId, writeToken, commitTime);
}
- public static String maskWithoutFileId(String commitTime, int taskPartitionId) {
- return String.format("*_%s_%s.parquet", taskPartitionId, commitTime);
+ public static String makeMarkerFile(String commitTime, String writeToken, String fileId) {
+ return String.format("%s_%s_%s%s", fileId, writeToken, commitTime, MARKER_EXTN);
}
- public static String maskWithoutTaskPartitionId(String commitTime, String fileId) {
- return String.format("%s_*_%s.parquet", fileId, commitTime);
+ public static String translateMarkerToDataPath(String basePath, String markerPath, String instantTs) {
+ Preconditions.checkArgument(markerPath.endsWith(MARKER_EXTN)); // only this trailing extension is stripped below
+ String markerRootPath = Path.getPathWithoutSchemeAndAuthority(new Path(
+ String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTs))).toString();
+ int begin = markerPath.indexOf(markerRootPath);
+ Preconditions.checkArgument(begin >= 0, "Not in marker dir. Marker Path=" + markerPath
+ + ", Expected Marker Root=" + markerRootPath);
+ String rPath = markerPath.substring(begin + markerRootPath.length() + 1); // path relative to marker root
+ return String.format("%s/%s%s", basePath, rPath.substring(0, rPath.length() - MARKER_EXTN.length()),
+ HoodieFileFormat.PARQUET.getFileExtension());
}
- public static String maskWithOnlyCommitTime(String commitTime) {
- return String.format("*_*_%s.parquet", commitTime);
+ public static String maskWithoutFileId(String commitTime, int taskPartitionId) {
+ return String.format("*_%s_%s%s", taskPartitionId, commitTime, HoodieFileFormat.PARQUET.getFileExtension());
}
public static String getCommitFromCommitFile(String commitFileName) {
@@ -175,18 +193,43 @@ public class FSUtils {
*/
public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr)
throws IOException {
- List<String> partitions = new ArrayList<>();
- Path basePath = new Path(basePathStr);
- RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(new Path(basePathStr), true);
- while (allFiles.hasNext()) {
- Path filePath = allFiles.next().getPath();
+ final Path basePath = new Path(basePathStr);
+ final List<String> partitions = new ArrayList<>();
+ processFiles(fs, basePathStr, (locatedFileStatus) -> {
+ Path filePath = locatedFileStatus.getPath();
if (filePath.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
partitions.add(getRelativePartitionPath(basePath, filePath.getParent()));
}
- }
+ return true;
+ });
return partitions;
}
+ public static final List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
+ String markerDir) throws IOException {
+ List<String> dataFiles = new LinkedList<>();
+ FSUtils.processFiles(fs, markerDir, (status) -> {
+ String pathStr = status.getPath().toString();
+ if (pathStr.endsWith(MARKER_EXTN)) {
+ dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs));
+ }
+ return true;
+ });
+ return dataFiles;
+ }
+
+ private static final void processFiles(FileSystem fs, String basePathStr,
+ Function<LocatedFileStatus, Boolean> consumer) throws IOException {
+ RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(new Path(basePathStr), true);
+ while (allFiles.hasNext()) {
+ LocatedFileStatus status = allFiles.next();
+ boolean success = consumer.apply(status);
+ if (!success) {
+ throw new HoodieException("Failed to process file-status=" + status);
+ }
+ }
+ }
+
public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr,
boolean assumeDatePartitioning)
throws IOException {
@@ -208,6 +251,12 @@ public class FSUtils {
return name.replace(getFileExtension(name), "");
}
+ /**
+ * Returns a new unique prefix for creating a file group.
+ */
+ public static String createNewFileIdPfx() {
+ return UUID.randomUUID().toString();
+ }
/**
* Get the file extension from the log file
@@ -255,6 +304,53 @@ public class FSUtils {
}
/**
+ * Get TaskId used in log-path
+ */
+ public static Integer getTaskPartitionIdFromLogPath(Path path) {
+ Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
+ if (!matcher.find()) {
+ throw new InvalidHoodiePathException(path, "LogFile");
+ }
+ String val = matcher.group(7);
+ return val == null ? null : Integer.parseInt(val);
+ }
+
+ /**
+ * Get Write-Token used in log-path
+ */
+ public static String getWriteTokenFromLogPath(Path path) {
+ Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
+ if (!matcher.find()) {
+ throw new InvalidHoodiePathException(path, "LogFile");
+ }
+ return matcher.group(6);
+ }
+
+ /**
+ * Get StageId used in log-path
+ */
+ public static Integer getStageIdFromLogPath(Path path) {
+ Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
+ if (!matcher.find()) {
+ throw new InvalidHoodiePathException(path, "LogFile");
+ }
+ String val = matcher.group(8);
+ return val == null ? null : Integer.parseInt(val);
+ }
+
+ /**
+ * Get Task Attempt Id used in log-path
+ */
+ public static Integer getTaskAttemptIdFromLogPath(Path path) {
+ Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
+ if (!matcher.find()) {
+ throw new InvalidHoodiePathException(path, "LogFile");
+ }
+ String val = matcher.group(9);
+ return val == null ? null : Integer.parseInt(val);
+ }
+
+ /**
* Get the last part of the file name in the log file and convert to int.
*/
public static int getFileVersionFromLog(Path logPath) {
@@ -266,14 +362,10 @@ public class FSUtils {
}
public static String makeLogFileName(String fileId, String logFileExtension,
- String baseCommitTime, int version) {
- return LOG_FILE_PREFIX + String
- .format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
- }
-
- public static String maskWithoutLogVersion(String commitTime, String fileId,
- String logFileExtension) {
- return LOG_FILE_PREFIX + String.format("%s_%s%s*", fileId, commitTime, logFileExtension);
+ String baseCommitTime, int version, String writeToken) {
+ String suffix = (writeToken == null) ? String.format("%s_%s%s.%d",fileId, baseCommitTime, logFileExtension, version)
+ : String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
+ return LOG_FILE_PREFIX + suffix;
}
public static boolean isLogFile(Path logPath) {
@@ -288,9 +380,7 @@ public class FSUtils {
* Get the latest log file written from the list of log files passed in
*/
public static Optional<HoodieLogFile> getLatestLogFile(Stream<HoodieLogFile> logFiles) {
- return logFiles.sorted(Comparator
- .comparing(s -> s.getLogVersion(),
- Comparator.reverseOrder())).findFirst();
+ return logFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).findFirst();
}
/**
@@ -308,36 +398,28 @@ public class FSUtils {
/**
* Get the latest log version for the fileId in the partition path
*/
- public static Optional<Integer> getLatestLogVersion(FileSystem fs, Path partitionPath,
+ public static Optional<Pair<Integer, String>> getLatestLogVersion(FileSystem fs, Path partitionPath,
final String fileId, final String logFileExtension, final String baseCommitTime)
throws IOException {
Optional<HoodieLogFile> latestLogFile =
getLatestLogFile(
getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime));
if (latestLogFile.isPresent()) {
- return Optional.of(latestLogFile.get().getLogVersion());
+ return Optional.of(Pair.of(latestLogFile.get().getLogVersion(),
+ getWriteTokenFromLogPath(latestLogFile.get().getPath())));
}
return Optional.empty();
}
- public static int getCurrentLogVersion(FileSystem fs, Path partitionPath,
- final String fileId, final String logFileExtension, final String baseCommitTime)
- throws IOException {
- Optional<Integer> currentVersion =
- getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
- // handle potential overflow
- return (currentVersion.isPresent()) ? currentVersion.get() : HoodieLogFile.LOGFILE_BASE_VERSION;
- }
-
/**
* computes the next log version for the specified fileId in the partition path
*/
public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId,
final String logFileExtension, final String baseCommitTime) throws IOException {
- Optional<Integer> currentVersion =
+ Optional<Pair<Integer, String>> currentVersionWithWriteToken =
getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
// handle potential overflow
- return (currentVersion.isPresent()) ? currentVersion.get() + 1
+ return (currentVersionWithWriteToken.isPresent()) ? currentVersionWithWriteToken.get().getKey() + 1
: HoodieLogFile.LOGFILE_BASE_VERSION;
}
@@ -349,10 +431,6 @@ public class FSUtils {
return fs.getDefaultReplication(path);
}
- public static Long getDefaultBlockSize(FileSystem fs, Path path) {
- return fs.getDefaultBlockSize(path);
- }
-
/**
* When a file was opened and the task died without closing the stream, another task executor
* cannot open because the existing lease will be active. We will try to recover the lease, from
@@ -431,8 +509,12 @@ public class FSUtils {
}
public static Path getPartitionPath(String basePath, String partitionPath) {
+ return getPartitionPath(new Path(basePath), partitionPath);
+ }
+
+ public static Path getPartitionPath(Path basePath, String partitionPath) {
// FOr non-partitioned table, return only base-path
- return ((partitionPath == null) || (partitionPath.isEmpty())) ? new Path(basePath) :
+ return ((partitionPath == null) || (partitionPath.isEmpty())) ? basePath :
new Path(basePath, partitionPath);
}
}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
new file mode 100644
index 0000000..12b4762
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.common.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * A consistency guard that repeatedly checks the file system, backing off exponentially between checks,
+ * and fails with a {@link TimeoutException} if the required condition is not met within the configured
+ * number of attempts.
+ */
+public class FailSafeConsistencyGuard implements ConsistencyGuard {
+
+  private static final Logger log = LogManager.getLogger(FailSafeConsistencyGuard.class);
+
+  private final FileSystem fs;
+  private final int maxAttempts;
+  private final long initialDelayMs;
+  private final long maxDelayMs;
+
+  /**
+   * @param fs             file system used to check file visibility
+   * @param maxAttempts    maximum number of checks before giving up
+   * @param initialDelayMs delay in ms after the first failed check; doubled after every further failed check
+   * @param maxDelayMs     upper bound in ms on the delay between two consecutive checks
+   */
+  public FailSafeConsistencyGuard(FileSystem fs, int maxAttempts, long initialDelayMs, long maxDelayMs) {
+    this.fs = fs;
+    this.maxAttempts = maxAttempts;
+    this.initialDelayMs = initialDelayMs;
+    this.maxDelayMs = maxDelayMs;
+  }
+
+  @Override
+  public void waitTillFileAppears(Path filePath) throws TimeoutException {
+    waitForFileVisibility(filePath, FileVisibility.APPEAR);
+  }
+
+  @Override
+  public void waitTillFileDisappears(Path filePath) throws TimeoutException {
+    waitForFileVisibility(filePath, FileVisibility.DISAPPEAR);
+  }
+
+  @Override
+  public void waitTillAllFilesAppear(String dirPath, List<String> files) throws TimeoutException {
+    waitForFilesVisibility(dirPath, files, FileVisibility.APPEAR);
+  }
+
+  @Override
+  public void waitTillAllFilesDisappear(String dirPath, List<String> files) throws TimeoutException {
+    waitForFilesVisibility(dirPath, files, FileVisibility.DISAPPEAR);
+  }
+
+  /**
+   * Waits until all the given files appear in, or disappear from, the listing of {@code dirPath}.
+   *
+   * @param dirPath directory to list
+   * @param files   files to appear/disappear; scheme and authority are ignored when comparing paths
+   * @param event   whether to wait for the files to appear or to disappear
+   * @throws TimeoutException if the condition is still unmet after {@code maxAttempts} checks
+   */
+  public void waitForFilesVisibility(String dirPath, List<String> files, FileVisibility event)
+      throws TimeoutException {
+    Path dir = new Path(dirPath);
+    List<String> expectedFiles = files.stream()
+        .map(f -> Path.getPathWithoutSchemeAndAuthority(new Path(f)).toString())
+        .collect(Collectors.toList());
+
+    retryTillSuccess((retryNum) -> {
+      try {
+        log.debug("Checking visibility of files in " + dirPath + ", attempt " + retryNum);
+        FileStatus[] entries = fs.listStatus(dir);
+        List<String> listedFiles = Arrays.stream(entries)
+            .map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath()).toString())
+            .collect(Collectors.toList());
+        // Expected files that are NOT present in the current listing
+        List<String> missingFiles = new ArrayList<>(expectedFiles);
+        missingFiles.removeAll(listedFiles);
+
+        if (event == FileVisibility.DISAPPEAR) {
+          // Done once none of the expected files is listed any more
+          List<String> visibleFiles = new ArrayList<>(expectedFiles);
+          visibleFiles.removeAll(missingFiles);
+          if (!visibleFiles.isEmpty()) {
+            log.info("Following files are still visible: " + visibleFiles);
+          }
+          return visibleFiles.isEmpty();
+        }
+        // APPEAR: done once every expected file is listed
+        if (!missingFiles.isEmpty()) {
+          log.debug("Following files are not yet visible: " + missingFiles);
+        }
+        return missingFiles.isEmpty();
+      } catch (IOException ioe) {
+        log.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe);
+        return false;
+      }
+    }, "Timed out waiting for files to " + event.name() + " in " + dirPath);
+  }
+
+  /**
+   * Checks once whether {@code filePath} currently satisfies the requested visibility.
+   *
+   * @param filePath   file to check
+   * @param visibility APPEAR to require the path to be listed, otherwise require it to be absent
+   * @return true if the condition currently holds
+   * @throws IOException if listing the path fails for a reason other than the path not existing
+   */
+  private boolean checkFileVisibility(Path filePath, FileVisibility visibility) throws IOException {
+    boolean visible;
+    try {
+      visible = fs.listStatus(filePath).length != 0;
+    } catch (FileNotFoundException nfe) {
+      visible = false;
+    }
+    return visibility == FileVisibility.APPEAR ? visible : !visible;
+  }
+
+  /**
+   * Waits till {@code filePath} either appears or disappears.
+   *
+   * @param filePath   file to wait on
+   * @param visibility whether to wait for the file to appear or to disappear
+   * @throws TimeoutException if the condition is still unmet after {@code maxAttempts} checks
+   */
+  private void waitForFileVisibility(Path filePath, FileVisibility visibility) throws TimeoutException {
+    retryTillSuccess((retryNum) -> {
+      try {
+        return checkFileVisibility(filePath, visibility);
+      } catch (IOException ioe) {
+        log.warn("Got IOException waiting for file visibility. Retrying", ioe);
+        return false;
+      }
+    }, "Timed-out waiting for the file to " + visibility.name());
+  }
+
+  /**
+   * Evaluates the predicate up to {@code maxAttempts} times until it returns true, sleeping between
+   * attempts with an exponentially growing delay capped at {@code maxDelayMs}.
+   *
+   * @param predicate       check to run; receives the zero-based attempt number
+   * @param timedOutMessage message of the exception thrown when all attempts fail
+   * @throws TimeoutException when retries are exhausted
+   */
+  private void retryTillSuccess(Function<Integer, Boolean> predicate, String timedOutMessage) throws TimeoutException {
+    long waitMs = initialDelayMs;
+    log.debug("Max Attempts=" + maxAttempts);
+    for (int attempt = 0; attempt < maxAttempts; attempt++) {
+      if (predicate.apply(attempt)) {
+        return;
+      }
+      // No point in sleeping after the final attempt: we are about to give up anyway
+      if (attempt < maxAttempts - 1) {
+        sleepSafe(waitMs);
+        waitMs = Math.min(waitMs * 2, maxDelayMs); // double the check interval every attempt, capped
+      }
+    }
+    throw new TimeoutException(timedOutMessage);
+  }
+
+  /**
+   * Sleeps for {@code waitMs}. If interrupted, the thread's interrupt flag is restored rather than
+   * swallowed, so later sleeps return immediately and the interrupt stays observable to callers.
+   */
+  void sleepSafe(long waitMs) {
+    try {
+      Thread.sleep(waitMs);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/NoOpConsistencyGuard.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/NoOpConsistencyGuard.java
new file mode 100644
index 0000000..acc20b7
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/NoOpConsistencyGuard.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.common.util;
+
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Default consistency guard that does nothing: every wait method returns immediately without
+ * consulting the file system. Used for HDFS deployments.
+ */
+public class NoOpConsistencyGuard implements ConsistencyGuard {
+
+  /** Returns immediately without checking whether the file is visible. */
+  @Override
+  public void waitTillFileAppears(Path filePath) {
+  }
+
+  /** Returns immediately without checking whether the file is gone. */
+  @Override
+  public void waitTillFileDisappears(Path filePath) {
+  }
+
+  /** Returns immediately without listing {@code dirPath}. */
+  @Override
+  public void waitTillAllFilesAppear(String dirPath, List<String> files) {
+    // Intentionally empty: no waiting is performed.
+  }
+
+  /** Returns immediately without listing {@code dirPath}. */
+  @Override
+  public void waitTillAllFilesDisappear(String dirPath, List<String> files) {
+    // Intentionally empty: no waiting is performed.
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
index a112a6d..24c4868 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
@@ -113,7 +113,6 @@ public class ParquetUtils {
return footer;
}
-
/**
* Get the schema of the given parquet file.
*/
@@ -121,7 +120,6 @@ public class ParquetUtils {
return readMetadata(configuration, parquetFilePath).getFileMetaData().getSchema();
}
-
private static List<String> readParquetFooter(Configuration configuration, Path parquetFilePath,
String... footerNames) {
List<String> footerVals = new ArrayList<>();
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
index e8b287a..4627fe3 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
@@ -79,7 +79,8 @@ public class HoodieTestUtils {
public static final String TEST_EXTENSION = ".test";
public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
- public static final int DEFAULT_TASK_PARTITIONID = 1;
+ public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
+ public static final int DEFAULT_LOG_VERSION = 1;
public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"};
private static Random rand = new Random(46474747);
@@ -92,8 +93,7 @@ public class HoodieTestUtils {
return init(basePath, HoodieTableType.COPY_ON_WRITE);
}
- public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType)
- throws IOException {
+ public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType) throws IOException {
return initTableType(getDefaultHadoopConf(), basePath, tableType);
}
@@ -163,14 +163,30 @@ public class HoodieTestUtils {
return createDataFile(basePath, partitionPath, commitTime, fileID);
}
+  /**
+   * Creates a marker file for a freshly generated (random UUID) file id.
+   *
+   * @return the path returned by {@code createMarkerFile} for the generated id
+   */
+  public static final String createNewMarkerFile(String basePath, String partitionPath, String commitTime)
+      throws IOException {
+    return createMarkerFile(basePath, partitionPath, commitTime, UUID.randomUUID().toString());
+  }
+
+ /**
+ * Creates an empty data file for fileID (named with DEFAULT_WRITE_TOKEN) under basePath/partitionPath,
+ * creating the partition directory first.
+ *
+ * @return the given fileID
+ */
public static final String createDataFile(String basePath, String partitionPath, String commitTime, String fileID)
throws IOException {
String folderPath = basePath + "/" + partitionPath + "/";
+ // NOTE(review): the boolean results of mkdirs()/createNewFile() below are ignored
new File(folderPath).mkdirs();
- new File(folderPath + FSUtils.makeDataFileName(commitTime, DEFAULT_TASK_PARTITIONID, fileID)).createNewFile();
+ new File(folderPath + FSUtils.makeDataFileName(commitTime, DEFAULT_WRITE_TOKEN, fileID)).createNewFile();
return fileID;
}
+  /**
+   * Creates an empty marker file for {@code fileID} under
+   * {@code <basePath>/<TEMPFOLDER_NAME>/<commitTime>/<partitionPath>/}, creating the directory if needed.
+   *
+   * @return absolute path of the marker file
+   * @throws IOException if the marker directory cannot be created or the file cannot be created
+   */
+  public static final String createMarkerFile(String basePath, String partitionPath, String commitTime, String fileID)
+      throws IOException {
+    String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + commitTime + "/"
+        + partitionPath + "/";
+    File folder = new File(folderPath);
+    // mkdirs() also returns false when the directory already exists, so only fail if it is really missing
+    if (!folder.mkdirs() && !folder.isDirectory()) {
+      throw new IOException("cannot create directory for path " + folderPath);
+    }
+    File f = new File(folderPath + FSUtils.makeMarkerFile(commitTime, DEFAULT_WRITE_TOKEN, fileID));
+    // createNewFile() returns false only if the marker already exists, which is fine here
+    f.createNewFile();
+    return f.getAbsolutePath();
+  }
+
public static final String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String commitTime,
String fileID, Optional<Integer> version) throws IOException {
String folderPath = basePath + "/" + partitionPath + "/";
@@ -179,7 +195,9 @@ public class HoodieTestUtils {
throw new IOException("cannot create directory for path " + folderPath);
}
boolean createFile = fs.createNewFile(new Path(
- folderPath + FSUtils.makeLogFileName(fileID, ".log", commitTime, version.orElse(DEFAULT_TASK_PARTITIONID))));
+ folderPath + FSUtils
+ .makeLogFileName(fileID, ".log", commitTime, version.orElse(DEFAULT_LOG_VERSION),
+ HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
if (!createFile) {
throw new IOException(
StringUtils.format("cannot create data file for commit %s and fileId %s", commitTime, fileID));
@@ -208,39 +226,38 @@ public class HoodieTestUtils {
AvroUtils.serializeCompactionPlan(plan));
}
+ /** Local path of the data file for fileID written at commitTime, named with DEFAULT_WRITE_TOKEN. */
- public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID)
- throws IOException {
+ public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) {
return basePath + "/" + partitionPath + "/" + FSUtils
- .makeDataFileName(commitTime, DEFAULT_TASK_PARTITIONID, fileID);
+ .makeDataFileName(commitTime, DEFAULT_WRITE_TOKEN, fileID);
}
+ /**
+ * Local path of the ".log" file for fileID; version defaults to DEFAULT_LOG_VERSION and the
+ * write token is HoodieLogFormat.UNKNOWN_WRITE_TOKEN.
+ */
public static final String getLogFilePath(String basePath, String partitionPath, String commitTime, String fileID,
- Optional<Integer> version) throws IOException {
+ Optional<Integer> version) {
return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", commitTime,
- version.orElse(DEFAULT_TASK_PARTITIONID));
+ version.orElse(DEFAULT_LOG_VERSION), HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
}
+ /** Path of the commit file (COMMIT_EXTENSION) for commitTime under the meta folder. */
- public static final String getCommitFilePath(String basePath, String commitTime) throws IOException {
+ public static final String getCommitFilePath(String basePath, String commitTime) {
return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION;
}
+ /** Path of the inflight commit file (INFLIGHT_COMMIT_EXTENSION) for commitTime under the meta folder. */
- public static final String getInflightCommitFilePath(String basePath, String commitTime) throws IOException {
+ public static final String getInflightCommitFilePath(String basePath, String commitTime) {
return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime
+ HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
}
+ // NOTE(review): despite the name, this builds <auxiliary folder>/<commitTime> + INFLIGHT_COMMIT_EXTENSION;
+ // confirm this matches the requested-compaction file naming.
- public static final String getRequestedCompactionFilePath(String basePath, String commitTime) throws IOException {
+ public static final String getRequestedCompactionFilePath(String basePath, String commitTime) {
return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + commitTime
+ HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
}
+ /** Whether the file at getDataFilePath(...) exists on the local file system. */
- public static final boolean doesDataFileExist(String basePath, String partitionPath, String commitTime, String fileID)
- throws IOException {
+ public static final boolean doesDataFileExist(String basePath, String partitionPath, String commitTime,
+ String fileID) {
return new File(getDataFilePath(basePath, partitionPath, commitTime, fileID)).exists();
}
+ /** Whether the file at getLogFilePath(...) exists on the local file system. */
public static final boolean doesLogFileExist(String basePath, String partitionPath, String commitTime, String fileID,
- Optional<Integer> version) throws IOException {
+ Optional<Integer> version) {
return new File(getLogFilePath(basePath, partitionPath, commitTime, fileID, version)).exists();
}
@@ -256,10 +273,6 @@ public class HoodieTestUtils {
.exists();
}
- public static String makeInflightTestFileName(String instant) {
- return instant + TEST_EXTENSION + HoodieTimeline.INFLIGHT_EXTENSION;
- }
-
public static void createCleanFiles(String basePath, String commitTime, Configuration configuration)
throws IOException {
Path commitFile = new Path(
@@ -395,4 +408,4 @@ public class HoodieTestUtils {
}
return writeStatList;
}
-}
+}
\ No newline at end of file
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieWriteStat.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieWriteStat.java
index a462a6d..7d35895 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieWriteStat.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieWriteStat.java
@@ -35,27 +35,20 @@ public class TestHoodieWriteStat {
String basePathString = "/data/tables/some-hoodie-table";
String partitionPathString = "2017/12/31";
String fileName = UUID.randomUUID().toString();
- int taskPartitionId = Integer.MAX_VALUE;
- int stageId = Integer.MAX_VALUE;
- long taskAttemptId = Long.MAX_VALUE;
+ String writeToken = "1-0-1";
Path basePath = new Path(basePathString);
Path partitionPath = new Path(basePath, partitionPathString);
Path tempPath = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
- Path finalizeFilePath = new Path(partitionPath, FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName));
- Path tempFilePath = new Path(tempPath, FSUtils
- .makeTempDataFileName(partitionPathString, commitTime, taskPartitionId,
- fileName, stageId, taskAttemptId));
-
+ Path finalizeFilePath = new Path(partitionPath, FSUtils.makeDataFileName(commitTime, writeToken, fileName));
HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setPaths(basePath, finalizeFilePath, tempFilePath);
+ writeStat.setPath(basePath, finalizeFilePath);
assertEquals(finalizeFilePath, new Path(basePath, writeStat.getPath()));
- assertEquals(tempFilePath, new Path(basePath, writeStat.getTempPath()));
// test for null tempFilePath
writeStat = new HoodieWriteStat();
- writeStat.setPaths(basePath, finalizeFilePath, null);
+ writeStat.setPath(basePath, finalizeFilePath);
assertEquals(finalizeFilePath, new Path(basePath, writeStat.getPath()));
assertNull(writeStat.getTempPath());
}
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
index 602840c..ab5a21b 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
@@ -19,6 +19,7 @@ package com.uber.hoodie.common.table.log;
import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -176,6 +177,63 @@ public class HoodieLogFormatTest {
}
@Test
+ public void testConcurrentAppendOnExistingLogFileWithoutWriteToken() throws Exception {
+ testConcurrentAppend(true, false);
+ }
+
+ @Test
+ public void testConcurrentAppendOnExistingLogFileWithWriteToken() throws Exception {
+ testConcurrentAppend(true, true);
+ }
+
+ @Test
+ public void testConcurrentAppendOnFirstLogFileVersion() throws Exception {
+ testConcurrentAppend(false, true);
+ }
+
+ /**
+ * Builds two log writers for the same fileId and base commit (both before either appends), appends the
+ * same data block through each, and asserts the second writer's log file is one version after the first's.
+ *
+ * @param logFileExists with newLogFileFormat, start both writers at log version 1 instead of
+ * LOGFILE_BASE_VERSION
+ * @param newLogFileFormat configure both writers with UNKNOWN_WRITE_TOKEN log/rollover tokens; otherwise
+ * only writer 1 is configured (version 1 and a rollover token)
+ */
+ private void testConcurrentAppend(boolean logFileExists, boolean newLogFileFormat) throws Exception {
+ HoodieLogFormat.WriterBuilder builder1 = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
+ .overBaseCommit("100").withFs(fs);
+ HoodieLogFormat.WriterBuilder builder2 = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
+ .overBaseCommit("100").withFs(fs);
+
+ if (newLogFileFormat && logFileExists) {
+ // Assume there is an existing log-file with write token
+ builder1 = builder1.withLogVersion(1).withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN)
+ .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
+ builder2 = builder2.withLogVersion(1).withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN)
+ .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
+ } else if (newLogFileFormat) {
+ // First log file of the file-slice
+ builder1 = builder1.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
+ .withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN)
+ .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
+ builder2 = builder2.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
+ .withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN)
+ .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
+ } else {
+ // Legacy case: only writer 1 gets an explicit version and rollover token; writer 2 gets neither
+ builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
+ }
+ Writer writer = builder1.build();
+ Writer writer2 = builder2.build();
+ HoodieLogFile logFile1 = writer.getLogFile();
+ HoodieLogFile logFile2 = writer2.getLogFile();
+ List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+ HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+ writer = writer.appendBlock(dataBlock);
+ writer2 = writer2.appendBlock(dataBlock);
+ writer.close();
+ writer2.close();
+ assertNotNull(logFile1.getLogWriteToken());
+ // Both writers were built before either appended; the second one is expected to land on the next version
+ assertEquals("Log Files must have different versions", logFile1.getLogVersion(), logFile2.getLogVersion() - 1);
+ }
+
+ @Test
public void testMultipleAppend() throws IOException, URISyntaxException, InterruptedException {
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
@@ -225,6 +283,12 @@ public class HoodieLogFormatTest {
}
}
+ /**
+ * This is actually a test of concurrent append rather than lease recovery,
+ * so it is commented out.
+ * https://issues.apache.org/jira/browse/HUDI-117
+ */
+ /**
@Test
public void testLeaseRecovery() throws IOException, URISyntaxException, InterruptedException {
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -253,6 +317,7 @@ public class HoodieLogFormatTest {
fs.getFileStatus(writer.getLogFile().getPath()).getLen());
writer.close();
}
+ **/
@Test
public void testAppendNotSupported() throws IOException, URISyntaxException, InterruptedException {
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
index f11acac..cd26a2b 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
@@ -65,6 +65,8 @@ import org.junit.rules.TemporaryFolder;
@SuppressWarnings("ResultOfMethodCallIgnored")
public class HoodieTableFileSystemViewTest {
+ private static String TEST_WRITE_TOKEN = "1-0-1";
+
protected HoodieTableMetaClient metaClient;
protected String basePath;
protected SyncableFileSystemView fsView;
@@ -119,8 +121,10 @@ public class HoodieTableFileSystemViewTest {
String instantTime1 = "1";
String deltaInstantTime1 = "2";
String deltaInstantTime2 = "3";
- String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0);
- String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 1);
+ String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+ instantTime1, 0, TEST_WRITE_TOKEN);
+ String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+ instantTime1, 1, TEST_WRITE_TOKEN);
new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
@@ -248,11 +252,13 @@ public class HoodieTableFileSystemViewTest {
String dataFileName = null;
if (!skipCreatingDataFile) {
- dataFileName = FSUtils.makeDataFileName(instantTime1, 1, fileId);
+ dataFileName = FSUtils.makeDataFileName(instantTime1, TEST_WRITE_TOKEN, fileId);
new File(basePath + "/" + partitionPath + "/" + dataFileName).createNewFile();
}
- String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0);
- String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 1);
+ String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+ instantTime1, 0, TEST_WRITE_TOKEN);
+ String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+ instantTime1, 1, TEST_WRITE_TOKEN);
new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
@@ -267,7 +273,7 @@ public class HoodieTableFileSystemViewTest {
refreshFsView();
List<FileSlice> fileSlices = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
String compactionRequestedTime = "4";
- String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, 1, fileId);
+ String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, TEST_WRITE_TOKEN, fileId);
List<Pair<String, FileSlice>> partitionFileSlicesPairs = new ArrayList<>();
partitionFileSlicesPairs.add(Pair.of(partitionPath, fileSlices.get(0)));
HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicesPairs,
@@ -299,8 +305,10 @@ public class HoodieTableFileSystemViewTest {
String deltaInstantTime5 = "6";
List<String> allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2,
compactionRequestedTime, deltaInstantTime4, deltaInstantTime5);
- String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 0);
- String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 1);
+ String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+ compactionRequestedTime, 0, TEST_WRITE_TOKEN);
+ String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+ compactionRequestedTime, 1, TEST_WRITE_TOKEN);
new File(basePath + "/" + partitionPath + "/" + fileName3).createNewFile();
new File(basePath + "/" + partitionPath + "/" + fileName4).createNewFile();
HoodieInstant deltaInstant4 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime4);
@@ -400,15 +408,15 @@ public class HoodieTableFileSystemViewTest {
final String orphanFileId2 = UUID.randomUUID().toString();
final String invalidInstantId = "INVALIDTIME";
String inflightDeltaInstantTime = "7";
- String orphanDataFileName = FSUtils.makeDataFileName(invalidInstantId, 1, orphanFileId1);
+ String orphanDataFileName = FSUtils.makeDataFileName(invalidInstantId, TEST_WRITE_TOKEN, orphanFileId1);
new File(basePath + "/" + partitionPath + "/" + orphanDataFileName).createNewFile();
String orphanLogFileName =
- FSUtils.makeLogFileName(orphanFileId2, HoodieLogFile.DELTA_EXTENSION, invalidInstantId, 0);
+ FSUtils.makeLogFileName(orphanFileId2, HoodieLogFile.DELTA_EXTENSION, invalidInstantId, 0, TEST_WRITE_TOKEN);
new File(basePath + "/" + partitionPath + "/" + orphanLogFileName).createNewFile();
- String inflightDataFileName = FSUtils.makeDataFileName(inflightDeltaInstantTime, 1, inflightFileId1);
+ String inflightDataFileName = FSUtils.makeDataFileName(inflightDeltaInstantTime, TEST_WRITE_TOKEN, inflightFileId1);
new File(basePath + "/" + partitionPath + "/" + inflightDataFileName).createNewFile();
- String inflightLogFileName =
- FSUtils.makeLogFileName(inflightFileId2, HoodieLogFile.DELTA_EXTENSION, inflightDeltaInstantTime, 0);
+ String inflightLogFileName = FSUtils.makeLogFileName(inflightFileId2, HoodieLogFile.DELTA_EXTENSION,
+ inflightDeltaInstantTime, 0, TEST_WRITE_TOKEN);
new File(basePath + "/" + partitionPath + "/" + inflightLogFileName).createNewFile();
// Mark instant as inflight
commitTimeline.saveToInflight(new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
@@ -558,7 +566,7 @@ public class HoodieTableFileSystemViewTest {
// Only one commit, but is not safe
String commitTime1 = "1";
- String fileName1 = FSUtils.makeDataFileName(commitTime1, 1, fileId);
+ String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId);
new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
refreshFsView();
assertFalse("No commit, should not find any data file",
@@ -576,7 +584,7 @@ public class HoodieTableFileSystemViewTest {
// Do another commit, but not safe
String commitTime2 = "2";
- String fileName2 = FSUtils.makeDataFileName(commitTime2, 1, fileId);
+ String fileName2 = FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId);
new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
refreshFsView();
assertEquals("", fileName1,
@@ -610,21 +618,21 @@ public class HoodieTableFileSystemViewTest {
String fileId3 = UUID.randomUUID().toString();
String fileId4 = UUID.randomUUID().toString();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0))
- .createNewFile();
- new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 1))
- .createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0))
- .createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0))
- .createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+ commitTime4, 0, TEST_WRITE_TOKEN)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+ commitTime4, 1, TEST_WRITE_TOKEN)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2,
+ HoodieLogFile.DELTA_EXTENSION, commitTime3, 0, TEST_WRITE_TOKEN)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION,
+ commitTime4, 0, TEST_WRITE_TOKEN)).createNewFile();
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -653,9 +661,9 @@ public class HoodieTableFileSystemViewTest {
for (HoodieDataFile status : dataFileList) {
filenames.add(status.getFileName());
}
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId1)));
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId3)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)));
filenames = Sets.newHashSet();
List<HoodieLogFile> logFilesList = rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4)
@@ -665,10 +673,14 @@ public class HoodieTableFileSystemViewTest {
for (HoodieLogFile logFile : logFilesList) {
filenames.add(logFile.getFileName());
}
- assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)));
- assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 1)));
- assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0)));
- assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)));
+ assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+ commitTime4, 0, TEST_WRITE_TOKEN)));
+ assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+ commitTime4, 1, TEST_WRITE_TOKEN)));
+ assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION,
+ commitTime3, 0, TEST_WRITE_TOKEN)));
+ assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION,
+ commitTime4, 0, TEST_WRITE_TOKEN)));
// Reset the max commit time
List<HoodieDataFile> dataFiles = roView.getLatestDataFilesBeforeOrOn("2016/05/01", commitTime3)
@@ -679,12 +691,12 @@ public class HoodieTableFileSystemViewTest {
}
if (!isLatestFileSliceOnly) {
assertEquals(3, dataFiles.size());
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, 1, fileId1)));
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId3)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)));
} else {
assertEquals(1, dataFiles.size());
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
}
logFilesList =
@@ -692,7 +704,7 @@ public class HoodieTableFileSystemViewTest {
.flatMap(logFileList -> logFileList).collect(Collectors.toList());
assertEquals(logFilesList.size(), 1);
assertTrue(logFilesList.get(0).getFileName()
- .equals(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0)));
+ .equals(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0, TEST_WRITE_TOKEN)));
}
@@ -713,13 +725,13 @@ public class HoodieTableFileSystemViewTest {
String fileId2 = UUID.randomUUID().toString();
String fileId3 = UUID.randomUUID().toString();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -744,22 +756,22 @@ public class HoodieTableFileSystemViewTest {
Set<String> expFileNames = new HashSet<>();
if (fileId.equals(fileId1)) {
if (!isLatestFileSliceOnly) {
- expFileNames.add(FSUtils.makeDataFileName(commitTime1, 1, fileId1));
+ expFileNames.add(FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1));
}
- expFileNames.add(FSUtils.makeDataFileName(commitTime4, 1, fileId1));
+ expFileNames.add(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1));
assertEquals(expFileNames, filenames);
} else if (fileId.equals(fileId2)) {
if (!isLatestFileSliceOnly) {
- expFileNames.add(FSUtils.makeDataFileName(commitTime1, 1, fileId2));
- expFileNames.add(FSUtils.makeDataFileName(commitTime2, 1, fileId2));
+ expFileNames.add(FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2));
+ expFileNames.add(FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2));
}
- expFileNames.add(FSUtils.makeDataFileName(commitTime3, 1, fileId2));
+ expFileNames.add(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2));
assertEquals(expFileNames, filenames);
} else {
if (!isLatestFileSliceOnly) {
- expFileNames.add(FSUtils.makeDataFileName(commitTime3, 1, fileId3));
+ expFileNames.add(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3));
}
- expFileNames.add(FSUtils.makeDataFileName(commitTime4, 1, fileId3));
+ expFileNames.add(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3));
assertEquals(expFileNames, filenames);
}
}
@@ -782,19 +794,19 @@ public class HoodieTableFileSystemViewTest {
String fileId2 = UUID.randomUUID().toString();
String fileId3 = UUID.randomUUID().toString();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0))
- .createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId1)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+ commitTime1, 0, TEST_WRITE_TOKEN)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId1)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0))
- .createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION,
+ commitTime3, 0, TEST_WRITE_TOKEN)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -817,10 +829,10 @@ public class HoodieTableFileSystemViewTest {
filenames.add(status.getFileName());
}
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId1)));
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId1)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
if (!isLatestFileSliceOnly) {
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId3)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)));
}
List<FileSlice> slices = rtView.getLatestFileSliceInRange(Lists.newArrayList(commitTime3, commitTime4))
@@ -861,13 +873,13 @@ public class HoodieTableFileSystemViewTest {
String fileId2 = UUID.randomUUID().toString();
String fileId3 = UUID.randomUUID().toString();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)).createNewFile();
- new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+ new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -887,8 +899,8 @@ public class HoodieTableFileSystemViewTest {
for (HoodieDataFile status : dataFiles) {
filenames.add(status.getFileName());
}
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, 1, fileId1)));
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime2, 1, fileId2)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)));
} else {
assertEquals(0, dataFiles.size());
}
@@ -912,28 +924,31 @@ public class HoodieTableFileSystemViewTest {
String fileId2 = UUID.randomUUID().toString();
String fileId3 = UUID.randomUUID().toString();
- new File(fullPartitionPath + "/" + FSUtils.makeDataFileName(commitTime1, 1, fileId1)).createNewFile();
+ new File(fullPartitionPath + "/" + FSUtils.makeDataFileName(commitTime1,
+ TEST_WRITE_TOKEN, fileId1)).createNewFile();
new File(fullPartitionPath + "/"
- + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0)).createNewFile();
+ + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+ commitTime1, 0, TEST_WRITE_TOKEN)).createNewFile();
new File(fullPartitionPath + "/"
- + FSUtils.makeDataFileName(commitTime4, 1, fileId1)).createNewFile();
+ + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)).createNewFile();
new File(fullPartitionPath + "/"
- + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)).createNewFile();
+ + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+ commitTime4, 0, TEST_WRITE_TOKEN)).createNewFile();
new File(fullPartitionPath + "/"
- + FSUtils.makeDataFileName(commitTime1, 1, fileId2)).createNewFile();
+ + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
new File(fullPartitionPath + "/"
- + FSUtils.makeDataFileName(commitTime2, 1, fileId2)).createNewFile();
+ + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
new File(fullPartitionPath + "/"
- + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0))
+ + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, TEST_WRITE_TOKEN))
.createNewFile();
new File(fullPartitionPath + "/"
- + FSUtils.makeDataFileName(commitTime3, 1, fileId2)).createNewFile();
+ + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
new File(fullPartitionPath + "/"
- + FSUtils.makeDataFileName(commitTime3, 1, fileId3)).createNewFile();
+ + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
new File(fullPartitionPath + "/"
- + FSUtils.makeDataFileName(commitTime4, 1, fileId3)).createNewFile();
+ + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -979,9 +994,9 @@ public class HoodieTableFileSystemViewTest {
for (HoodieDataFile status : statuses1) {
filenames.add(status.getFileName());
}
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId1)));
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
- assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId3)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
+ assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)));
}
@Test
@@ -1002,16 +1017,17 @@ public class HoodieTableFileSystemViewTest {
String deltaInstantTime2 = "3";
String fileId = UUID.randomUUID().toString();
- String dataFileName = FSUtils.makeDataFileName(instantTime1, 1, fileId);
+ String dataFileName = FSUtils.makeDataFileName(instantTime1, TEST_WRITE_TOKEN, fileId);
new File(fullPartitionPath1 + dataFileName).createNewFile();
- String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0);
+ String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+ instantTime1, 0, TEST_WRITE_TOKEN);
new File(fullPartitionPath1 + fileName1)
.createNewFile();
- new File(fullPartitionPath2 + FSUtils.makeDataFileName(instantTime1, 1, fileId)).createNewFile();
+ new File(fullPartitionPath2 + FSUtils.makeDataFileName(instantTime1, TEST_WRITE_TOKEN, fileId)).createNewFile();
new File(fullPartitionPath2 + fileName1)
.createNewFile();
- new File(fullPartitionPath3 + FSUtils.makeDataFileName(instantTime1, 1, fileId)).createNewFile();
+ new File(fullPartitionPath3 + FSUtils.makeDataFileName(instantTime1, TEST_WRITE_TOKEN, fileId)).createNewFile();
new File(fullPartitionPath3 + fileName1)
.createNewFile();
@@ -1052,7 +1068,7 @@ public class HoodieTableFileSystemViewTest {
partitionFileSlicesPairs.add(Pair.of(partitionPath3, fileSlices.get(0)));
String compactionRequestedTime = "2";
- String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, 1, fileId);
+ String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, TEST_WRITE_TOKEN, fileId);
HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicesPairs,
Optional.empty(), Optional.empty());
@@ -1072,8 +1088,10 @@ public class HoodieTableFileSystemViewTest {
String deltaInstantTime5 = "6";
List<String> allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2,
compactionRequestedTime, deltaInstantTime4, deltaInstantTime5);
- String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 0);
- String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 1);
+ String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+ compactionRequestedTime, 0, TEST_WRITE_TOKEN);
+ String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+ compactionRequestedTime, 1, TEST_WRITE_TOKEN);
new File(basePath + "/" + partitionPath1 + "/" + fileName3).createNewFile();
new File(basePath + "/" + partitionPath1 + "/" + fileName4).createNewFile();
new File(basePath + "/" + partitionPath2 + "/" + fileName3).createNewFile();
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/IncrementalFSViewSyncTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/IncrementalFSViewSyncTest.java
index dc31c1a..a57ebad 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/IncrementalFSViewSyncTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/IncrementalFSViewSyncTest.java
@@ -69,6 +69,8 @@ import org.junit.rules.TemporaryFolder;
public class IncrementalFSViewSyncTest {
+ private static final String TEST_WRITE_TOKEN = "1-0-1"; // taskPartitionId-stageId-taskAttemptId token for test files
+
protected HoodieTableMetaClient metaClient;
protected String basePath;
@@ -756,8 +758,8 @@ public class IncrementalFSViewSyncTest {
return fileIdsPerPartition.stream().map(f -> {
try {
File file = new File(basePath + "/" + p + "/"
- + (deltaCommit ? FSUtils.makeLogFileName(f, ".log", baseInstant, Integer.parseInt(instant)) :
- FSUtils.makeDataFileName(instant, 0, f)));
+ + (deltaCommit ? FSUtils.makeLogFileName(f, ".log", baseInstant,
+ Integer.parseInt(instant), TEST_WRITE_TOKEN) : FSUtils.makeDataFileName(instant, TEST_WRITE_TOKEN, f)));
file.createNewFile();
HoodieWriteStat w = new HoodieWriteStat();
w.setFileId(f);
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java
index 5f46f8b..527322c 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java
@@ -49,6 +49,8 @@ import org.junit.Assert;
public class CompactionTestUtils {
+ private static final String TEST_WRITE_TOKEN = "1-0-1"; // taskPartitionId-stageId-taskAttemptId token for test files
+
public static Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> setupAndValidateCompactionOperations(
HoodieTableMetaClient metaClient, boolean inflight,
int numEntriesInPlan1, int numEntriesInPlan2,
@@ -151,7 +153,7 @@ public class CompactionTestUtils {
FileSlice slice = new FileSlice(DEFAULT_PARTITION_PATHS[0], instantId, fileId);
if (createDataFile) {
slice.setDataFile(new TestHoodieDataFile(metaClient.getBasePath() + "/" + DEFAULT_PARTITION_PATHS[0]
- + "/" + FSUtils.makeDataFileName(instantId, 1, fileId)));
+ + "/" + FSUtils.makeDataFileName(instantId, TEST_WRITE_TOKEN, fileId)));
}
String logFilePath1 = HoodieTestUtils
.getLogFilePath(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], instantId, fileId,
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
index c364884..f689039 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
@@ -50,6 +50,8 @@ import org.junit.rules.TemporaryFolder;
public class TestCompactionUtils {
+ private static final String TEST_WRITE_TOKEN = "1-0-1"; // taskPartitionId-stageId-taskAttemptId token for test files
+
private static final Map<String, Double> metrics =
new ImmutableMap.Builder<String, Double>()
.put("key1", 1.0)
@@ -85,9 +87,9 @@ public class TestCompactionUtils {
//File Slice with no data-file but log files present
FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
- FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+ FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))));
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
- FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+ FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], noDataFileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
@@ -96,9 +98,9 @@ public class TestCompactionUtils {
FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet"));
fileSlice.addLogFile(new HoodieLogFile(new Path(
- FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+ FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))));
fileSlice.addLogFile(new HoodieLogFile(new Path(
- FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+ FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], fileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0]);
@@ -112,16 +114,16 @@ public class TestCompactionUtils {
FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet"));
fileSlice.addLogFile(new HoodieLogFile(new Path(
- FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+ FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))));
fileSlice.addLogFile(new HoodieLogFile(new Path(
- FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+ FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noLog1");
noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet"));
FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
- FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+ FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))));
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
- FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+ FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
List<FileSlice> fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice);
List<Pair<String, FileSlice>> input = fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f))
.collect(Collectors.toList());
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
index 8afcb90..5c3ba02 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
@@ -23,14 +23,18 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
+import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.EnvironmentVariables;
public class TestFSUtils {
+ private static final String TEST_WRITE_TOKEN = "1-0-1"; // taskPartitionId-stageId-taskAttemptId token for test files
+
@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
@@ -39,22 +43,8 @@ public class TestFSUtils {
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
int taskPartitionId = 2;
String fileName = UUID.randomUUID().toString();
- assertTrue(FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName)
- .equals(fileName + "_" + taskPartitionId + "_" + commitTime + ".parquet"));
- }
-
- @Test
- public void testMakeTempDataFileName() {
- String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
- String partitionPath = "2017/12/31";
- int taskPartitionId = Integer.MAX_VALUE;
- int stageId = Integer.MAX_VALUE;
- long taskAttemptId = Long.MAX_VALUE;
- String fileName = UUID.randomUUID().toString();
- assertTrue(
- FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId, taskAttemptId)
- .equals(partitionPath.replace("/", "-") + "_" + fileName + "_" + taskPartitionId + "_" + commitTime + "_"
- + stageId + "_" + taskAttemptId + ".parquet"));
+ assertTrue(FSUtils.makeDataFileName(commitTime, TEST_WRITE_TOKEN, fileName)
+ .equals(fileName + "_" + TEST_WRITE_TOKEN + "_" + commitTime + ".parquet"));
}
@Test
@@ -70,7 +60,7 @@ public class TestFSUtils {
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
int taskPartitionId = 2;
String fileName = UUID.randomUUID().toString();
- String fullFileName = FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName);
+ String fullFileName = FSUtils.makeDataFileName(commitTime, TEST_WRITE_TOKEN, fileName);
assertTrue(FSUtils.getCommitTime(fullFileName).equals(commitTime));
}
@@ -79,7 +69,7 @@ public class TestFSUtils {
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
int taskPartitionId = 2;
String fileName = UUID.randomUUID().toString();
- String fullFileName = FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName);
+ String fullFileName = FSUtils.makeDataFileName(commitTime, TEST_WRITE_TOKEN, fileName);
assertTrue(FSUtils.getFileId(fullFileName).equals(fileName));
}
@@ -121,4 +111,47 @@ public class TestFSUtils {
Path partitionPath = new Path("/test/apache/apache/hudi");
assertEquals("apache/hudi", FSUtils.getRelativePartitionPath(basePath, partitionPath));
}
+
+ @Test
+ public void testOldLogFileName() {
+ // Log file names written before write tokens were introduced must still be parseable by FSUtils
+ String legacyFileId = UUID.randomUUID().toString();
+ Path legacyLogPath = new Path(new Path("2019/01/01/"),
+ makeOldLogFileName(legacyFileId, ".log", "100", 1));
+ Assert.assertTrue(FSUtils.isLogFile(legacyLogPath));
+ Assert.assertEquals(legacyFileId, FSUtils.getFileIdFromLogPath(legacyLogPath));
+ Assert.assertEquals("100", FSUtils.getBaseCommitTimeFromLogPath(legacyLogPath));
+ Assert.assertEquals(1, FSUtils.getFileVersionFromLog(legacyLogPath));
+ // Legacy names carry no write token, so every token-derived component must come back null
+ Assert.assertNull(FSUtils.getTaskPartitionIdFromLogPath(legacyLogPath));
+ Assert.assertNull(FSUtils.getStageIdFromLogPath(legacyLogPath));
+ Assert.assertNull(FSUtils.getTaskAttemptIdFromLogPath(legacyLogPath));
+ Assert.assertNull(FSUtils.getWriteTokenFromLogPath(legacyLogPath));
+ }
+
+ @Test
+ public void tesLogFileName() {
+ // Check that log file names embedding a write token are fully parseable by FSUtils methods
+ String partitionPath = "2019/01/01/";
+ String fileName = UUID.randomUUID().toString();
+ String logFile = FSUtils.makeLogFileName(fileName, ".log", "100", 2, TEST_WRITE_TOKEN);
+ Path rlPath = new Path(new Path(partitionPath), logFile);
+ Assert.assertTrue(FSUtils.isLogFile(rlPath));
+ Assert.assertEquals(fileName, FSUtils.getFileIdFromLogPath(rlPath));
+ Assert.assertEquals("100", FSUtils.getBaseCommitTimeFromLogPath(rlPath));
+ Assert.assertEquals(2, FSUtils.getFileVersionFromLog(rlPath));
+ // The token "1-0-1" is parsed back as taskPartitionId=1, stageId=0, taskAttemptId=1
+ Assert.assertEquals(Integer.valueOf(1), FSUtils.getTaskPartitionIdFromLogPath(rlPath));
+ Assert.assertEquals(Integer.valueOf(0), FSUtils.getStageIdFromLogPath(rlPath));
+ Assert.assertEquals(Integer.valueOf(1), FSUtils.getTaskAttemptIdFromLogPath(rlPath));
+ Assert.assertEquals(TEST_WRITE_TOKEN, FSUtils.getWriteTokenFromLogPath(rlPath));
+ }
+
+ public static String makeOldLogFileName(String fileId, String logFileExtension,
+ String baseCommitTime, int version) {
+ // Legacy (pre write-token) layout: ".<fileId>_<baseCommitTime><logFileExtension>.<version>",
+ // e.g. makeOldLogFileName("abc", ".log", "100", 1) returns ".abc_100.log.1".
+ return "." + String
+ .format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
+ }
}
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java
index 36f6a54..9029c97 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java
@@ -97,4 +97,14 @@ public class HoodieRealtimeFileSplit extends FileSplit {
deltaFilePaths.add(readString(in));
}
}
+
+ @Override
+ public String toString() {
+ // Human-readable summary: data file path, delta file paths, max commit time and base path
+ return new StringBuilder("HoodieRealtimeFileSplit{").append("DataPath=").append(getPath())
+ .append(", deltaFilePaths=").append(deltaFilePaths)
+ .append(", maxCommitTime='").append(maxCommitTime).append('\'')
+ .append(", basePath='").append(basePath).append('\'')
+ .append('}').toString();
+ }
}
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
index 351a306..b774458 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
@@ -36,6 +36,8 @@ import org.junit.rules.TemporaryFolder;
public class InputFormatTestUtil {
+ private static final String TEST_WRITE_TOKEN = "1-0-1"; // taskPartitionId-stageId-taskAttemptId token for test files
+
public static File prepareDataset(TemporaryFolder basePath, int numberOfFiles,
String commitNumber) throws IOException {
basePath.create();
@@ -43,7 +45,7 @@ public class InputFormatTestUtil {
File partitionPath = basePath.newFolder("2016", "05", "01");
for (int i = 0; i < numberOfFiles; i++) {
File dataFile = new File(partitionPath,
- FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i));
+ FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i));
dataFile.createNewFile();
}
return partitionPath;
@@ -65,7 +67,7 @@ public class InputFormatTestUtil {
.subList(0, Math.min(numberOfFilesUpdated, dataFiles.size()));
for (File file : toUpdateList) {
String fileId = FSUtils.getFileId(file.getName());
- File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, 1, fileId));
+ File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId));
dataFile.createNewFile();
}
}
@@ -117,7 +119,7 @@ public class InputFormatTestUtil {
throws IOException {
AvroParquetWriter parquetWriter;
for (int i = 0; i < numberOfFiles; i++) {
- String fileId = FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i);
+ String fileId = FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i);
File dataFile = new File(partitionPath, fileId);
parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema);
try {
@@ -149,7 +151,7 @@ public class InputFormatTestUtil {
}
})[0];
String fileId = FSUtils.getFileId(fileToUpdate.getName());
- File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, 1, fileId));
+ File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId));
AvroParquetWriter parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()),
schema);
try {
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
index fbd635e..d1ed214 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -141,7 +141,7 @@ public class HoodieRealtimeRecordReaderTest {
//create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
- new FileSplit(new Path(partitionDir + "/fileid0_1_" + commitTime + ".parquet"), 0, 1,
+ new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1,
jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
@@ -203,7 +203,7 @@ public class HoodieRealtimeRecordReaderTest {
//create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
- new FileSplit(new Path(partitionDir + "/fileid0_1_" + commitTime + ".parquet"), 0, 1,
+ new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1,
jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
@@ -286,7 +286,7 @@ public class HoodieRealtimeRecordReaderTest {
//create a split with baseFile (parquet file written earlier) and new log file(s)
String logFilePath = writer.getLogFile().getPath().toString();
HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
- new FileSplit(new Path(partitionDir + "/fileid0_1_" + commitTime + ".parquet"), 0, 1,
+ new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1,
jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
//create a RecordReader to be used by HoodieRealtimeRecordReader
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
index 7093bcf..d9d3461 100644
--- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
@@ -16,7 +16,6 @@
package com.uber.hoodie.hive;
-import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_TASK_PARTITIONID;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
@@ -260,7 +259,7 @@ public class TestUtil {
// Create 5 files
String fileId = UUID.randomUUID().toString();
Path filePath = new Path(partPath.toString() + "/" + FSUtils.makeDataFileName(commitTime,
- DEFAULT_TASK_PARTITIONID, fileId));
+ "1-0-1", fileId));
generateParquetData(filePath, isParquetSchemaSimple);
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(fileId);
diff --git a/hoodie-timeline-service/pom.xml b/hoodie-timeline-service/pom.xml
index 9f26109..0202d85 100644
--- a/hoodie-timeline-service/pom.xml
+++ b/hoodie-timeline-service/pom.xml
@@ -119,7 +119,7 @@
<dependency>
<groupId>io.javalin</groupId>
<artifactId>javalin</artifactId>
- <version>2.4.0</version>
+ <version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/RowBasedSchemaProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/RowBasedSchemaProvider.java
index 0a9d7c6..6592eb6 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/RowBasedSchemaProvider.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/RowBasedSchemaProvider.java
@@ -20,6 +20,6 @@ public class RowBasedSchemaProvider extends SchemaProvider {
@Override
public Schema getSourceSchema() {
return AvroConversionUtils.convertStructTypeToAvroSchema(rowStruct, HOODIE_RECORD_STRUCT_NAME,
- HOODIE_RECORD_NAMESPACE);
+ HOODIE_RECORD_NAMESPACE);
}
}
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java
index 45344da..86c2d38 100644
--- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java
@@ -37,6 +37,8 @@ import org.junit.rules.TemporaryFolder;
public class TestHoodieSnapshotCopier {
+ private static String TEST_WRITE_TOKEN = "1-0-1";
+
private String rootPath = null;
private String basePath = null;
private String outputPath = null;
@@ -102,35 +104,35 @@ public class TestHoodieSnapshotCopier {
basePath);
// Make commit1
File file11 = new File(
- basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, 1, "id11"));
+ basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
file11.createNewFile();
File file12 = new File(
- basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, 1, "id12"));
+ basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
file12.createNewFile();
File file13 = new File(
- basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, 1, "id13"));
+ basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
file13.createNewFile();
// Make commit2
File file21 = new File(
- basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, 1, "id21"));
+ basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
file21.createNewFile();
File file22 = new File(
- basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, 1, "id22"));
+ basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
file22.createNewFile();
File file23 = new File(
- basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, 1, "id23"));
+ basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
file23.createNewFile();
// Make commit3
File file31 = new File(
- basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, 1, "id31"));
+ basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
file31.createNewFile();
File file32 = new File(
- basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, 1, "id32"));
+ basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
file32.createNewFile();
File file33 = new File(
- basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, 1, "id33"));
+ basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
file33.createNewFile();
// Do a snapshot copy