You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/01/16 01:47:09 UTC
[incubator-pinot] branch master updated: Add touch method in
PinotFS; Call touch when moving deleted segments. (#3684)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3755e83 Add touch method in PinotFS; Call touch when moving deleted segments. (#3684)
3755e83 is described below
commit 3755e839a0c73e368f9e07a26f61f7e687864998
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Tue Jan 15 17:47:04 2019 -0800
Add touch method in PinotFS; Call touch when moving deleted segments. (#3684)
---
.../org/apache/pinot/filesystem/AzurePinotFS.java | 11 +++++++++
.../helix/core/SegmentDeletionManager.java | 5 +++-
.../org/apache/pinot/filesystem/LocalPinotFS.java | 9 +++++++
.../java/org/apache/pinot/filesystem/PinotFS.java | 10 +++++++-
.../apache/pinot/filesystem/LocalPinotFSTest.java | 23 ++++++++++++++++++
.../pinot/filesystem/PinotFSFactoryTest.java | 5 ++++
.../org/apache/pinot/filesystem/HadoopPinotFS.java | 28 ++++++++++++++++------
7 files changed, 82 insertions(+), 9 deletions(-)
diff --git a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
index fd25bd0..9a5f9fa 100644
--- a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
+++ b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
@@ -36,6 +36,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.FileUtils;
@@ -234,4 +235,14 @@ public class AzurePinotFS extends PinotFS {
throw new RuntimeException(e);
}
}
+
+ @Override
+ public boolean touch(URI uri) throws IOException {
+ if (!exists(uri)) {
+ _adlStoreClient.createEmptyFile(uri.getPath());
+ } else {
+ _adlStoreClient.setTimes(uri.getPath(), null, new Date());
+ }
+ return true;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index f696da2..a01341b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -185,6 +185,9 @@ public class SegmentDeletionManager {
if (pinotFS.exists(fileToMoveURI)) {
// Overwrites the file if it already exists in the target directory.
pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true);
+ // Updates last modified.
+ // Touch is needed here so that removeAgedDeletedSegments() works correctly.
+ pinotFS.touch(deletedSegmentDestURI);
LOGGER.info("Moved segment {} from {} to {}", segmentId, fileToMoveURI.toString(), deletedSegmentDestURI.toString());
} else {
if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
@@ -248,7 +251,7 @@ public class SegmentDeletionManager {
}
}
} catch (IOException e) {
- LOGGER.error("Had trouble deleting directories", deletedDirURI.toString());
+ LOGGER.error("Had trouble deleting directories: {}", deletedDirURI.toString(), e.toString());
}
} else {
LOGGER.info("dataDir is not configured, won't delete any expired segments from deleted directory.");
diff --git a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
index 987125f..e060d88 100644
--- a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
+++ b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
@@ -161,6 +161,15 @@ public class LocalPinotFS extends PinotFS {
return file.lastModified();
}
+ @Override
+ public boolean touch(URI uri) throws IOException {
+ File file = new File(decodeURI(uri.getRawPath()));
+ if (!exists(uri)) {
+ return file.createNewFile();
+ }
+ return file.setLastModified(System.currentTimeMillis());
+ }
+
private String encodeURI(String uri) {
String encodedStr;
try {
diff --git a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
index abcf494..2a13c77 100644
--- a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
+++ b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
@@ -137,7 +137,7 @@ public abstract class PinotFS implements Closeable {
/**
* Returns the age of the file
- * @param uri
+ * @param uri location of file or directory
* @return A long value representing the time the file was last modified, measured in milliseconds since epoch
* (00:00:00 GMT, January 1, 1970) or 0L if the file does not exist or if an I/O error occurs
* @throws Exception if uri is not valid or present
@@ -145,6 +145,14 @@ public abstract class PinotFS implements Closeable {
public abstract long lastModified(URI uri);
/**
+ * Updates the last modified time of an existing file or directory to be current time. If the file system object
+ * does not exist, creates an empty file.
+ * @param uri location of file or directory
+ * @throws IOException if the parent directory doesn't exist.
+ */
+ public abstract boolean touch(URI uri) throws IOException;
+
+ /**
* For certain filesystems, we may need to close the filesystem and do relevant operations to prevent leaks.
* By default, this method does nothing.
* @throws IOException
diff --git a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
index 6bdf32c..8b26b62 100644
--- a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
+++ b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
@@ -170,6 +170,29 @@ public class LocalPinotFSTest {
localPinotFS.mkdir(firstTempDir.toURI());
Assert.assertTrue(firstTempDir.exists(), "Could not make directory " + firstTempDir.getPath());
+ // Check that touching a file works
+ File nonExistingFile = new File(_absoluteTmpDirPath, "nonExistingFile");
+ Assert.assertFalse(nonExistingFile.exists());
+ localPinotFS.touch(nonExistingFile.toURI());
+ Assert.assertTrue(nonExistingFile.exists());
+ long currentTime = System.currentTimeMillis();
+ Assert.assertTrue(localPinotFS.lastModified(nonExistingFile.toURI()) < currentTime);
+ Thread.sleep(1000L);
+ // update last modified.
+ localPinotFS.touch(nonExistingFile.toURI());
+ Assert.assertTrue(localPinotFS.lastModified(nonExistingFile.toURI()) > currentTime);
+ FileUtils.deleteQuietly(nonExistingFile);
+
+ // Check that touch an file in a directory that doesn't exist should throw an exception.
+ File nonExistingFileUnderNonExistingDir = new File(_absoluteTmpDirPath, "nonExistingDir/nonExistingFile");
+ Assert.assertFalse(nonExistingFileUnderNonExistingDir.exists());
+ try {
+ localPinotFS.touch(nonExistingFileUnderNonExistingDir.toURI());
+ Assert.fail("Touch method should throw an IOException");
+ } catch (IOException e) {
+ // Expected.
+ }
+
// Check that directory only copy worked
localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
Assert.assertTrue(localPinotFS.exists(secondTempDir.toURI()));
diff --git a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
index 131c4b6..a0b6f16 100644
--- a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
+++ b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
@@ -124,5 +124,10 @@ public class PinotFSFactoryTest {
public long lastModified(URI uri) {
return 0L;
}
+
+ @Override
+ public boolean touch(URI uri) throws IOException {
+ return true;
+ }
}
}
diff --git a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
index 97aae85..9a88114 100644
--- a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
+++ b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@@ -80,9 +81,7 @@ public class HadoopPinotFS extends PinotFS {
@Override
public boolean delete(URI segmentUri, boolean forceDelete) throws IOException {
// Returns false if we are moving a directory and that directory is not empty
- if (isDirectory(segmentUri)
- && listFiles(segmentUri, false).length > 0
- && !forceDelete) {
+ if (isDirectory(segmentUri) && listFiles(segmentUri, false).length > 0 && !forceDelete) {
return false;
}
return _hadoopFS.delete(new Path(segmentUri), true);
@@ -107,7 +106,8 @@ public class HadoopPinotFS extends PinotFS {
RemoteIterator<LocatedFileStatus> sourceFiles = _hadoopFS.listFiles(source, true);
if (sourceFiles != null) {
while (sourceFiles.hasNext()) {
- boolean succeeded = FileUtil.copy(_hadoopFS, sourceFiles.next().getPath(), _hadoopFS, target, true, _hadoopConf);
+ boolean succeeded =
+ FileUtil.copy(_hadoopFS, sourceFiles.next().getPath(), _hadoopFS, target, true, _hadoopConf);
if (!succeeded) {
return false;
}
@@ -196,15 +196,29 @@ public class HadoopPinotFS extends PinotFS {
}
}
- private void authenticate(org.apache.hadoop.conf.Configuration hadoopConf, org.apache.commons.configuration.Configuration configs) {
+ @Override
+ public boolean touch(URI uri) throws IOException {
+ Path path = new Path(uri);
+ if (!exists(uri)) {
+ FSDataOutputStream fos = _hadoopFS.create(path);
+ fos.close();
+ } else {
+ _hadoopFS.setTimes(path, System.currentTimeMillis(), -1);
+ }
+ return true;
+ }
+
+ private void authenticate(org.apache.hadoop.conf.Configuration hadoopConf,
+ org.apache.commons.configuration.Configuration configs) {
String principal = configs.getString(PRINCIPAL);
String keytab = configs.getString(KEYTAB);
if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(keytab)) {
UserGroupInformation.setConfiguration(hadoopConf);
if (UserGroupInformation.isSecurityEnabled()) {
try {
- if (!UserGroupInformation.getCurrentUser().hasKerberosCredentials()
- || !UserGroupInformation.getCurrentUser().getUserName().equals(principal)) {
+ if (!UserGroupInformation.getCurrentUser().hasKerberosCredentials() || !UserGroupInformation.getCurrentUser()
+ .getUserName()
+ .equals(principal)) {
LOGGER.info("Trying to authenticate user [%s] with keytab [%s]..", principal, keytab);
UserGroupInformation.loginUserFromKeytab(principal, keytab);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org