You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2020/02/27 07:35:03 UTC
[flume] branch trunk updated: FLUME-3352: Skip spooldir canary when
tracker dir is used (#314)
This is an automated email from the ASF dual-hosted git repository.
mpercy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5c9bfe6 FLUME-3352: Skip spooldir canary when tracker dir is used (#314)
5c9bfe6 is described below
commit 5c9bfe6dc745d85f2459958b767e6e05550d8a81
Author: Yang.Tao <ha...@163.com>
AuthorDate: Thu Feb 27 15:34:54 2020 +0800
FLUME-3352: Skip spooldir canary when tracker dir is used (#314)
* Unnecessary canary test will block on readonly spooldir while another trackerdir is set.
* Add unit tests to ensure TrackingPolicy.RENAME / TrackingPolicy.TRACKER_DIR effective on readonly spooldir
---
.../avro/ReliableSpoolingFileEventReader.java | 28 ++++++++-------
.../avro/TestReliableSpoolingFileEventReader.java | 42 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 13 deletions(-)
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 1e1fbc0..dde084e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -180,20 +180,22 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
Preconditions.checkState(spoolDirectory.isDirectory(),
"Path is not a directory: " + spoolDirectory.getAbsolutePath());
- // Do a canary test to make sure we have access to spooling directory
- try {
- File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary",
- spoolDirectory);
- Files.write(canary.toPath(), "testing flume file permissions\n".getBytes());
- List<String> lines = Files.readAllLines(canary.toPath(), Charsets.UTF_8);
- Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);
- if (!canary.delete()) {
- throw new IOException("Unable to delete canary file " + canary);
+ if(trackingPolicy.equalsIgnoreCase(TrackingPolicy.RENAME.name())) {
+ // Do a canary test to make sure we have access to spooling directory
+ try {
+ File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary",
+ spoolDirectory);
+ Files.write(canary.toPath(), "testing flume file permissions\n".getBytes());
+ List<String> lines = Files.readAllLines(canary.toPath(), Charsets.UTF_8);
+ Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);
+ if (!canary.delete()) {
+ throw new IOException("Unable to delete canary file " + canary);
+ }
+ logger.debug("Successfully created and deleted canary file: {}", canary);
+ } catch (IOException e) {
+ throw new FlumeException("Unable to read and modify files" +
+ " in the spooling directory: " + spoolDirectory, e);
}
- logger.debug("Successfully created and deleted canary file: {}", canary);
- } catch (IOException e) {
- throw new FlumeException("Unable to read and modify files" +
- " in the spooling directory: " + spoolDirectory, e);
}
this.spoolDirectory = spoolDirectory;
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 6a5a69f..d3bfcf7 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -41,6 +41,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.TrackingPolicy;
@@ -92,9 +93,15 @@ public class TestReliableSpoolingFileEventReader {
@After
public void tearDown() {
+ setDirWritable(WORK_DIR);
deleteDir(WORK_DIR);
}
+ private void setDirWritable(File dir){
+ // make dir writable
+ dir.setWritable(true);
+ }
+
private void deleteDir(File dir) {
// delete all the files & dirs we created
try {
@@ -315,6 +322,41 @@ public class TestReliableSpoolingFileEventReader {
trackerFiles.size());
}
+ @Test(expected = FlumeException.class)
+ public void testRenameTrackingPolicyOnReadonlySpoolDirectory() throws IOException {
+ File workDir = WORK_DIR;
+ if(workDir.setReadOnly()){
+ new ReliableSpoolingFileEventReader.Builder().spoolDirectory(workDir)
+ .trackingPolicy(TrackingPolicy.RENAME.name())
+ .sourceCounter(new SourceCounter("test"))
+ .build();
+ } else {
+ // Operation on directory permission is not supported in current operating system.
+ throw new FlumeException("Operation on directory permission is not supported in current operating system.");
+ }
+ }
+
+ @Test()
+ public void testTrackerDirTrackingPolicyOnReadonlySpoolDirectory() throws IOException {
+ File workDir = WORK_DIR;
+ String trackerDirPath =
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
+ File trackerDir = new File(WORK_DIR, trackerDirPath);
+ if (!trackerDir.exists()) {
+ trackerDir.mkdir();
+ }
+ if(workDir.setReadOnly()){
+ new ReliableSpoolingFileEventReader.Builder().spoolDirectory(workDir)
+ .trackingPolicy(TrackingPolicy.TRACKER_DIR.name())
+ .trackerDirPath(trackerDirPath)
+ .sourceCounter(new SourceCounter("test"))
+ .build();
+ } else {
+ // Operation on directory permission is not supported in current operating system.
+ return;
+ }
+ }
+
@Test(expected = NullPointerException.class)
public void testNullConsumeOrder() throws IOException {
new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)