You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2017/02/01 17:41:56 UTC

flume git commit: FLUME-3049. Make HDFS sink rotate more reliably in secure mode

Repository: flume
Updated Branches:
  refs/heads/flume-3049-3 [created] f215374bd


FLUME-3049. Make HDFS sink rotate more reliably in secure mode

It was reported that the HDFS sink had a bug where file rotation was not
reliable in secure mode.

After investigating, it turns out that this was caused by a bug in the
FlumeAuthenticator code: A "try" block in UGIExecutor.execute() was
wrapping exceptions (such as IOException) with a SecurityException.

That exception wrapping was breaking the contract of BucketWriter
because the caller (HDFSEventSink) did not understand how to react to
the SecurityException. This also likely had other negative effects in
exceptional cases.

The following changes are included in this patch:

* Remove the exception wrapping in UGIExecutor.execute().
* Add tests for exception propagation in FlumeAuthenticator
  implementations.
* Add testRotateBucketOnIOException() to TestBucketWriter as a
  regression test for the HDFS sink issue.

Closes #106.

Reviewers: Attila Simon, Mike Percy

(Denes Arvay via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f215374b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f215374b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f215374b

Branch: refs/heads/flume-3049-3
Commit: f215374bdf9a08b16fa7ccd3b40098024afe8677
Parents: 18453d3
Author: Denes Arvay <de...@cloudera.com>
Authored: Fri Jan 27 11:43:27 2017 +0100
Committer: Mike Percy <mp...@apache.org>
Committed: Wed Feb 1 09:34:44 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/flume/auth/UGIExecutor.java |  9 +---
 .../flume/auth/TestFlumeAuthenticator.java      | 50 ++++++++++++++++++--
 .../flume/sink/hdfs/TestBucketWriter.java       | 49 +++++++++++++++++++
 3 files changed, 95 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f215374b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
index a6ebd86..41308b4 100644
--- a/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
+++ b/flume-ng-auth/src/main/java/org/apache/flume/auth/UGIExecutor.java
@@ -43,14 +43,7 @@ class UGIExecutor implements PrivilegedExecutor {
   @Override
   public <T> T execute(PrivilegedExceptionAction<T> action) throws Exception {
     ensureValidAuth();
-    try {
-      return ugi.doAs(action);
-    } catch (IOException ex) {
-      throw new SecurityException("Privileged action failed", ex);
-    } catch (InterruptedException ex) {
-      Thread.interrupted();
-      throw new SecurityException(ex);
-    }
+    return ugi.doAs(action);
   }
 
   private void ensureValidAuth() {

http://git-wip-us.apache.org/repos/asf/flume/blob/f215374b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
----------------------------------------------------------------------
diff --git a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
index 0dc8872..be84646 100644
--- a/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
+++ b/flume-ng-auth/src/test/java/org/apache/flume/auth/TestFlumeAuthenticator.java
@@ -18,12 +18,14 @@
 package org.apache.flume.auth;
 
 import org.apache.hadoop.minikdc.MiniKdc;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
@@ -66,6 +68,12 @@ public class TestFlumeAuthenticator {
     }
   }
 
+  @After
+  public void tearDown() {
+    // Clear the statically stored logged-in credentials so they do not leak into the next test
+    FlumeAuthenticationUtil.clearCredentials();
+  }
+
   @Test
   public void testNullLogin() throws IOException {
     String principal = null;
@@ -82,8 +90,6 @@ public class TestFlumeAuthenticator {
     String keytab = flumeKeytab.getAbsolutePath();
     String expResult = principal;
 
-    // Clear the previous statically stored logged in credentials
-    FlumeAuthenticationUtil.clearCredentials();
     FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(
             principal, keytab);
     assertTrue(authenticator.isAuthenticated());
@@ -110,6 +116,43 @@ public class TestFlumeAuthenticator {
     }
   }
 
+  /**
+   * Verify that an <code>IOException</code> thrown by the <code>PrivilegedExceptionAction</code>
+   * is propagated as-is by {@link KerberosAuthenticator#execute(PrivilegedExceptionAction)}.
+   */
+  @Test(expected = IOException.class)
+  public void testKerberosAuthenticatorExceptionInExecute() throws Exception {
+    String principal = flumePrincipal;
+    String keytab = flumeKeytab.getAbsolutePath();
+
+    FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(principal, keytab);
+    assertTrue(authenticator instanceof KerberosAuthenticator);
+
+    authenticator.execute(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        throw new IOException();
+      }
+    });
+  }
+
+  /**
+   * Verify that an <code>IOException</code> thrown by the <code>PrivilegedExceptionAction</code>
+   * is propagated as-is by {@link SimpleAuthenticator#execute(PrivilegedExceptionAction)}.
+   */
+  @Test(expected = IOException.class)
+  public void testSimpleAuthenticatorExceptionInExecute() throws Exception {
+    FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(null, null);
+    assertTrue(authenticator instanceof SimpleAuthenticator);
+
+    authenticator.execute(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        throw new IOException();
+      }
+    });
+  }
+
   @Test
   public void testProxyAs() throws IOException {
     String username = "alice";
@@ -138,9 +181,6 @@ public class TestFlumeAuthenticator {
     kdc.createPrincipal(keytab, principal);
     String expResult = principal + "@" + kdc.getRealm();
 
-    // Clear the previous statically stored logged in credentials
-    FlumeAuthenticationUtil.clearCredentials();
-
     FlumeAuthenticator authenticator = FlumeAuthenticationUtil.getAuthenticator(
             principal, keytab.getAbsolutePath());
     assertTrue(authenticator.isAuthenticated());

http://git-wip-us.apache.org/repos/asf/flume/blob/f215374b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 78241a1..4221a5d 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -36,6 +36,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -448,4 +449,52 @@ public class TestBucketWriter {
                       "but got " + bucketWriter.renameTries.get(),
                       bucketWriter.renameTries.get() == numberOfRetriesRequired);
   }
+
+  // Test that we don't swallow IOExceptions in secure mode. We should close the bucket writer
+  // and rethrow the exception. Regression test for FLUME-3049.
+  @Test
+  public void testRotateBucketOnIOException() throws IOException, InterruptedException {
+    MockHDFSWriter hdfsWriter = Mockito.spy(new MockHDFSWriter());
+    PrivilegedExecutor ugiProxy =
+        FlumeAuthenticationUtil.getAuthenticator(null, null).proxyAs("alice");
+
+    final int ROLL_COUNT = 1; // Cause a roll after every successful append().
+    BucketWriter bucketWriter = new BucketWriter(
+        0, 0, ROLL_COUNT, 0, ctx, "/tmp", "file", "", ".tmp", null, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, ugiProxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0);
+
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+
+    // Write one event successfully.
+    bucketWriter.append(e);
+
+    // Fail the next write.
+    IOException expectedIOException = new IOException("Test injected IOException");
+    Mockito.doThrow(expectedIOException).when(hdfsWriter)
+        .append(Mockito.any(Event.class));
+
+    // The second time we try to write we should get an IOException.
+    try {
+      bucketWriter.append(e);
+      Assert.fail("Expected IOException wasn't thrown during append");
+    } catch (IOException ex) {
+      Assert.assertEquals(expectedIOException, ex);
+      logger.info("Caught expected IOException", ex);
+    }
+
+    // The third time we try to write we should get a BucketClosedException, because the
+    // BucketWriter should have closed itself before rethrowing the IOException from the
+    // previous (second) call.
+    try {
+      bucketWriter.append(e);
+      Assert.fail("BucketWriter should be already closed, BucketClosedException expected");
+    } catch (BucketClosedException ex) {
+      logger.info("Caught expected BucketClosedException", ex);
+    }
+
+    Assert.assertEquals("events written", 1, hdfsWriter.getEventsWritten());
+    Assert.assertEquals("2 files should be closed", 2, hdfsWriter.getFilesClosed());
+  }
 }