You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2020/02/27 07:35:03 UTC

[flume] branch trunk updated: FLUME-3352: Skip spooldir canary when tracker dir is used (#314)

This is an automated email from the ASF dual-hosted git repository.

mpercy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5c9bfe6  FLUME-3352: Skip spooldir canary when tracker dir is used (#314)
5c9bfe6 is described below

commit 5c9bfe6dc745d85f2459958b767e6e05550d8a81
Author: Yang.Tao <ha...@163.com>
AuthorDate: Thu Feb 27 15:34:54 2020 +0800

    FLUME-3352: Skip spooldir canary when tracker dir is used (#314)
    
    * Skip the canary write test when the tracker-dir tracking policy is used: it is unnecessary there and fails on a read-only spooldir.
    * Add unit tests to verify the behavior of TrackingPolicy.RENAME and TrackingPolicy.TRACKER_DIR on a read-only spooldir.
---
 .../avro/ReliableSpoolingFileEventReader.java      | 28 ++++++++-------
 .../avro/TestReliableSpoolingFileEventReader.java  | 42 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 13 deletions(-)

diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 1e1fbc0..dde084e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -180,20 +180,22 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     Preconditions.checkState(spoolDirectory.isDirectory(),
         "Path is not a directory: " + spoolDirectory.getAbsolutePath());
 
-    // Do a canary test to make sure we have access to spooling directory
-    try {
-      File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary",
-          spoolDirectory);
-      Files.write(canary.toPath(), "testing flume file permissions\n".getBytes());
-      List<String> lines = Files.readAllLines(canary.toPath(), Charsets.UTF_8);
-      Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);
-      if (!canary.delete()) {
-        throw new IOException("Unable to delete canary file " + canary);
+    if(trackingPolicy.equalsIgnoreCase(TrackingPolicy.RENAME.name())) {
+      // Do a canary test to make sure we have access to spooling directory
+      try {
+        File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary",
+                spoolDirectory);
+        Files.write(canary.toPath(), "testing flume file permissions\n".getBytes());
+        List<String> lines = Files.readAllLines(canary.toPath(), Charsets.UTF_8);
+        Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);
+        if (!canary.delete()) {
+          throw new IOException("Unable to delete canary file " + canary);
+        }
+        logger.debug("Successfully created and deleted canary file: {}", canary);
+      } catch (IOException e) {
+        throw new FlumeException("Unable to read and modify files" +
+                " in the spooling directory: " + spoolDirectory, e);
       }
-      logger.debug("Successfully created and deleted canary file: {}", canary);
-    } catch (IOException e) {
-      throw new FlumeException("Unable to read and modify files" +
-          " in the spooling directory: " + spoolDirectory, e);
     }
 
     this.spoolDirectory = spoolDirectory;
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 6a5a69f..d3bfcf7 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -41,6 +41,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.TrackingPolicy;
@@ -92,9 +93,15 @@ public class TestReliableSpoolingFileEventReader {
 
   @After
   public void tearDown() {
+    setDirWritable(WORK_DIR);
     deleteDir(WORK_DIR);
   }
 
+  private void setDirWritable(File dir){
+    // make dir writable
+    dir.setWritable(true);
+  }
+
   private void deleteDir(File dir) {
     // delete all the files & dirs we created
     try {
@@ -315,6 +322,41 @@ public class TestReliableSpoolingFileEventReader {
         trackerFiles.size());
   }
 
+  @Test(expected = FlumeException.class)
+  public void testRenameTrackingPolicyOnReadonlySpoolDirectory() throws IOException {
+    File workDir = WORK_DIR;
+    if(workDir.setReadOnly()){
+      new ReliableSpoolingFileEventReader.Builder().spoolDirectory(workDir)
+              .trackingPolicy(TrackingPolicy.RENAME.name())
+              .sourceCounter(new SourceCounter("test"))
+              .build();
+    } else {
+      // Operation on directory permission is not supported in current operating system.
+      throw new FlumeException("Operation on directory permission is not supported in current operating system.");
+    }
+  }
+
+  @Test()
+  public void testTrackerDirTrackingPolicyOnReadonlySpoolDirectory() throws IOException {
+    File workDir = WORK_DIR;
+    String trackerDirPath =
+            SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
+    File trackerDir = new File(WORK_DIR, trackerDirPath);
+    if (!trackerDir.exists()) {
+      trackerDir.mkdir();
+    }
+    if(workDir.setReadOnly()){
+      new ReliableSpoolingFileEventReader.Builder().spoolDirectory(workDir)
+              .trackingPolicy(TrackingPolicy.TRACKER_DIR.name())
+              .trackerDirPath(trackerDirPath)
+              .sourceCounter(new SourceCounter("test"))
+              .build();
+    } else {
+      // Operation on directory permission is not supported in current operating system.
+      return;
+    }
+  }
+
   @Test(expected = NullPointerException.class)
   public void testNullConsumeOrder() throws IOException {
     new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)