Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/06/16 16:06:38 UTC
[hadoop] branch trunk updated: HADOOP-16279. S3Guard: Implement time-based (TTL) expiry for entries (and tombstones).
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new f9cc9e1 HADOOP-16279. S3Guard: Implement time-based (TTL) expiry for entries (and tombstones).
f9cc9e1 is described below
commit f9cc9e162175444efe9d5b07ecb9a795f750ca3c
Author: Gabor Bota <ga...@cloudera.com>
AuthorDate: Sun Jun 16 17:05:01 2019 +0100
HADOOP-16279. S3Guard: Implement time-based (TTL) expiry for entries (and tombstones).
Contributed by Gabor Bota.
Change-Id: I73a2d2861901dedfe7a0e783b310fbb95e7c1af9
---
.../src/main/resources/core-default.xml | 8 +-
.../java/org/apache/hadoop/fs/s3a/Constants.java | 12 +-
.../java/org/apache/hadoop/fs/s3a/Listing.java | 2 +-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 39 ++-
.../fs/s3a/s3guard/DynamoDBMetadataStore.java | 99 ++++--
.../hadoop/fs/s3a/s3guard/ITtlTimeProvider.java | 34 ++
.../hadoop/fs/s3a/s3guard/LocalMetadataStore.java | 111 ++++---
.../hadoop/fs/s3a/s3guard/MetadataStore.java | 87 +++--
.../hadoop/fs/s3a/s3guard/NullMetadataStore.java | 13 +-
.../org/apache/hadoop/fs/s3a/s3guard/S3Guard.java | 137 ++++++--
.../apache/hadoop/fs/s3a/s3guard/S3GuardTool.java | 9 +-
.../src/site/markdown/tools/hadoop-aws/s3guard.md | 4 +-
.../fs/s3a/ITestS3GuardOutOfBandOperations.java | 349 ++++++++++++++++++++-
.../org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java | 194 +++++++++++-
.../s3a/s3guard/AbstractS3GuardToolTestBase.java | 2 +-
.../fs/s3a/s3guard/ITestDynamoDBMetadataStore.java | 10 +-
.../s3guard/ITestDynamoDBMetadataStoreScale.java | 5 +-
.../fs/s3a/s3guard/MetadataStoreTestBase.java | 127 +++++++-
.../fs/s3a/s3guard/TestLocalMetadataStore.java | 7 +-
.../fs/s3a/s3guard/TestNullMetadataStore.java | 5 +
.../apache/hadoop/fs/s3a/s3guard/TestS3Guard.java | 195 +++++++++++-
.../scale/AbstractITestS3AMetadataStoreScale.java | 14 +-
22 files changed, 1287 insertions(+), 176 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index b5056d1..7ffc2ad 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1502,12 +1502,10 @@
</property>
<property>
- <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
- <value>3600000</value>
+ <name>fs.s3a.metadatastore.metadata.ttl</name>
+ <value>15m</value>
<description>
- This value sets how long a directory listing in the MS is considered as
- authoritative. The value is in milliseconds.
- MetadataStore should be authoritative to use this configuration knob.
+ This value sets how long an entry in a MetadataStore is valid.
</description>
</property>
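
The new key is read with Configuration.getTimeDuration(), so a bare number is interpreted as milliseconds while suffixed values such as "15m" or "1h" are parsed as durations. A minimal sketch of reading it, assuming only a standard Hadoop Configuration:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // "15m" and "900000" both resolve to 900000 milliseconds
    long ttlMillis = conf.getTimeDuration(
        "fs.s3a.metadatastore.metadata.ttl",
        TimeUnit.MINUTES.toMillis(15), TimeUnit.MILLISECONDS);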
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index a8dc161..7334506 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -353,10 +353,14 @@ public final class Constants {
/**
* How long a directory listing in the MS is considered as authoritative.
*/
- public static final String METADATASTORE_AUTHORITATIVE_DIR_TTL =
- "fs.s3a.metadatastore.authoritative.dir.ttl";
- public static final long DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL =
- TimeUnit.MINUTES.toMillis(60);
+ public static final String METADATASTORE_METADATA_TTL =
+ "fs.s3a.metadatastore.metadata.ttl";
+
+ /**
+ * Default TTL in milliseconds: 15 minutes.
+ */
+ public static final long DEFAULT_METADATASTORE_METADATA_TTL =
+ TimeUnit.MINUTES.toMillis(15);
/** read ahead buffer size to prevent connection re-establishments. */
public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 0f8c52b..b62c456 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -464,7 +464,7 @@ public class Listing {
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
S3AFileStatus status = createFileStatus(keyPath, summary,
owner.getDefaultBlockSize(keyPath), owner.getUsername(),
- null, null);
+ summary.getETag(), null);
LOG.debug("Adding: {}", status);
stats.add(status);
added++;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e6850e9..4bd58d5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -126,6 +126,7 @@ import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.fs.store.EtagChecksum;
@@ -244,7 +245,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private AWSCredentialProviderList credentials;
- private S3Guard.ITtlTimeProvider ttlTimeProvider;
+ private ITtlTimeProvider ttlTimeProvider;
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
@@ -388,9 +389,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
getMetadataStore(), allowAuthoritative);
}
initMultipartUploads(conf);
- long authDirTtl = conf.getLong(METADATASTORE_AUTHORITATIVE_DIR_TTL,
- DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
- ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
+ if (hasMetadataStore()) {
+ long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+ DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+ ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
+ }
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
@@ -1341,7 +1344,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
}
- metadataStore.move(srcPaths, dstMetas);
+ metadataStore.move(srcPaths, dstMetas, ttlTimeProvider);
if (!src.getParent().equals(dst.getParent())) {
LOG.debug("source & dest parents are different; fix up dir markers");
@@ -1722,7 +1725,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
instrumentation.directoryDeleted();
}
deleteObject(key);
- metadataStore.delete(f);
+ metadataStore.delete(f, ttlTimeProvider);
}
/**
@@ -2143,7 +2146,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
}
}
- metadataStore.deleteSubtree(f);
+ metadataStore.deleteSubtree(f, ttlTimeProvider);
} else {
LOG.debug("delete: Path is a file");
deleteObjectAtPath(f, key, true);
@@ -2466,7 +2469,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
LOG.debug("Getting path status for {} ({})", path, key);
// Check MetadataStore, if any.
- PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag);
+ PathMetadata pm = null;
+ if (hasMetadataStore()) {
+ pm = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider);
+ }
Set<Path> tombstones = Collections.emptySet();
if (pm != null) {
if (pm.isDeleted()) {
@@ -2501,7 +2507,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
LOG.debug("S3Guard metadata for {} is outdated, updating it",
path);
return S3Guard.putAndReturn(metadataStore, s3AFileStatus,
- instrumentation);
+ instrumentation, ttlTimeProvider);
}
}
}
@@ -2534,12 +2540,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
null, null);
}
// entry was found, save in S3Guard
- return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
+ return S3Guard.putAndReturn(metadataStore, s3FileStatus,
+ instrumentation, ttlTimeProvider);
} else {
// there was no entry in S3Guard
// retrieve the data and update the metadata store in the process.
return S3Guard.putAndReturn(metadataStore,
- s3GetFileStatus(path, key, tombstones), instrumentation);
+ s3GetFileStatus(path, key, tombstones), instrumentation,
+ ttlTimeProvider);
}
}
@@ -3191,11 +3199,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// See note about failure semantics in S3Guard documentation
try {
if (hasMetadataStore()) {
- S3Guard.addAncestors(metadataStore, p, username);
+ S3Guard.addAncestors(metadataStore, p, username, ttlTimeProvider);
S3AFileStatus status = createUploadFileStatus(p,
S3AUtils.objectRepresentsDirectory(key, length), length,
getDefaultBlockSize(p), username, eTag, versionId);
- S3Guard.putAndReturn(metadataStore, status, instrumentation);
+ S3Guard.putAndReturn(metadataStore, status, instrumentation,
+ ttlTimeProvider);
}
} catch (IOException e) {
if (failOnMetadataWriteError) {
@@ -3860,12 +3869,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
@VisibleForTesting
- protected S3Guard.ITtlTimeProvider getTtlTimeProvider() {
+ public ITtlTimeProvider getTtlTimeProvider() {
return ttlTimeProvider;
}
@VisibleForTesting
- protected void setTtlTimeProvider(S3Guard.ITtlTimeProvider ttlTimeProvider) {
+ protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
this.ttlTimeProvider = ttlTimeProvider;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index fa1a203..f668c6a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -189,8 +189,10 @@ import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
* directory helps prevent unnecessary queries during traversal of an entire
* sub-tree.
*
- * Some mutating operations, notably {@link #deleteSubtree(Path)} and
- * {@link #move(Collection, Collection)}, are less efficient with this schema.
+ * Some mutating operations, notably
+ * {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and
+ * {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider)},
+ * are less efficient with this schema.
* They require mutating multiple items in the DynamoDB table.
*
* By default, DynamoDB access is performed within the same AWS region as
@@ -471,14 +473,15 @@ public class DynamoDBMetadataStore implements MetadataStore,
@Override
@Retries.RetryTranslated
- public void delete(Path path) throws IOException {
- innerDelete(path, true);
+ public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
+ innerDelete(path, true, ttlTimeProvider);
}
@Override
@Retries.RetryTranslated
public void forgetMetadata(Path path) throws IOException {
- innerDelete(path, false);
+ innerDelete(path, false, null);
}
/**
@@ -487,10 +490,13 @@ public class DynamoDBMetadataStore implements MetadataStore,
* There is no check as to whether the entry exists in the table first.
* @param path path to delete
* @param tombstone flag to create a tombstone marker
+ * @param ttlTimeProvider The time provider to set last_updated. Must not
+ * be null if tombstone is true.
* @throws IOException I/O error.
*/
@Retries.RetryTranslated
- private void innerDelete(final Path path, boolean tombstone)
+ private void innerDelete(final Path path, boolean tombstone,
+ ITtlTimeProvider ttlTimeProvider)
throws IOException {
checkPath(path);
LOG.debug("Deleting from table {} in region {}: {}",
@@ -505,8 +511,13 @@ public class DynamoDBMetadataStore implements MetadataStore,
// on that of S3A itself
boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT;
if (tombstone) {
+ Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider "
+ + "must not be null");
+ final PathMetadata pmTombstone = PathMetadata.tombstone(path);
+ // update the last updated field of record when putting a tombstone
+ pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
- new DDBPathMetadata(PathMetadata.tombstone(path)));
+ new DDBPathMetadata(pmTombstone));
writeOp.retry(
"Put tombstone",
path.toString(),
@@ -524,7 +535,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
@Override
@Retries.RetryTranslated
- public void deleteSubtree(Path path) throws IOException {
+ public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
checkPath(path);
LOG.debug("Deleting subtree from table {} in region {}: {}",
tableName, region, path);
@@ -537,7 +549,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
for (DescendantsIterator desc = new DescendantsIterator(this, meta);
desc.hasNext();) {
- innerDelete(desc.next().getPath(), true);
+ innerDelete(desc.next().getPath(), true, ttlTimeProvider);
}
}
@@ -731,7 +743,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
@Override
@Retries.RetryTranslated
public void move(Collection<Path> pathsToDelete,
- Collection<PathMetadata> pathsToCreate) throws IOException {
+ Collection<PathMetadata> pathsToCreate, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
if (pathsToDelete == null && pathsToCreate == null) {
return;
}
@@ -754,7 +767,11 @@ public class DynamoDBMetadataStore implements MetadataStore,
}
if (pathsToDelete != null) {
for (Path meta : pathsToDelete) {
- newItems.add(new DDBPathMetadata(PathMetadata.tombstone(meta)));
+ Preconditions.checkArgument(ttlTimeProvider != null, "ttlTimeProvider"
+ + " must not be null");
+ final PathMetadata pmTombstone = PathMetadata.tombstone(meta);
+ pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
+ newItems.add(new DDBPathMetadata(pmTombstone));
}
}
@@ -1024,14 +1041,37 @@ public class DynamoDBMetadataStore implements MetadataStore,
}
@Retries.RetryTranslated
- private ItemCollection<ScanOutcome> expiredFiles(long modTime,
- String keyPrefix) throws IOException {
- String filterExpression =
- "mod_time < :mod_time and begins_with(parent, :parent)";
- String projectionExpression = "parent,child";
- ValueMap map = new ValueMap()
- .withLong(":mod_time", modTime)
- .withString(":parent", keyPrefix);
+ private ItemCollection<ScanOutcome> expiredFiles(PruneMode pruneMode,
+ long cutoff, String keyPrefix) throws IOException {
+
+ String filterExpression;
+ String projectionExpression;
+ ValueMap map;
+
+ switch (pruneMode) {
+ case ALL_BY_MODTIME:
+ filterExpression =
+ "mod_time < :mod_time and begins_with(parent, :parent)";
+ projectionExpression = "parent,child";
+ map = new ValueMap()
+ .withLong(":mod_time", cutoff)
+ .withString(":parent", keyPrefix);
+ break;
+ case TOMBSTONES_BY_LASTUPDATED:
+ filterExpression =
+ "last_updated < :last_updated and begins_with(parent, :parent) "
+ + "and is_deleted = :is_deleted";
+ projectionExpression = "parent,child";
+ map = new ValueMap()
+ .withLong(":last_updated", cutoff)
+ .withString(":parent", keyPrefix)
+ .withBoolean(":is_deleted", true);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported prune mode: "
+ + pruneMode);
+ }
+
return readOp.retry(
"scan",
keyPrefix,
@@ -1041,20 +1081,31 @@ public class DynamoDBMetadataStore implements MetadataStore,
@Override
@Retries.RetryTranslated
- public void prune(long modTime) throws IOException {
- prune(modTime, "/");
+ public void prune(PruneMode pruneMode, long cutoff) throws IOException {
+ prune(pruneMode, cutoff, "/");
}
/**
* Prune files, in batches. There's a sleep between each batch.
- * @param modTime Oldest modification time to allow
+ *
+ * @param pruneMode The mode of operation for the prune. For details see
+ * {@link MetadataStore#prune(PruneMode, long)}.
+ * @param cutoff Oldest modification time to allow
* @param keyPrefix The prefix for the keys that should be removed
* @throws IOException Any IO/DDB failure.
* @throws InterruptedIOException if the prune was interrupted
*/
@Override
@Retries.RetryTranslated
- public void prune(long modTime, String keyPrefix) throws IOException {
+ public void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
+ throws IOException {
+ final ItemCollection<ScanOutcome> items =
+ expiredFiles(pruneMode, cutoff, keyPrefix);
+ innerPrune(items);
+ }
+
+ private void innerPrune(ItemCollection<ScanOutcome> items)
+ throws IOException {
int itemCount = 0;
try {
Collection<Path> deletionBatch =
@@ -1064,7 +1115,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT,
TimeUnit.MILLISECONDS);
Set<Path> parentPathSet = new HashSet<>();
- for (Item item : expiredFiles(modTime, keyPrefix)) {
+ for (Item item : items) {
DDBPathMetadata md = PathMetadataDynamoDBTranslation
.itemToPathMetadata(item, username);
Path path = md.getFileStatus().getPath();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java
new file mode 100644
index 0000000..daee621
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/ITtlTimeProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+/**
+ * This interface is defined for handling TTL expiry of metadata in S3Guard.
+ *
+ * TTL can be tested by implementing this interface and setting it as
+ * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any
+ * preferred value, so flaky tests can be avoided. By default getNow()
+ * returns the current time in milliseconds since the epoch.
+ *
+ * Time is measured in milliseconds.
+ */
+public interface ITtlTimeProvider {
+ long getNow();
+ long getMetadataTtl();
+}
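
For tests, a fixed-clock implementation of this interface is enough to drive expiry deterministically. A sketch with illustrative names that are not part of this patch:

    /** Illustrative fixed-clock provider for tests; not part of this patch. */
    class FixedTtlTimeProvider implements ITtlTimeProvider {
      private long now;
      private final long ttlMillis;

      FixedTtlTimeProvider(long now, long ttlMillis) {
        this.now = now;
        this.ttlMillis = ttlMillis;
      }

      /** Move the fake clock forward by the given number of milliseconds. */
      void advance(long millis) {
        now += millis;
      }

      @Override
      public long getNow() {
        return now;
      }

      @Override
      public long getMetadataTtl() {
        return ttlMillis;
      }
    }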
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index 9276388..6c13cd1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -112,32 +112,34 @@ public class LocalMetadataStore implements MetadataStore {
}
@Override
- public void delete(Path p) throws IOException {
- doDelete(p, false, true);
+ public void delete(Path p, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
+ doDelete(p, false, true, ttlTimeProvider);
}
@Override
public void forgetMetadata(Path p) throws IOException {
- doDelete(p, false, false);
+ doDelete(p, false, false, null);
}
@Override
- public void deleteSubtree(Path path) throws IOException {
- doDelete(path, true, true);
+ public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
+ doDelete(path, true, true, ttlTimeProvider);
}
- private synchronized void doDelete(Path p, boolean recursive, boolean
- tombstone) {
+ private synchronized void doDelete(Path p, boolean recursive,
+ boolean tombstone, ITtlTimeProvider ttlTimeProvider) {
Path path = standardize(p);
// Delete entry from file cache, then from cached parent directory, if any
- deleteCacheEntries(path, tombstone);
+ deleteCacheEntries(path, tombstone, ttlTimeProvider);
if (recursive) {
// Remove all entries that have this dir as path prefix.
- deleteEntryByAncestor(path, localCache, tombstone);
+ deleteEntryByAncestor(path, localCache, tombstone, ttlTimeProvider);
}
}
@@ -191,7 +193,8 @@ public class LocalMetadataStore implements MetadataStore {
@Override
public void move(Collection<Path> pathsToDelete,
- Collection<PathMetadata> pathsToCreate) throws IOException {
+ Collection<PathMetadata> pathsToCreate,
+ ITtlTimeProvider ttlTimeProvider) throws IOException {
LOG.info("Move {} to {}", pathsToDelete, pathsToCreate);
Preconditions.checkNotNull(pathsToDelete, "pathsToDelete is null");
@@ -205,7 +208,7 @@ public class LocalMetadataStore implements MetadataStore {
// 1. Delete pathsToDelete
for (Path meta : pathsToDelete) {
LOG.debug("move: deleting metadata {}", meta);
- delete(meta);
+ delete(meta, ttlTimeProvider);
}
// 2. Create new destination path metadata
@@ -332,18 +335,19 @@ public class LocalMetadataStore implements MetadataStore {
}
@Override
- public void prune(long modTime) throws IOException{
- prune(modTime, "");
+ public void prune(PruneMode pruneMode, long cutoff) throws IOException{
+ prune(pruneMode, cutoff, "");
}
@Override
- public synchronized void prune(long modTime, String keyPrefix) {
+ public synchronized void prune(PruneMode pruneMode, long cutoff,
+ String keyPrefix) {
// prune files
// filter path_metadata (files), filter expired, remove expired
localCache.asMap().entrySet().stream()
.filter(entry -> entry.getValue().hasPathMeta())
- .filter(entry -> expired(
- entry.getValue().getFileMeta().getFileStatus(), modTime, keyPrefix))
+ .filter(entry -> expired(pruneMode,
+ entry.getValue().getFileMeta(), cutoff, keyPrefix))
.forEach(entry -> localCache.invalidate(entry.getKey()));
@@ -358,28 +362,37 @@ public class LocalMetadataStore implements MetadataStore {
Collection<PathMetadata> newChildren = new LinkedList<>();
for (PathMetadata child : oldChildren) {
- FileStatus status = child.getFileStatus();
- if (!expired(status, modTime, keyPrefix)) {
+ if (!expired(pruneMode, child, cutoff, keyPrefix)) {
newChildren.add(child);
}
}
- if (newChildren.size() != oldChildren.size()) {
- DirListingMetadata dlm =
- new DirListingMetadata(path, newChildren, false);
- localCache.put(path, new LocalMetadataEntry(dlm));
- if (!path.isRoot()) {
- DirListingMetadata parent = getDirListingMeta(path.getParent());
- if (parent != null) {
- parent.setAuthoritative(false);
- }
- }
- }
+ removeAuthoritativeFromParent(path, oldChildren, newChildren);
});
}
- private boolean expired(FileStatus status, long expiry, String keyPrefix) {
+ private void removeAuthoritativeFromParent(Path path,
+ Collection<PathMetadata> oldChildren,
+ Collection<PathMetadata> newChildren) {
+ if (newChildren.size() != oldChildren.size()) {
+ DirListingMetadata dlm =
+ new DirListingMetadata(path, newChildren, false);
+ localCache.put(path, new LocalMetadataEntry(dlm));
+ if (!path.isRoot()) {
+ DirListingMetadata parent = getDirListingMeta(path.getParent());
+ if (parent != null) {
+ parent.setAuthoritative(false);
+ }
+ }
+ }
+ }
+
+ private boolean expired(PruneMode pruneMode, PathMetadata metadata,
+ long cutoff, String keyPrefix) {
+ final S3AFileStatus status = metadata.getFileStatus();
+ final URI statusUri = status.getPath().toUri();
+
// remove the protocol from path string to be able to compare
- String bucket = status.getPath().toUri().getHost();
+ String bucket = statusUri.getHost();
String statusTranslatedPath = "";
if(bucket != null && !bucket.isEmpty()){
// if there's a bucket, (well defined host in Uri) the pathToParentKey
@@ -389,18 +402,33 @@ public class LocalMetadataStore implements MetadataStore {
} else {
// if there's no bucket in the path the pathToParentKey will fail, so
// this is the fallback to get the path from status
- statusTranslatedPath = status.getPath().toUri().getPath();
+ statusTranslatedPath = statusUri.getPath();
+ }
+
+ boolean expired;
+ switch (pruneMode) {
+ case ALL_BY_MODTIME:
+ // Note: S3 doesn't track modification time on directories, so for
+ // consistency with the DynamoDB implementation we ignore that here
+ expired = status.getModificationTime() < cutoff && !status.isDirectory()
+ && statusTranslatedPath.startsWith(keyPrefix);
+ break;
+ case TOMBSTONES_BY_LASTUPDATED:
+ expired = metadata.getLastUpdated() < cutoff && metadata.isDeleted()
+ && statusTranslatedPath.startsWith(keyPrefix);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported prune mode: "
+ + pruneMode);
}
- // Note: S3 doesn't track modification time on directories, so for
- // consistency with the DynamoDB implementation we ignore that here
- return status.getModificationTime() < expiry && !status.isDirectory()
- && statusTranslatedPath.startsWith(keyPrefix);
+ return expired;
}
@VisibleForTesting
static void deleteEntryByAncestor(Path ancestor,
- Cache<Path, LocalMetadataEntry> cache, boolean tombstone) {
+ Cache<Path, LocalMetadataEntry> cache, boolean tombstone,
+ ITtlTimeProvider ttlTimeProvider) {
cache.asMap().entrySet().stream()
.filter(entry -> isAncestorOf(ancestor, entry.getKey()))
@@ -410,7 +438,9 @@ public class LocalMetadataStore implements MetadataStore {
if(meta.hasDirMeta()){
cache.invalidate(path);
} else if(tombstone && meta.hasPathMeta()){
- meta.setPathMetadata(PathMetadata.tombstone(path));
+ final PathMetadata pmTombstone = PathMetadata.tombstone(path);
+ pmTombstone.setLastUpdated(ttlTimeProvider.getNow());
+ meta.setPathMetadata(pmTombstone);
} else {
cache.invalidate(path);
}
@@ -434,7 +464,8 @@ public class LocalMetadataStore implements MetadataStore {
* Update fileCache and dirCache to reflect deletion of file 'f'. Call with
* lock held.
*/
- private void deleteCacheEntries(Path path, boolean tombstone) {
+ private void deleteCacheEntries(Path path, boolean tombstone,
+ ITtlTimeProvider ttlTimeProvider) {
LocalMetadataEntry entry = localCache.getIfPresent(path);
// If there's no entry, delete should silently succeed
// (based on MetadataStoreTestBase#testDeleteNonExisting)
@@ -448,6 +479,7 @@ public class LocalMetadataStore implements MetadataStore {
if(entry.hasPathMeta()){
if (tombstone) {
PathMetadata pmd = PathMetadata.tombstone(path);
+ pmd.setLastUpdated(ttlTimeProvider.getNow());
entry.setPathMetadata(pmd);
} else {
entry.setPathMetadata(null);
@@ -474,6 +506,7 @@ public class LocalMetadataStore implements MetadataStore {
LOG.debug("removing parent's entry for {} ", path);
if (tombstone) {
dir.markDeleted(path);
+ dir.setLastUpdated(ttlTimeProvider.getNow());
} else {
dir.remove(path);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
index 746fd82..7875d43 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -63,16 +63,23 @@ public interface MetadataStore extends Closeable {
* Deletes exactly one path, leaving a tombstone to prevent lingering,
* inconsistent copies of it from being listed.
*
+ * Deleting an entry with a tombstone needs a
+ * {@link org.apache.hadoop.fs.s3a.s3guard.S3Guard.TtlTimeProvider} because
+ * the lastUpdated field of the record has to be updated to <pre>now</pre>.
+ *
* @param path the path to delete
+ * @param ttlTimeProvider the time provider to set last_updated. Must not
+ * be null.
* @throws IOException if there is an error
*/
- void delete(Path path) throws IOException;
+ void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException;
/**
* Removes the record of exactly one path. Does not leave a tombstone (see
- * {@link MetadataStore#delete(Path)}. It is currently intended for testing
- * only, and a need to use it as part of normal FileSystem usage is not
- * anticipated.
+ * {@link MetadataStore#delete(Path, ITtlTimeProvider)}. It is currently
+ * intended for testing only, and a need to use it as part of normal
+ * FileSystem usage is not anticipated.
*
* @param path the path to delete
* @throws IOException if there is an error
@@ -88,10 +95,17 @@ public interface MetadataStore extends Closeable {
* implementations must also update any stored {@code DirListingMetadata}
* objects which track the parent of this file.
*
+ * Deleting a subtree with a tombstone needs a
+ * {@link org.apache.hadoop.fs.s3a.s3guard.S3Guard.TtlTimeProvider} because
+ * the lastUpdated field of all records has to be updated to <pre>now</pre>.
+ *
* @param path the root of the sub-tree to delete
+ * @param ttlTimeProvider the time provider to set last_updated. Must not
+ * be null.
* @throws IOException if there is an error
*/
- void deleteSubtree(Path path) throws IOException;
+ void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException;
/**
* Gets metadata for a path.
@@ -151,10 +165,13 @@ public interface MetadataStore extends Closeable {
* @param pathsToCreate Collection of all PathMetadata for the new paths
* that were created at the destination of the rename
* ().
+ * @param ttlTimeProvider the time provider to set last_updated. Must not
+ * be null.
* @throws IOException if there is an error
*/
void move(Collection<Path> pathsToDelete,
- Collection<PathMetadata> pathsToCreate) throws IOException;
+ Collection<PathMetadata> pathsToCreate,
+ ITtlTimeProvider ttlTimeProvider) throws IOException;
/**
* Saves metadata for exactly one path.
@@ -212,29 +229,54 @@ public interface MetadataStore extends Closeable {
void destroy() throws IOException;
/**
- * Clear any metadata older than a specified time from the repository.
- * Implementations MUST clear file metadata, and MAY clear directory metadata
- * (s3a itself does not track modification time for directories).
- * Implementations may also choose to throw UnsupportedOperationException
- * istead. Note that modification times should be in UTC, as returned by
- * System.currentTimeMillis at the time of modification.
+ * Prune method with two modes of operation:
+ * <ul>
+ * <li>
+ * {@link PruneMode#ALL_BY_MODTIME}
+ * Clear any metadata older than a specified mod_time from the store.
+ * Note that this modification time is the S3 modification time from the
+ * object's metadata - from the object store.
+ * Implementations MUST clear file metadata, and MAY clear directory
+ * metadata (s3a itself does not track modification time for directories).
+ * Implementations may also choose to throw UnsupportedOperationException
+ * instead. Note that modification times must be in UTC, as returned by
+ * System.currentTimeMillis at the time of modification.
+ * </li>
+ * </ul>
*
- * @param modTime Oldest modification time to allow
+ * <ul>
+ * <li>
+ * {@link PruneMode#TOMBSTONES_BY_LASTUPDATED}
+ * Clear any tombstone updated earlier than a specified time from the
+ * store. Note that this last_updated is the time when the metadata
+ * entry was last updated and maintained by the metadata store.
+ * Implementations MUST clear file metadata, and MAY clear directory
+ * metadata (s3a itself does not track modification time for directories).
+ * Implementations may also choose to throw UnsupportedOperationException
+ * instead. Note that last_updated must be in UTC, as returned by
+ * System.currentTimeMillis at the time of modification.
+ * </li>
+ * </ul>
+ *
+ * @param pruneMode Mode of the prune operation
+ * @param cutoff Oldest time to allow (UTC)
* @throws IOException if there is an error
* @throws UnsupportedOperationException if not implemented
*/
- void prune(long modTime) throws IOException, UnsupportedOperationException;
+ void prune(PruneMode pruneMode, long cutoff) throws IOException,
+ UnsupportedOperationException;
/**
- * Same as {@link MetadataStore#prune(long)}, but with an additional
- * keyPrefix parameter to filter the pruned keys with a prefix.
+ * Same as {@link MetadataStore#prune(PruneMode, long)}, but with an
+ * additional keyPrefix parameter to filter the pruned keys with a prefix.
*
- * @param modTime Oldest modification time to allow
+ * @param pruneMode Mode of the prune operation
+ * @param cutoff Oldest time to allow (UTC)
* @param keyPrefix The prefix for the keys that should be removed
* @throws IOException if there is an error
* @throws UnsupportedOperationException if not implemented
*/
- void prune(long modTime, String keyPrefix)
+ void prune(PruneMode pruneMode, long cutoff, String keyPrefix)
throws IOException, UnsupportedOperationException;
/**
@@ -252,4 +294,13 @@ public interface MetadataStore extends Closeable {
* @throws IOException if there is an error
*/
void updateParameters(Map<String, String> parameters) throws IOException;
+
+ /**
+ * Modes of operation for prune.
+ * For details see {@link MetadataStore#prune(PruneMode, long)}
+ */
+ enum PruneMode {
+ ALL_BY_MODTIME,
+ TOMBSTONES_BY_LASTUPDATED
+ }
}
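
A hedged sketch of driving both prune modes, assuming ms is an initialized MetadataStore and java.util.concurrent.TimeUnit is imported:

    long now = System.currentTimeMillis();
    // remove file entries whose S3 mod_time is older than a week
    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME,
        now - TimeUnit.DAYS.toMillis(7));
    // remove tombstones whose last_updated is older than a day
    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED,
        now - TimeUnit.DAYS.toMillis(1));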
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
index 04704e7..1472ef1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
@@ -47,7 +47,8 @@ public class NullMetadataStore implements MetadataStore {
}
@Override
- public void delete(Path path) throws IOException {
+ public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
}
@Override
@@ -55,7 +56,8 @@ public class NullMetadataStore implements MetadataStore {
}
@Override
- public void deleteSubtree(Path path) throws IOException {
+ public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
+ throws IOException {
}
@Override
@@ -76,7 +78,8 @@ public class NullMetadataStore implements MetadataStore {
@Override
public void move(Collection<Path> pathsToDelete,
- Collection<PathMetadata> pathsToCreate) throws IOException {
+ Collection<PathMetadata> pathsToCreate,
+ ITtlTimeProvider ttlTimeProvider) throws IOException {
}
@Override
@@ -96,11 +99,11 @@ public class NullMetadataStore implements MetadataStore {
}
@Override
- public void prune(long modTime) {
+ public void prune(PruneMode pruneMode, long cutoff) {
}
@Override
- public void prune(long modTime, String keyPrefix) {
+ public void prune(PruneMode pruneMode, long cutoff, String keyPrefix) {
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 26c75e8..933a01c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -25,9 +25,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
@@ -46,6 +50,8 @@ import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.util.ReflectionUtils;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_LATENCY;
import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_REQUEST;
@@ -142,15 +148,17 @@ public final class S3Guard {
* @param ms MetadataStore to {@code put()} into.
* @param status status to store
* @param instrumentation instrumentation of the s3a file system
+ * @param timeProvider Time provider to use when writing entries
* @return The same status as passed in
* @throws IOException if metadata store update failed
*/
@RetryTranslated
public static S3AFileStatus putAndReturn(MetadataStore ms,
S3AFileStatus status,
- S3AInstrumentation instrumentation) throws IOException {
+ S3AInstrumentation instrumentation,
+ ITtlTimeProvider timeProvider) throws IOException {
long startTimeNano = System.nanoTime();
- ms.put(new PathMetadata(status));
+ S3Guard.putWithTtl(ms, new PathMetadata(status), timeProvider);
instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
(System.nanoTime() - startTimeNano));
instrumentation.incrementCounter(S3GUARD_METADATASTORE_PUT_PATH_REQUEST, 1);
@@ -196,7 +204,7 @@ public final class S3Guard {
* @param backingStatuses Directory listing from the backing store.
* @param dirMeta Directory listing from MetadataStore. May be null.
* @param isAuthoritative State of authoritative mode
- * @param timeProvider Time provider for testing.
+ * @param timeProvider Time provider to use when updating entries
* @return Final result of directory listing.
* @throws IOException if metadata store update failed
*/
@@ -242,7 +250,7 @@ public final class S3Guard {
if (status != null
&& s.getModificationTime() > status.getModificationTime()) {
LOG.debug("Update ms with newer metadata of: {}", status);
- ms.put(new PathMetadata(s));
+ S3Guard.putWithTtl(ms, new PathMetadata(s), timeProvider);
}
}
@@ -357,7 +365,7 @@ public final class S3Guard {
}
// Batched put
- ms.put(pathMetas);
+ S3Guard.putWithTtl(ms, pathMetas, timeProvider);
} catch (IOException ioe) {
LOG.error("MetadataStore#put() failure:", ioe);
}
@@ -462,7 +470,8 @@ public final class S3Guard {
}
public static void addAncestors(MetadataStore metadataStore,
- Path qualifiedPath, String username) throws IOException {
+ Path qualifiedPath, String username, ITtlTimeProvider timeProvider)
+ throws IOException {
Collection<PathMetadata> newDirs = new ArrayList<>();
Path parent = qualifiedPath.getParent();
while (!parent.isRoot()) {
@@ -476,7 +485,7 @@ public final class S3Guard {
}
parent = parent.getParent();
}
- metadataStore.put(newDirs);
+ S3Guard.putWithTtl(metadataStore, newDirs, timeProvider);
}
private static void addMoveStatus(Collection<Path> srcPaths,
@@ -514,17 +523,6 @@ public final class S3Guard {
}
/**
- * This interface is defined for testing purposes.
- * TTL can be tested by implementing this interface and setting is as
- * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any
- * value preferred and flaky tests could be avoided.
- */
- public interface ITtlTimeProvider {
- long getNow();
- long getAuthoritativeDirTtl();
- }
-
- /**
* Runtime implementation for TTL Time Provider interface.
*/
public static class TtlTimeProvider implements ITtlTimeProvider {
@@ -534,34 +532,127 @@ public final class S3Guard {
this.authoritativeDirTtl = authoritativeDirTtl;
}
+ public TtlTimeProvider(Configuration conf) {
+ this.authoritativeDirTtl =
+ conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+ DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+ }
+
@Override
public long getNow() {
return System.currentTimeMillis();
}
- @Override public long getAuthoritativeDirTtl() {
+ @Override public long getMetadataTtl() {
return authoritativeDirTtl;
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) { return true; }
+ if (o == null || getClass() != o.getClass()) { return false; }
+ final TtlTimeProvider that = (TtlTimeProvider) o;
+ return authoritativeDirTtl == that.authoritativeDirTtl;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(authoritativeDirTtl);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "TtlTimeProvider{");
+ sb.append("authoritativeDirTtl=").append(authoritativeDirTtl);
+ sb.append(" millis}");
+ return sb.toString();
+ }
}
public static void putWithTtl(MetadataStore ms, DirListingMetadata dirMeta,
ITtlTimeProvider timeProvider)
throws IOException {
dirMeta.setLastUpdated(timeProvider.getNow());
+ dirMeta.getListing()
+ .forEach(pm -> pm.setLastUpdated(timeProvider.getNow()));
ms.put(dirMeta);
}
- public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
- Path path, ITtlTimeProvider timeProvider)
+ public static void putWithTtl(MetadataStore ms, PathMetadata fileMeta,
+ @Nullable ITtlTimeProvider timeProvider) throws IOException {
+ if (timeProvider != null) {
+ fileMeta.setLastUpdated(timeProvider.getNow());
+ } else {
+ LOG.debug("timeProvider is null, put {} without setting last_updated",
+ fileMeta);
+ }
+ ms.put(fileMeta);
+ }
+
+ public static void putWithTtl(MetadataStore ms,
+ Collection<PathMetadata> fileMetas,
+ @Nullable ITtlTimeProvider timeProvider)
throws IOException {
- long ttl = timeProvider.getAuthoritativeDirTtl();
+ if (timeProvider != null) {
+ final long now = timeProvider.getNow();
+ fileMetas.forEach(fileMeta -> fileMeta.setLastUpdated(now));
+ } else {
+ LOG.debug("timeProvider is null, put {} without setting last_updated",
+ fileMetas);
+ }
+ ms.put(fileMetas);
+ }
+
+ public static PathMetadata getWithTtl(MetadataStore ms, Path path,
+ @Nullable ITtlTimeProvider timeProvider) throws IOException {
+ final PathMetadata pathMetadata = ms.get(path);
+ // if timeProvider is null, return whatever the metadata store has
+ if (timeProvider == null) {
+ LOG.debug("timeProvider is null, returning pathMetadata as is");
+ return pathMetadata;
+ }
+
+ long ttl = timeProvider.getMetadataTtl();
+
+ if (pathMetadata != null) {
+ // Special case: the pathMetadata's lastUpdated is 0. This can happen
+ // e.g. with an old db using this implementation
+ if (pathMetadata.getLastUpdated() == 0) {
+ LOG.debug("PathMetadata lastUpdated for {} is 0, so it will be "
+ + "returned as not expired.", path);
+ return pathMetadata;
+ }
+
+ if (!pathMetadata.isExpired(ttl, timeProvider.getNow())) {
+ return pathMetadata;
+ } else {
+ LOG.debug("PathMetadata TTl for {} is expired in metadata store.",
+ path);
+ return null;
+ }
+ }
+ return null;
+ }
+
+ public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
+ Path path, @Nullable ITtlTimeProvider timeProvider)
+ throws IOException {
DirListingMetadata dlm = ms.listChildren(path);
- if(dlm != null && dlm.isAuthoritative()
+ if (timeProvider == null) {
+ LOG.debug("timeProvider is null, returning DirListingMetadata as is");
+ return dlm;
+ }
+
+ long ttl = timeProvider.getMetadataTtl();
+
+ if (dlm != null && dlm.isAuthoritative()
&& dlm.isExpired(ttl, timeProvider.getNow())) {
dlm.setAuthoritative(false);
}
return dlm;
}
+
}
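
The expiry decision in getWithTtl() comes down to comparing the entry's last_updated plus the TTL against the provider's now, with last_updated == 0 exempted. An illustrative walk-through, assuming PathMetadata.isExpired(ttl, now) tests lastUpdated + ttl against now and reusing the FixedTtlTimeProvider sketch above:

    // assume ms.get(path) yields an entry with lastUpdated = 100
    FixedTtlTimeProvider clock = new FixedTtlTimeProvider(105L, 10L);
    PathMetadata live = S3Guard.getWithTtl(ms, path, clock);    // within TTL: returned
    clock.advance(6L);                                          // now = 111, past lastUpdated + ttl
    PathMetadata expired = S3Guard.getWithTtl(ms, path, clock); // null; callers fall back to S3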
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 397a9cb..dedb849 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -707,7 +707,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
}
S3AFileStatus dir = DynamoDBMetadataStore.makeDirStatus(parent,
f.getOwner());
- getStore().put(new PathMetadata(dir));
+ S3Guard.putWithTtl(getStore(), new PathMetadata(dir),
+ getFilesystem().getTtlTimeProvider());
dirCache.add(parent);
parent = parent.getParent();
}
@@ -741,7 +742,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
located.getVersionId());
}
putParentsIfNotPresent(child);
- getStore().put(new PathMetadata(child));
+ S3Guard.putWithTtl(getStore(), new PathMetadata(child),
+ getFilesystem().getTtlTimeProvider());
items++;
}
return items;
@@ -1073,7 +1075,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
}
try {
- getStore().prune(divide, keyPrefix);
+ getStore().prune(MetadataStore.PruneMode.ALL_BY_MODTIME, divide,
+ keyPrefix);
} catch (UnsupportedOperationException e){
errorln("Prune operation not supported in metadata store.");
}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
index 94dc89b..337fc95 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
@@ -181,8 +181,8 @@ removed on `S3AFileSystem` level.
```xml
<property>
- <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
- <value>3600000</value>
+ <name>fs.s3a.metadatastore.metadata.ttl</name>
+ <value>15m</value>
</property>
```
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
index 6dbe6f9..2af9a0a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java
@@ -24,6 +24,7 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.junit.Assume;
@@ -37,20 +38,32 @@ import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
-
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.junit.Assume.assumeTrue;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readBytesToString;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
*
@@ -115,7 +128,7 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
* Test array for parameterized test runs.
* @return a list of parameter tuples.
*/
- @Parameterized.Parameters
+ @Parameterized.Parameters(name="auth={0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{true}, {false}
@@ -190,8 +203,11 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
URI uri = testFS.getUri();
removeBaseAndBucketOverrides(uri.getHost(), config,
- METADATASTORE_AUTHORITATIVE);
+ METADATASTORE_AUTHORITATIVE,
+ METADATASTORE_METADATA_TTL);
config.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMode);
+ config.setLong(METADATASTORE_METADATA_TTL,
+ DEFAULT_METADATASTORE_METADATA_TTL);
final S3AFileSystem gFs = createFS(uri, config);
// set back the same metadata store instance
gFs.setMetadataStore(realMs);
@@ -272,6 +288,292 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
}
/**
+ * Tests that tombstone expiry is implemented, so if a file is created raw
+ * while a tombstone with the same name exists in the metadata store, then
+ * S3Guard will check S3 for the file once the tombstone has expired.
+ *
+ * Seq: create guarded; delete guarded; create raw (same path); read guarded.
+ * This sequence will fail if tombstone expiry is not implemented.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testTombstoneExpiryGuardedDeleteRawCreate() throws Exception {
+ boolean allowAuthoritative = authoritative;
+ Path testFilePath = path("TEGDRC-" + UUID.randomUUID() + "/file");
+ LOG.info("Allow authoritative param: {}", allowAuthoritative);
+ String originalText = "some test";
+ String newText = "the new originalText for test";
+
+ final ITtlTimeProvider originalTimeProvider =
+ guardedFs.getTtlTimeProvider();
+ try {
+ final AtomicLong now = new AtomicLong(1);
+ final AtomicLong metadataTtl = new AtomicLong(1);
+
+ // SET TTL TIME PROVIDER FOR TESTING
+ ITtlTimeProvider testTimeProvider =
+ new ITtlTimeProvider() {
+ @Override public long getNow() {
+ return now.get();
+ }
+
+ @Override public long getMetadataTtl() {
+ return metadataTtl.get();
+ }
+ };
+ guardedFs.setTtlTimeProvider(testTimeProvider);
+
+ // CREATE GUARDED
+ createAndAwaitFs(guardedFs, testFilePath, originalText);
+
+ // DELETE GUARDED
+ deleteGuardedTombstoned(guardedFs, testFilePath, now);
+
+ // CREATE RAW
+ createAndAwaitFs(rawFS, testFilePath, newText);
+
+ // CHECK LISTING - THE FILE SHOULD NOT BE THERE, EVEN IF IT'S CREATED RAW
+ checkListingDoesNotContainPath(guardedFs, testFilePath);
+
+ // CHANGE TTL SO ENTRY (& TOMBSTONE METADATA) WILL EXPIRE
+ long willExpire = now.get() + metadataTtl.get() + 1L;
+ now.set(willExpire);
+ LOG.info("willExpire: {}, ttlNow: {}; ttlTTL: {}", willExpire,
+ testTimeProvider.getNow(), testTimeProvider.getMetadataTtl());
+
+ // READ GUARDED
+ String newRead = readBytesToString(guardedFs, testFilePath,
+ newText.length());
+
+ // CHECK LISTING - THE FILE SHOULD BE THERE, TOMBSTONE EXPIRED
+ checkListingContainsPath(guardedFs, testFilePath);
+
+ // we can assert that the text read back is the new one, created raw
+ LOG.info("Old: {}, New: {}, Read: {}", originalText, newText, newRead);
+ assertEquals("The text should have been updated to the new value.",
+ newText, newRead);
+ } finally {
+ guardedFs.delete(testFilePath, true);
+ guardedFs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
+ private void createAndAwaitFs(S3AFileSystem fs, Path testFilePath,
+ String text) throws Exception {
+ writeTextFile(fs, testFilePath, text, true);
+ final FileStatus newStatus = awaitFileStatus(fs, testFilePath);
+ assertNotNull("Newly created file status should not be null.", newStatus);
+ }
+
+ private void deleteGuardedTombstoned(S3AFileSystem guarded,
+ Path testFilePath, AtomicLong now) throws Exception {
+ guarded.delete(testFilePath, true);
+
+ final PathMetadata metadata =
+ guarded.getMetadataStore().get(testFilePath);
+ assertNotNull("Created file metadata should not be null in ms",
+ metadata);
+ assertEquals("Created file metadata last_updated should equal with "
+ + "mocked now", now.get(), metadata.getLastUpdated());
+
+ intercept(FileNotFoundException.class, testFilePath.toString(),
+ "This file should throw FNFE when reading through "
+ + "the guarded fs, and the metadatastore tombstoned the file.",
+ () -> guarded.getFileStatus(testFilePath));
+ }
+
+ /**
+ * createNonRecursive must fail if the parent directory has been deleted,
+ * and succeed if the tombstone has expired and the directory has been
+ * created out of band.
+ */
+ @Test
+ public void testCreateNonRecursiveFailsIfParentDeleted() throws Exception {
+ LOG.info("Authoritative mode: {}", authoritative);
+
+ String dirToDelete = methodName + UUID.randomUUID().toString();
+ String fileToTry = dirToDelete + "/theFileToTry";
+
+ final Path dirPath = path(dirToDelete);
+ final Path filePath = path(fileToTry);
+
+ // Set up a mock time provider so tombstone expiry can be controlled
+ ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+ ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
+
+ try {
+ guardedFs.setTtlTimeProvider(mockTimeProvider);
+ when(mockTimeProvider.getNow()).thenReturn(100L);
+ when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+ // CREATE DIRECTORY
+ guardedFs.mkdirs(dirPath);
+
+ // DELETE DIRECTORY
+ guardedFs.delete(dirPath, true);
+
+ // WRITE TO DELETED DIRECTORY - FAIL
+ intercept(FileNotFoundException.class,
+ dirToDelete,
+ "createNonRecursive must fail if the parent directory has been deleted.",
+ () -> createNonRecursive(guardedFs, filePath));
+
+ // CREATE THE DIRECTORY RAW
+ rawFS.mkdirs(dirPath);
+ awaitFileStatus(rawFS, dirPath);
+
+ // SET TIME SO METADATA EXPIRES
+ when(mockTimeProvider.getNow()).thenReturn(110L);
+
+ // WRITE TO DELETED DIRECTORY - SUCCESS
+ createNonRecursive(guardedFs, filePath);
+
+ } finally {
+ guardedFs.delete(filePath, true);
+ guardedFs.delete(dirPath, true);
+ guardedFs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
+ /**
+ * When lastUpdated = 0 the entry should not expire. This is a special case,
+ * e.g. for old metadata entries.
+ */
+ @Test
+ public void testLastUpdatedZeroWontExpire() throws Exception {
+ LOG.info("Authoritative mode: {}", authoritative);
+
+ String testFile = methodName + UUID.randomUUID().toString() +
+ "/theFileToTry";
+
+ long ttl = 10L;
+ final Path filePath = path(testFile);
+
+ ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+ ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
+
+ try {
+ guardedFs.setTtlTimeProvider(mockTimeProvider);
+ when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl);
+
+ // create a file while the NOW is 0, so it will set 0 as the last_updated
+ when(mockTimeProvider.getNow()).thenReturn(0L);
+ touch(guardedFs, filePath);
+ deleteFile(guardedFs, filePath);
+
+ final PathMetadata pathMetadata =
+ guardedFs.getMetadataStore().get(filePath);
+ assertNotNull("pathMetadata should not be null after deleting with "
+ + "tombstones", pathMetadata);
+ assertEquals("pathMetadata lastUpdated field should be 0", 0,
+ pathMetadata.getLastUpdated());
+
+ // set the time, so the metadata would expire
+ when(mockTimeProvider.getNow()).thenReturn(2*ttl);
+ intercept(FileNotFoundException.class, filePath.toString(),
+ "This file should throw FNFE when reading through "
+ + "the guarded fs, and the metadatastore tombstoned the file. "
+ + "The tombstone won't expire if lastUpdated is set to 0.",
+ () -> guardedFs.getFileStatus(filePath));
+
+ } finally {
+ guardedFs.delete(filePath, true);
+ guardedFs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
+ /**
+ * 1. File is deleted in the guarded fs.
+ * 2. File is replaced in the raw fs.
+ * 3. File is deleted in the guarded FS after the expiry time.
+ * 4. File MUST NOT exist in raw FS.
+ */
+ @Test
+ public void deleteAfterTombstoneExpiryOobCreate() throws Exception {
+ LOG.info("Authoritative mode: {}", authoritative);
+
+ String testFile = methodName + UUID.randomUUID().toString() +
+ "/theFileToTry";
+
+ long ttl = 10L;
+ final Path filePath = path(testFile);
+
+ ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+ ITtlTimeProvider originalTimeProvider = guardedFs.getTtlTimeProvider();
+
+ try {
+ guardedFs.setTtlTimeProvider(mockTimeProvider);
+ when(mockTimeProvider.getMetadataTtl()).thenReturn(ttl);
+
+ // CREATE AND DELETE WITH GUARDED FS
+ when(mockTimeProvider.getNow()).thenReturn(100L);
+ touch(guardedFs, filePath);
+ deleteFile(guardedFs, filePath);
+
+ final PathMetadata pathMetadata =
+ guardedFs.getMetadataStore().get(filePath);
+ assertNotNull("pathMetadata should not be null after deleting with "
+ + "tombstones", pathMetadata);
+
+ // REPLACE WITH RAW FS
+ touch(rawFS, filePath);
+ awaitFileStatus(rawFS, filePath);
+
+ // SET EXPIRY TIME, SO THE TOMBSTONE IS EXPIRED
+ when(mockTimeProvider.getNow()).thenReturn(100L + 2 * ttl);
+
+ // DELETE IN GUARDED FS
+ guardedFs.delete(filePath, true);
+
+ // FILE MUST NOT EXIST IN RAW
+ intercept(FileNotFoundException.class, filePath.toString(),
+ "This file should throw FNFE when reading through "
+ + "the raw fs, and the guarded fs deleted the file.",
+ () -> rawFS.getFileStatus(filePath));
+
+ } finally {
+ guardedFs.delete(filePath, true);
+ guardedFs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
+ private void checkListingDoesNotContainPath(S3AFileSystem fs, Path filePath)
+ throws IOException {
+ final RemoteIterator<LocatedFileStatus> listIter =
+ fs.listFiles(filePath.getParent(), false);
+ while (listIter.hasNext()) {
+ final LocatedFileStatus lfs = listIter.next();
+ assertNotEquals("The tombstone has not been expired, so must not be"
+ + " listed.", filePath, lfs.getPath());
+ }
+ LOG.info("{}; file omitted from listFiles listing as expected.", filePath);
+
+ final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
+ for (FileStatus fileStatus : fileStatuses) {
+ assertNotEquals("The tombstone has not been expired, so must not be"
+ + " listed.", filePath, fileStatus.getPath());
+ }
+ LOG.info("{}; file omitted from listStatus as expected.", filePath);
+ }
+
+ private void checkListingContainsPath(S3AFileSystem fs, Path filePath)
+ throws IOException {
+ boolean listFilesHasIt = false;
+ final RemoteIterator<LocatedFileStatus> listIter =
+ fs.listFiles(filePath.getParent(), false);
+
+ while (listIter.hasNext()) {
+ final LocatedFileStatus lfs = listIter.next();
+ if (filePath.equals(lfs.getPath())) {
+ listFilesHasIt = true;
+ }
+ }
+ assertTrue("The file should be listed in fs.listFiles: " + filePath,
+ listFilesHasIt);
+
+ boolean listStatusHasIt = false;
+ final FileStatus[] fileStatuses = fs.listStatus(filePath.getParent());
+ for (FileStatus fileStatus : fileStatuses) {
+ if (filePath.equals(fileStatus.getPath())) {
+ listStatusHasIt = true;
+ }
+ }
+ assertTrue("The file should be listed in fs.listStatus: " + filePath,
+ listStatusHasIt);
+ }
+
+ /**
* Perform an out-of-band delete.
* @param testFilePath filename
* @param allowAuthoritative is the store authoritative
@@ -384,12 +686,18 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
// Create initial statusIterator with guarded ms
writeTextFile(guardedFs, testFilePath, firstText, true);
// and cache the value for later
- final FileStatus origStatus = awaitFileStatus(rawFS, testFilePath);
+ final S3AFileStatus origStatus = awaitFileStatus(rawFS, testFilePath);
+ assertNotNull("No etag in raw status " + origStatus,
+ origStatus.getETag());
// Do a listing to cache the lists. Should be authoritative if it's set.
- final FileStatus[] origList = guardedFs.listStatus(testDirPath);
+ final S3AFileStatus[] origList = (S3AFileStatus[]) guardedFs.listStatus(
+ testDirPath);
assertArraySize("Added one file to the new dir, so the number of "
+ "files in the dir should be one.", 1, origList);
+ S3AFileStatus origGuardedFileStatus = origList[0];
+ assertNotNull("No etag in origGuardedFileStatus" + origGuardedFileStatus,
+ origGuardedFileStatus.getETag());
final DirListingMetadata dirListingMetadata =
realMs.listChildren(guardedFs.qualify(testDirPath));
assertListingAuthority(allowAuthoritative, dirListingMetadata);
@@ -406,7 +714,8 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
final FileStatus rawFileStatus = awaitFileStatus(rawFS, testFilePath);
// check listing in guarded store.
- final FileStatus[] modList = guardedFs.listStatus(testDirPath);
+ final S3AFileStatus[] modList = (S3AFileStatus[]) guardedFs.listStatus(
+ testDirPath);
assertArraySize("Added one file to the new dir then modified it, "
+ "so the number of files in the dir should be one.", 1,
modList);
@@ -479,6 +788,24 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
expectedLength, guardedLength);
}
}
+ // check etag. This relies on first and second text being different.
+ final S3AFileStatus rawS3AFileStatus = (S3AFileStatus) rawFileStatus;
+ final S3AFileStatus guardedS3AFileStatus = (S3AFileStatus)
+ guardedFileStatus;
+ final S3AFileStatus origS3AFileStatus = (S3AFileStatus) origStatus;
+ assertNotEquals(
+ "raw status still no to date with changes" + stats,
+ origS3AFileStatus.getETag(), rawS3AFileStatus.getETag());
+ if (allowAuthoritative) {
+ // expect the etag to be out of sync
+ assertNotEquals(
+ "etag in authoritative table with " + stats,
+ rawS3AFileStatus.getETag(), guardedS3AFileStatus.getETag());
+ } else {
+ assertEquals(
+ "etag in non-authoritative table with " + stats,
+ rawS3AFileStatus.getETag(), guardedS3AFileStatus.getETag());
+ }
// Next: modification time.
long rawModTime = rawFileStatus.getModificationTime();
long guardedModTime = guardedFileStatus.getModificationTime();
@@ -631,12 +958,18 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
* @return the file status.
* @throws Exception failure
*/
- private FileStatus awaitFileStatus(S3AFileSystem fs,
+ private S3AFileStatus awaitFileStatus(S3AFileSystem fs,
final Path testFilePath)
throws Exception {
- return eventually(
+ return (S3AFileStatus) eventually(
STABILIZATION_TIME, PROBE_INTERVAL_MILLIS,
() -> fs.getFileStatus(testFilePath));
}
+
+ private FSDataOutputStream createNonRecursive(FileSystem fs, Path path)
+ throws Exception {
+ // the block size parameter is a long; only replication is a short
+ return fs
+ .createNonRecursive(path, false, 4096, (short) 3, 4096, null);
+ }
+
}
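The out-of-band tests above all hinge on a single expiry rule: an entry or
tombstone is ignored once its last_updated timestamp is more than one TTL
behind the current time, except that last_updated == 0 marks a pre-TTL entry
and never expires. A minimal sketch of that rule, assuming a hypothetical
isExpired() helper rather than the exact production check inside S3Guard:

    // sketch only: isExpired() is a hypothetical helper; the production
    // check is internal to S3Guard.getWithTtl() and is not shown here
    static boolean isExpired(PathMetadata pm, ITtlTimeProvider timeProvider) {
      long lastUpdated = pm.getLastUpdated();
      if (lastUpdated == 0) {
        // entries written before TTL support must never expire
        return false;
      }
      return timeProvider.getNow() - lastUpdated
          >= timeProvider.getMetadataTtl();
    }

The boundary matches the unit tests further down: with lastUpdated = 100 and
a TTL of 1, the entry is expired at now = 101; with a TTL of 2 it is not.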
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
index d24009c..9622322 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java
@@ -18,13 +18,22 @@
package org.apache.hadoop.fs.s3a;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+
import org.junit.Assume;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
@@ -36,8 +45,37 @@ import static org.mockito.Mockito.when;
/**
* These tests are testing the S3Guard TTL (time to live) features.
*/
+@RunWith(Parameterized.class)
public class ITestS3GuardTtl extends AbstractS3ATestBase {
+ private final boolean authoritative;
+
+ /**
+ * Test array for parameterized test runs.
+ * @return a list of parameter tuples.
+ */
+ @Parameterized.Parameters
+ public static Collection<Object[]> params() {
+ return Arrays.asList(new Object[][]{
+ {true}, {false}
+ });
+ }
+
+ /**
+ * Changing the method name changes the thread name too, so the logs
+ * show which mode is under test.
+ * @return a string to use for the thread name.
+ */
+ @Override
+ protected String getMethodName() {
+ return super.getMethodName() +
+ (authoritative ? "-auth" : "-nonauth");
+ }
+
+ public ITestS3GuardTtl(boolean authoritative) {
+ this.authoritative = authoritative;
+ }
+
/**
* Patch the configuration - this test needs disabled filesystem caching.
* These tests modify the fs instance that would cause flaky tests.
@@ -47,11 +85,15 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
protected Configuration createConfiguration() {
Configuration configuration = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(configuration);
- return S3ATestUtils.prepareTestConfiguration(configuration);
+ configuration =
+ S3ATestUtils.prepareTestConfiguration(configuration);
+ configuration.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
+ return configuration;
}
@Test
public void testDirectoryListingAuthoritativeTtl() throws Exception {
+ LOG.info("Authoritative mode: {}", authoritative);
final S3AFileSystem fs = getFileSystem();
Assume.assumeTrue(fs.hasMetadataStore());
@@ -64,12 +106,12 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
Assume.assumeTrue("MetadataStore should be authoritative for this test",
isMetadataStoreAuthoritative(getFileSystem().getConf()));
- S3Guard.ITtlTimeProvider mockTimeProvider =
- mock(S3Guard.ITtlTimeProvider.class);
- S3Guard.ITtlTimeProvider restoreTimeProvider = fs.getTtlTimeProvider();
+ ITtlTimeProvider mockTimeProvider =
+ mock(ITtlTimeProvider.class);
+ ITtlTimeProvider restoreTimeProvider = fs.getTtlTimeProvider();
fs.setTtlTimeProvider(mockTimeProvider);
when(mockTimeProvider.getNow()).thenReturn(100L);
- when(mockTimeProvider.getAuthoritativeDirTtl()).thenReturn(1L);
+ when(mockTimeProvider.getMetadataTtl()).thenReturn(1L);
Path dir = path("ttl/");
Path file = path("ttl/afile");
@@ -102,4 +144,146 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
fs.setTtlTimeProvider(restoreTimeProvider);
}
}
+
+ @Test
+ public void testFileMetadataExpiresTtl() throws Exception {
+ LOG.info("Authoritative mode: {}", authoritative);
+
+ Path fileExpire1 = path("expirettl-" + UUID.randomUUID());
+ Path fileExpire2 = path("expirettl-" + UUID.randomUUID());
+ Path fileRetain = path("expirettl-" + UUID.randomUUID());
+
+ final S3AFileSystem fs = getFileSystem();
+ Assume.assumeTrue(fs.hasMetadataStore());
+ final MetadataStore ms = fs.getMetadataStore();
+
+ ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+ ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
+
+ try {
+ fs.setTtlTimeProvider(mockTimeProvider);
+ when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+ // set the time, so the fileExpire1 will expire
+ when(mockTimeProvider.getNow()).thenReturn(100L);
+ touch(fs, fileExpire1);
+ // set the time, so fileExpire2 will expire
+ when(mockTimeProvider.getNow()).thenReturn(101L);
+ touch(fs, fileExpire2);
+ // set the time, so fileRetain won't expire
+ when(mockTimeProvider.getNow()).thenReturn(109L);
+ touch(fs, fileRetain);
+ final FileStatus origFileRetainStatus = fs.getFileStatus(fileRetain);
+ // change time, so the metadata of the first two files expires
+ when(mockTimeProvider.getNow()).thenReturn(110L);
+
+ // the metadata has expired, so getFileStatus should refresh it,
+ // setting last_updated to getNow()
+ final FileStatus fileExpire1Status = fs.getFileStatus(fileExpire1);
+ assertNotNull(fileExpire1Status);
+ assertEquals(110L, ms.get(fileExpire1).getLastUpdated());
+
+ // the metadata has expired, so getFileStatus should refresh it,
+ // setting last_updated to getNow()
+ final FileStatus fileExpire2Status = fs.getFileStatus(fileExpire2);
+ assertNotNull(fileExpire2Status);
+ assertEquals(110L, ms.get(fileExpire2).getLastUpdated());
+
+ final FileStatus fileRetainStatus = fs.getFileStatus(fileRetain);
+ assertNotNull(fileRetainStatus);
+ assertEquals("Modification time of these files should be equal.",
+ origFileRetainStatus.getModificationTime(),
+ fileRetainStatus.getModificationTime());
+ assertEquals(109L, ms.get(fileRetain).getLastUpdated());
+ } finally {
+ fs.delete(fileExpire1, true);
+ fs.delete(fileExpire2, true);
+ fs.delete(fileRetain, true);
+ fs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
+ /**
+ * create(tombstone file) must succeed irrespective of the overwrite flag.
+ */
+ @Test
+ public void testCreateOnTombstonedFileSucceeds() throws Exception {
+ LOG.info("Authoritative mode: {}", authoritative);
+ final S3AFileSystem fs = getFileSystem();
+
+ String fileToTry = methodName + UUID.randomUUID().toString();
+
+ final Path filePath = path(fileToTry);
+
+ // set up a mock time provider so the test controls the clock
+ ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+ ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
+
+ try {
+ fs.setTtlTimeProvider(mockTimeProvider);
+ when(mockTimeProvider.getNow()).thenReturn(100L);
+ when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+ // CREATE A FILE
+ touch(fs, filePath);
+
+ // DELETE THE FILE - TOMBSTONE
+ fs.delete(filePath, true);
+
+ // CREATE THE SAME FILE WITHOUT ERROR DESPITE THE TOMBSTONE
+ touch(fs, filePath);
+
+ } finally {
+ fs.delete(filePath, true);
+ fs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
+ /**
+ * create("parent has tombstone") must always succeed (We dont check the
+ * parent), but after the file has been written, all entries up the tree
+ * must be valid. That is: the putAncestor code will correct everything
+ */
+ @Test
+ public void testCreateParentHasTombstone() throws Exception {
+ LOG.info("Authoritative mode: {}", authoritative);
+ final S3AFileSystem fs = getFileSystem();
+
+ String dirToDelete = methodName + UUID.randomUUID().toString();
+ String fileToTry = dirToDelete + "/theFileToTry";
+
+ final Path dirPath = path(dirToDelete);
+ final Path filePath = path(fileToTry);
+
+ // set up a mock time provider so the test controls the clock
+ ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
+ ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
+
+ try {
+ fs.setTtlTimeProvider(mockTimeProvider);
+ when(mockTimeProvider.getNow()).thenReturn(100L);
+ when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
+
+ // CREATE DIRECTORY
+ fs.mkdirs(dirPath);
+
+ // DELETE DIRECTORY
+ fs.delete(dirPath, true);
+
+ // WRITE TO DELETED DIRECTORY - SUCCESS
+ touch(fs, filePath);
+
+ // SET TIME SO METADATA EXPIRES
+ when(mockTimeProvider.getNow()).thenReturn(110L);
+
+ // WRITE TO DELETED DIRECTORY - SUCCESS
+ touch(fs, filePath);
+
+ } finally {
+ fs.delete(filePath, true);
+ fs.delete(dirPath, true);
+ fs.setTtlTimeProvider(originalTimeProvider);
+ }
+ }
+
}
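Every test in this class follows the same clock-control pattern: swap a
Mockito mock in for the filesystem's ITtlTimeProvider, advance getNow() past
the TTL, and restore the original provider in a finally block. Condensed into
a sketch (this is an illustration of the pattern, not extra test code from
the patch):

    ITtlTimeProvider mockTimeProvider = mock(ITtlTimeProvider.class);
    ITtlTimeProvider originalTimeProvider = fs.getTtlTimeProvider();
    try {
      fs.setTtlTimeProvider(mockTimeProvider);
      when(mockTimeProvider.getMetadataTtl()).thenReturn(5L);
      when(mockTimeProvider.getNow()).thenReturn(100L);
      // ... operations here write entries with last_updated = 100 ...
      when(mockTimeProvider.getNow()).thenReturn(110L);
      // ... entries written at t=100 are now past the 5 ms TTL ...
    } finally {
      // always restore the real clock so other tests are unaffected
      fs.setTtlTimeProvider(originalTimeProvider);
    }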
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
index 9241686..f616190 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
@@ -280,7 +280,7 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
"This child should have been kept (prefix restriction).", 1);
} finally {
getFileSystem().delete(parent, true);
- ms.prune(Long.MAX_VALUE);
+ ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, Long.MAX_VALUE);
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
index 149d1f3..5241dd4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java
@@ -230,6 +230,12 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
IOUtils.cleanupWithLogger(LOG, fileSystem);
}
+ @Override protected String getPathStringForPrune(String path)
+ throws Exception {
+ String bucket = getTestBucketName(getContract().getFileSystem().getConf());
+ return "/" + bucket + path;
+ }
+
/**
* Each contract has its own S3AFileSystem and DynamoDBMetadataStore objects.
*/
@@ -437,7 +443,7 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
}
// move the old paths to new paths and verify
- ms.move(pathsToDelete, newMetas);
+ ms.move(pathsToDelete, newMetas, getTtlTimeProvider());
assertEquals(0, ms.listChildren(oldDir).withoutTombstones().numEntries());
if (newMetas != null) {
assertTrue(CollectionUtils
@@ -650,7 +656,7 @@ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase {
1024, false))
);
- ddbms.move(fullSourcePaths, pathsToCreate);
+ ddbms.move(fullSourcePaths, pathsToCreate, getTtlTimeProvider());
// assert that all the ancestors should have been populated automatically
assertCached(testRoot + "/c");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
index 301ba16..95c607a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java
@@ -243,7 +243,8 @@ public class ITestDynamoDBMetadataStoreScale
if (pruneItems == BATCH_SIZE) {
describe("pruning files");
- ddbms.prune(Long.MAX_VALUE /* all files */);
+ ddbms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME,
+ Long.MAX_VALUE /* all files */);
pruneItems = 0;
}
if (tracker.probe()) {
@@ -305,7 +306,7 @@ public class ITestDynamoDBMetadataStoreScale
private void retryingDelete(final Path path) {
try {
ddbms.getInvoker().retry("Delete ", path.toString(), true,
- () -> ddbms.delete(path));
+ () -> ddbms.delete(path, new S3Guard.TtlTimeProvider(getConf())));
} catch (IOException e) {
LOG.warn("Failed to delete {}: ", path, e);
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
index 55f4707..754da0d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import com.google.common.collect.Sets;
@@ -68,6 +69,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
static final FsPermission PERMISSION = null;
static final String GROUP = null;
private final long accessTime = 0;
+ private static ITtlTimeProvider ttlTimeProvider;
/**
* Each test should override this. Will use a new Configuration instance.
@@ -123,6 +125,8 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
assertNotNull("null MetadataStore", ms);
assertNotNull("null FileSystem", contract.getFileSystem());
ms.initialize(contract.getFileSystem());
+ ttlTimeProvider =
+ new S3Guard.TtlTimeProvider(contract.getFileSystem().getConf());
}
@After
@@ -310,7 +314,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
public void testDelete() throws Exception {
setUpDeleteTest();
- ms.delete(strToPath("/ADirectory1/db1/file2"));
+ ms.delete(strToPath("/ADirectory1/db1/file2"), ttlTimeProvider);
/* Ensure delete happened. */
assertDirectorySize("/ADirectory1/db1", 1);
@@ -338,7 +342,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
if (!allowMissing()) {
assertCached(p + "/ADirectory1/db1");
}
- ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"));
+ ms.deleteSubtree(strToPath(p + "/ADirectory1/db1/"), ttlTimeProvider);
assertEmptyDirectory(p + "/ADirectory1");
assertDeleted(p + "/ADirectory1/db1");
@@ -358,7 +362,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
public void testDeleteRecursiveRoot() throws Exception {
setUpDeleteTest();
- ms.deleteSubtree(strToPath("/"));
+ ms.deleteSubtree(strToPath("/"), ttlTimeProvider);
assertDeleted("/ADirectory1");
assertDeleted("/ADirectory2");
assertDeleted("/ADirectory2/db1");
@@ -369,10 +373,10 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
@Test
public void testDeleteNonExisting() throws Exception {
// Path doesn't exist, but should silently succeed
- ms.delete(strToPath("/bobs/your/uncle"));
+ ms.delete(strToPath("/bobs/your/uncle"), ttlTimeProvider);
// Ditto.
- ms.deleteSubtree(strToPath("/internets"));
+ ms.deleteSubtree(strToPath("/internets"), ttlTimeProvider);
}
@@ -408,7 +412,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
}
if (!(ms instanceof NullMetadataStore)) {
- ms.delete(strToPath(filePath));
+ ms.delete(strToPath(filePath), ttlTimeProvider);
meta = ms.get(strToPath(filePath));
assertTrue("Tombstone not left for deleted file", meta.isDeleted());
}
@@ -586,7 +590,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
destMetas.add(new PathMetadata(makeDirStatus("/b1")));
destMetas.add(new PathMetadata(makeFileStatus("/b1/file1", 100)));
destMetas.add(new PathMetadata(makeFileStatus("/b1/file2", 100)));
- ms.move(srcPaths, destMetas);
+ ms.move(srcPaths, destMetas, ttlTimeProvider);
// Assert src is no longer there
dirMeta = ms.listChildren(strToPath("/a1"));
@@ -636,11 +640,11 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
// Make sure delete is correct as well
if (!allowMissing()) {
- ms.delete(new Path(p2));
+ ms.delete(new Path(p2), ttlTimeProvider);
meta = ms.get(new Path(p1));
assertNotNull("Path should not have been deleted", meta);
}
- ms.delete(new Path(p1));
+ ms.delete(new Path(p1), ttlTimeProvider);
}
@Test
@@ -668,7 +672,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
assertListingsEqual(ls.getListing(), "/pruneFiles/new",
"/pruneFiles/old");
}
- ms.prune(cutoff);
+ ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoff);
ls = ms.listChildren(strToPath("/pruneFiles"));
if (allowMissing()) {
assertDeleted("/pruneFiles/old");
@@ -698,7 +702,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
Thread.sleep(1);
long cutoff = getTime();
- ms.prune(cutoff);
+ ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoff);
assertDeleted("/pruneDirs/dir/file");
}
@@ -728,7 +732,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
ms.put(parentDirMd);
}
- ms.prune(time);
+ ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
DirListingMetadata listing;
for (String directory : directories) {
Path path = strToPath(directory);
@@ -765,7 +769,7 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
ms.put(parentDirMd);
// prune the ms
- ms.prune(time);
+ ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, time);
// get the directory listings
DirListingMetadata rootDirMd = ms.listChildren(strToPath(rootDir));
@@ -823,6 +827,89 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
}
}
+ @Test
+ public void testPruneExpiredTombstones() throws Exception {
+ List<String> keepFilenames = new ArrayList<>(
+ Arrays.asList("/dir1/fileK1", "/dir1/fileK2", "/dir1/fileK3"));
+ List<String> removeFilenames = new ArrayList<>(
+ Arrays.asList("/dir1/fileR1", "/dir1/fileR2", "/dir1/fileR3"));
+
+ long cutoff = 9001;
+
+ for (String fN : keepFilenames) {
+ final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+ pathMetadata.setLastUpdated(9002L);
+ ms.put(pathMetadata);
+ }
+
+ for (String fN : removeFilenames) {
+ final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+ pathMetadata.setLastUpdated(9000L);
+ // tombstones are the deleted files!
+ pathMetadata.setIsDeleted(true);
+ ms.put(pathMetadata);
+ }
+
+ ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff);
+
+ if (!allowMissing()) {
+ for (String fN : keepFilenames) {
+ final PathMetadata pathMetadata = ms.get(strToPath(fN));
+ assertNotNull("Kept files should be in the metastore after prune",
+ pathMetadata);
+ }
+ }
+
+ for (String fN : removeFilenames) {
+ final PathMetadata pathMetadata = ms.get(strToPath(fN));
+ assertNull("Expired tombstones should be removed from metastore after "
+ + "the prune.", pathMetadata);
+ }
+ }
+
+ @Test
+ public void testPruneExpiredTombstonesSpecifiedPath() throws Exception {
+ List<String> keepFilenames = new ArrayList<>(
+ Arrays.asList("/dir1/fileK1", "/dir1/fileK2", "/dir1/fileK3"));
+ List<String> removeFilenames = new ArrayList<>(
+ Arrays.asList("/dir2/fileR1", "/dir2/fileR2", "/dir2/fileR3"));
+
+ long cutoff = 9001;
+
+ // the prune is restricted to /dir2, so only the expired tombstones
+ // under that path may be removed
+ for (String fN : keepFilenames) {
+ final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+ pathMetadata.setLastUpdated(9002L);
+ ms.put(pathMetadata);
+ }
+
+ for (String fN : removeFilenames) {
+ final PathMetadata pathMetadata = new PathMetadata(makeFileStatus(fN, 1));
+ pathMetadata.setLastUpdated(9000L);
+ // tombstones are the deleted files!
+ pathMetadata.setIsDeleted(true);
+ ms.put(pathMetadata);
+ }
+
+ final String prunePath = getPathStringForPrune("/dir2");
+ ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff,
+ prunePath);
+
+ if (!allowMissing()) {
+ for (String fN : keepFilenames) {
+ final PathMetadata pathMetadata = ms.get(strToPath(fN));
+ assertNotNull("Kept files should be in the metastore after prune",
+ pathMetadata);
+ }
+ }
+
+ for (String fN : removeFilenames) {
+ final PathMetadata pathMetadata = ms.get(strToPath(fN));
+ assertNull("Expired tombstones should be removed from metastore after "
+ + "the prune.", pathMetadata);
+ }
+ }
+
/*
* Helper functions.
*/
@@ -837,6 +924,16 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
return paths;
}
+
+ /**
+ * {@link DynamoDBMetadataStore} needs the prune path as a string with the
+ * bucket name included, while {@link LocalMetadataStore} does not.
+ * This is an implementation detail of the metadata store, so it is
+ * implemented in the subclasses.
+ */
+ protected abstract String getPathStringForPrune(String path)
+ throws Exception;
+
private void commonTestPutListStatus(final String parent) throws IOException {
putListStatusFiles(parent, true, buildPathStrings(parent, "file1", "file2",
"file3"));
@@ -1012,4 +1109,8 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
return System.currentTimeMillis();
}
+ protected static ITtlTimeProvider getTtlTimeProvider() {
+ return ttlTimeProvider;
+ }
+
}
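The two prune modes used in these tests differ in which timestamp they
compare and what they remove. As a usage sketch, with ms, cutoff and
prunePath as in the tests above:

    // remove all entries whose file modification time is older than cutoff
    ms.prune(MetadataStore.PruneMode.ALL_BY_MODTIME, cutoff);

    // remove only expired tombstones, compared on their last_updated field,
    // restricted to the given path prefix (bucket-qualified for DynamoDB,
    // a plain path for the local store -- see getPathStringForPrune)
    ms.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED, cutoff,
        prunePath);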
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
index d0156f1..ee7b584 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
@@ -75,6 +75,11 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
return new LocalMSContract(conf);
}
+ @Override protected String getPathStringForPrune(String path)
+ throws Exception {
+ return path;
+ }
+
@Test
public void testClearByAncestor() throws Exception {
Cache<Path, LocalMetadataEntry> cache = CacheBuilder.newBuilder().build();
@@ -184,7 +189,7 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
String prefixStr, String pathStr, int leftoverSize) throws IOException {
populateMap(cache, prefixStr);
LocalMetadataStore.deleteEntryByAncestor(new Path(prefixStr + pathStr),
- cache, true);
+ cache, true, getTtlTimeProvider());
assertEquals(String.format("Cache should have %d entries", leftoverSize),
leftoverSize, sizeOfMap(cache));
cache.invalidateAll();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java
index c0541ea..2e0bc4b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestNullMetadataStore.java
@@ -46,6 +46,11 @@ public class TestNullMetadataStore extends MetadataStoreTestBase {
return true;
}
+ @Override protected String getPathStringForPrune(String path)
+ throws Exception {
+ return path;
+ }
+
@Override
public AbstractMSContract createContract() {
return new NullMSContract();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
index b246da2..bdb256c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java
@@ -18,18 +18,28 @@
package org.apache.hadoop.fs.s3a.s3guard;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
-import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests for the {@link S3Guard} utility class.
@@ -58,8 +68,8 @@ public class TestS3Guard extends Assert {
makeFileStatus("s3a://bucket/dir/s3-file4", false)
);
- S3Guard.ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
- DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
+ ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
+ DEFAULT_METADATASTORE_METADATA_TTL);
FileStatus[] result = S3Guard.dirListingUnion(ms, dirPath, s3Listing,
dirMeta, false, timeProvider);
@@ -70,6 +80,185 @@ public class TestS3Guard extends Assert {
assertContainsPath(result, "s3a://bucket/dir/s3-file4");
}
+ @Test
+ public void testPutWithTtlDirListingMeta() throws Exception {
+ // arrange
+ DirListingMetadata dlm = new DirListingMetadata(new Path("/"), null,
+ false);
+ MetadataStore ms = spy(MetadataStore.class);
+ ITtlTimeProvider timeProvider =
+ mock(ITtlTimeProvider.class);
+ when(timeProvider.getNow()).thenReturn(100L);
+
+ // act
+ S3Guard.putWithTtl(ms, dlm, timeProvider);
+
+ // assert
+ assertEquals("last update in " + dlm, 100L, dlm.getLastUpdated());
+ verify(timeProvider, times(1)).getNow();
+ verify(ms, times(1)).put(dlm);
+ }
+
+ @Test
+ public void testPutWithTtlFileMeta() throws Exception {
+ // arrange
+ S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+ when(fileStatus.getPath()).thenReturn(new Path("/"));
+ PathMetadata pm = new PathMetadata(fileStatus);
+ MetadataStore ms = spy(MetadataStore.class);
+ ITtlTimeProvider timeProvider =
+ mock(ITtlTimeProvider.class);
+ when(timeProvider.getNow()).thenReturn(100L);
+
+ // act
+ S3Guard.putWithTtl(ms, pm, timeProvider);
+
+ // assert
+ assertEquals("last update in " + pm, 100L, pm.getLastUpdated());
+ verify(timeProvider, times(1)).getNow();
+ verify(ms, times(1)).put(pm);
+ }
+
+ @Test
+ public void testPutWithTtlCollection() throws Exception {
+ // arrange
+ S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+ when(fileStatus.getPath()).thenReturn(new Path("/"));
+ Collection<PathMetadata> pmCollection = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ pmCollection.add(new PathMetadata(fileStatus));
+ }
+ MetadataStore ms = spy(MetadataStore.class);
+ ITtlTimeProvider timeProvider =
+ mock(ITtlTimeProvider.class);
+ when(timeProvider.getNow()).thenReturn(100L);
+
+ // act
+ S3Guard.putWithTtl(ms, pmCollection, timeProvider);
+
+ // assert
+ pmCollection.forEach(
+ pm -> assertEquals(100L, pm.getLastUpdated())
+ );
+ verify(timeProvider, times(1)).getNow();
+ verify(ms, times(1)).put(pmCollection);
+ }
+
+ @Test
+ public void testGetWithTtlExpired() throws Exception {
+ // arrange
+ S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+ Path path = new Path("/file");
+ when(fileStatus.getPath()).thenReturn(path);
+ PathMetadata pm = new PathMetadata(fileStatus);
+ pm.setLastUpdated(100L);
+
+ MetadataStore ms = mock(MetadataStore.class);
+ when(ms.get(path)).thenReturn(pm);
+
+ ITtlTimeProvider timeProvider =
+ mock(ITtlTimeProvider.class);
+ when(timeProvider.getNow()).thenReturn(101L);
+ when(timeProvider.getMetadataTtl()).thenReturn(1L);
+
+ // act
+ final PathMetadata pmExpired = S3Guard.getWithTtl(ms, path, timeProvider);
+
+ // assert
+ assertNull(pmExpired);
+ }
+
+ @Test
+ public void testGetWithTtlNotExpired() throws Exception {
+ // arrange
+ S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+ Path path = new Path("/file");
+ when(fileStatus.getPath()).thenReturn(path);
+ PathMetadata pm = new PathMetadata(fileStatus);
+ pm.setLastUpdated(100L);
+
+ MetadataStore ms = mock(MetadataStore.class);
+ when(ms.get(path)).thenReturn(pm);
+
+ ITtlTimeProvider timeProvider =
+ mock(ITtlTimeProvider.class);
+ when(timeProvider.getNow()).thenReturn(101L);
+ when(timeProvider.getMetadataTtl()).thenReturn(2L);
+
+ // act
+ final PathMetadata pmNotExpired =
+ S3Guard.getWithTtl(ms, path, timeProvider);
+
+ // assert
+ assertNotNull(pmNotExpired);
+ }
+
+ @Test
+ public void testGetWithZeroLastUpdatedNotExpired() throws Exception {
+ // arrange
+ S3AFileStatus fileStatus = mock(S3AFileStatus.class);
+ Path path = new Path("/file");
+ when(fileStatus.getPath()).thenReturn(path);
+ PathMetadata pm = new PathMetadata(fileStatus);
+ // we set 0 as the last updated time: this can happen e.g. when an
+ // old dynamo table is used
+ pm.setLastUpdated(0L);
+
+ MetadataStore ms = mock(MetadataStore.class);
+ when(ms.get(path)).thenReturn(pm);
+
+ ITtlTimeProvider timeProvider =
+ mock(ITtlTimeProvider.class);
+ when(timeProvider.getNow()).thenReturn(101L);
+ when(timeProvider.getMetadataTtl()).thenReturn(2L);
+
+ // act
+ final PathMetadata pmExpired = S3Guard.getWithTtl(ms, path, timeProvider);
+
+ // assert
+ assertNotNull(pmExpired);
+ }
+
+ /**
+ * Makes sure that all uses of TTL timeouts use a consistent time unit.
+ * @throws Throwable failure
+ */
+ @Test
+ public void testTTLConstruction() throws Throwable {
+ // first one
+ ITtlTimeProvider timeProviderExplicit = new S3Guard.TtlTimeProvider(
+ DEFAULT_METADATASTORE_METADATA_TTL);
+
+ // mirror the FS construction,
+ // from a config guaranteed to be empty (i.e. the hard-coded default)
+ Configuration conf = new Configuration(false);
+ long millitime = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
+ DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
+ assertEquals(15 * 60_000, millitime);
+ S3Guard.TtlTimeProvider fsConstruction = new S3Guard.TtlTimeProvider(
+ millitime);
+ assertEquals("explicit vs fs construction", timeProviderExplicit,
+ fsConstruction);
+ assertEquals("first and second constructor", timeProviderExplicit,
+ new S3Guard.TtlTimeProvider(conf));
+ // set the conf to a time without unit
+ conf.setLong(METADATASTORE_METADATA_TTL,
+ DEFAULT_METADATASTORE_METADATA_TTL);
+ assertEquals("first and second time set through long", timeProviderExplicit,
+ new S3Guard.TtlTimeProvider(conf));
+ double timeInSeconds = DEFAULT_METADATASTORE_METADATA_TTL / 1000;
+ double timeInMinutes = timeInSeconds / 60;
+ String timeStr = String.format("%dm", (int) timeInMinutes);
+ assertEquals(":wrong time in minutes from " + timeInMinutes,
+ "15m", timeStr);
+ conf.set(METADATASTORE_METADATA_TTL, timeStr);
+ assertEquals("Time in millis as string from "
+ + conf.get(METADATASTORE_METADATA_TTL),
+ timeProviderExplicit,
+ new S3Guard.TtlTimeProvider(conf));
+ }
+
void assertContainsPath(FileStatus[] statuses, String pathStr) {
assertTrue("listing doesn't contain " + pathStr,
containsPath(statuses, pathStr));
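testTTLConstruction above leans on Hadoop's time-duration parsing:
fs.s3a.metadatastore.metadata.ttl accepts either a bare long (interpreted as
milliseconds) or a unit-suffixed string such as "15m", and both forms must
yield equal TtlTimeProvider instances. A small sketch of the conversion path
the test exercises:

    Configuration conf = new Configuration(false);
    conf.set(METADATASTORE_METADATA_TTL, "15m");  // unit-suffixed form
    long ttlMillis = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
        DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
    // ttlMillis == 900_000, the same value a bare "900000" would produce
    ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(ttlMillis);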
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
index b843392..1bffc3b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractITestS3AMetadataStoreScale.java
@@ -18,11 +18,15 @@
package org.apache.hadoop.fs.s3a.scale;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
@@ -54,6 +58,12 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
static final long ACCESS_TIME = System.currentTimeMillis();
static final Path BUCKET_ROOT = new Path("s3a://fake-bucket/");
+ private ITtlTimeProvider ttlTimeProvider;
+
+ @Before
+ public void initialize() {
+ ttlTimeProvider = new S3Guard.TtlTimeProvider(new Configuration());
+ }
/**
* Subclasses should override this to provide the MetadataStore they which
@@ -129,7 +139,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
toDelete = movedPaths;
toCreate = origMetas;
}
- ms.move(toDelete, toCreate);
+ ms.move(toDelete, toCreate, ttlTimeProvider);
}
moveTimer.end();
printTiming(LOG, "move", moveTimer, operations);
@@ -194,7 +204,7 @@ public abstract class AbstractITestS3AMetadataStoreScale extends
throws IOException {
describe("Recursive deletion");
NanoTimer deleteTimer = new NanoTimer();
- ms.deleteSubtree(BUCKET_ROOT);
+ ms.deleteSubtree(BUCKET_ROOT, ttlTimeProvider);
deleteTimer.end();
printTiming(LOG, "delete", deleteTimer, count);
}
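One mechanical change recurs across all these test files: the MetadataStore
mutators now take an ITtlTimeProvider so that deletions can stamp the
resulting tombstones with a last_updated time. The updated calling
convention, as used throughout the patch:

    ITtlTimeProvider ttl = new S3Guard.TtlTimeProvider(new Configuration());
    ms.delete(path, ttl);                    // single entry -> tombstone
    ms.deleteSubtree(rootPath, ttl);         // recursive delete
    ms.move(pathsToDelete, newMetas, ttl);   // rename: sources get tombstones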
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org