You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/09/28 18:44:01 UTC

kudu git commit: KUDU-2012 Kudu Flume sink auth support

Repository: kudu
Updated Branches:
  refs/heads/master ad620415a -> 7db4d82f4


KUDU-2012 Kudu Flume sink auth support

Added a FlumeAuthenticator to KuduSink; the KuduClient is now created
inside a PrivilegedExecutor action.

Added an extra step to the mini cluster to create
a keytab for the client used for testing.

Added an automated test with a short KDC ticket lifetime
to test ticket reacquisition.

Manual testing was done on a secure cluster as well.

Change-Id: I11b5f08802883afa178d346af48d3bcd15281917
Reviewed-on: http://gerrit.cloudera.org:8080/11334
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7db4d82f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7db4d82f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7db4d82f

Branch: refs/heads/master
Commit: 7db4d82f4bcb98aba05853249fd4c84677c57227
Parents: ad62041
Author: Ferenc Szabó <sz...@apache.org>
Authored: Mon Aug 27 07:16:52 2018 +0200
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Sep 28 18:43:34 2018 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/BaseKuduTest.java    |   7 ++
 .../org/apache/kudu/client/MiniKuduCluster.java |   7 ++
 .../org/apache/kudu/flume/sink/KuduSink.java    |  25 +++-
 .../sink/KuduSinkConfigurationConstants.java    |  15 +++
 .../sink/AvroKuduOperationsProducerTest.java    |  63 +++-------
 .../sink/KeyedKuduOperationsProducerTest.java   |  98 ++++------------
 .../apache/kudu/flume/sink/KuduSinkTest.java    |  64 ++---------
 .../kudu/flume/sink/KuduSinkTestUtil.java       |  85 ++++++++++++++
 .../sink/RegexpKuduOperationsProducerTest.java  | 113 +++++++-----------
 .../kudu/flume/sink/SecureKuduSinkTest.java     | 115 +++++++++++++++++++
 src/kudu/mini-cluster/external_mini_cluster.cc  |   3 +
 src/kudu/security/test/mini_kdc.cc              |  13 +++
 src/kudu/security/test/mini_kdc.h               |   4 +
 13 files changed, 365 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 959b92c..c410cf7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -300,6 +300,13 @@ public class BaseKuduTest {
   }
 
   /**
+   * @return path to the mini cluster root directory
+   */
+  protected String getClusterRoot() {
+    return miniCluster.getClusterRoot();
+  }
+
+  /**
    * Kills all the master servers.
    * Does nothing to the servers that are already dead.
    *

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index a372dfc..dfa5ea7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -506,6 +506,13 @@ public class MiniKuduCluster implements AutoCloseable {
   }
 
   /**
+   * @return path to the mini cluster root directory
+   */
+  public String getClusterRoot() {
+    return clusterRoot;
+  }
+
+  /**
    * Helper runnable that receives stderr and logs it along with the process' identifier.
    */
   public static class ProcessInputStreamLogPrinterRunnable implements Runnable {

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
index 105bc1e..f63f941 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
@@ -21,11 +21,15 @@ package org.apache.kudu.flume.sink;
 
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.BATCH_SIZE;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_KEYTAB;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PROXY_USER;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TIMEOUT_MILLIS;
 
+import java.security.PrivilegedAction;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
@@ -35,6 +39,8 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
@@ -109,6 +115,7 @@ public class KuduSink extends AbstractSink implements Configurable {
   private KuduClient client;
   private KuduOperationsProducer operationsProducer;
   private SinkCounter sinkCounter;
+  private PrivilegedExecutor privilegedExecutor;
 
   public KuduSink() {
     this(null);
@@ -127,7 +134,15 @@ public class KuduSink extends AbstractSink implements Configurable {
 
     // Client is not null only inside tests.
     if (client == null) {
-      client = new KuduClient.KuduClientBuilder(masterAddresses).build();
+      // Creating client with FlumeAuthenticator.
+      client = privilegedExecutor.execute(
+        new PrivilegedAction<KuduClient>() {
+          @Override
+          public KuduClient run() {
+            return new KuduClient.KuduClientBuilder(masterAddresses).build();
+          }
+        }
+      );
     }
     session = client.newSession();
     session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
@@ -194,6 +209,12 @@ public class KuduSink extends AbstractSink implements Configurable {
     timeoutMillis = context.getLong(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
     ignoreDuplicateRows = context.getBoolean(IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
     String operationProducerType = context.getString(PRODUCER);
+    String kerberosPrincipal = context.getString(KERBEROS_PRINCIPAL);
+    String kerberosKeytab = context.getString(KERBEROS_KEYTAB);
+    String proxyUser = context.getString(PROXY_USER);
+
+    privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
+        kerberosPrincipal, kerberosKeytab).proxyAs(proxyUser);
 
     // Check for operations producer, if null set default operations producer type.
     if (operationProducerType == null || operationProducerType.isEmpty()) {
@@ -203,7 +224,7 @@ public class KuduSink extends AbstractSink implements Configurable {
 
     Context producerContext = new Context();
     producerContext.putAll(context.getSubProperties(
-            KuduSinkConfigurationConstants.PRODUCER_PREFIX));
+        KuduSinkConfigurationConstants.PRODUCER_PREFIX));
 
     try {
       Class<? extends KuduOperationsProducer> clazz =

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
index d3b4fb6..dbb2f66 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
@@ -64,4 +64,19 @@ public class KuduSinkConfigurationConstants {
    * Whether to ignore duplicate primary key errors caused by inserts.
    */
   public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
+
+  /**
+   * Path to the keytab file used for authentication
+   */
+  public static final String KERBEROS_KEYTAB = "kerberosKeytab";
+
+  /**
+   * Kerberos principal used for authentication
+   */
+  public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+
+  /**
+   * The effective user if different from the kerberos principal
+   */
+  public static final String PROXY_USER = "proxyUser";
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
index 7b20a2c..9f200b8 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
@@ -23,10 +23,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_LITERAL_HEADER;
 import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_PROP;
 import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_URL_HEADER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
 import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -38,7 +36,6 @@ import java.net.URL;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
@@ -49,15 +46,10 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.util.DecimalUtil;
+
 import org.junit.Test;
 
 import org.apache.kudu.ColumnSchema;
@@ -66,6 +58,7 @@ import org.apache.kudu.Type;
 import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.util.DecimalUtil;
 
 public class AvroKuduOperationsProducerTest extends BaseKuduTest {
   private static String schemaUriString;
@@ -117,27 +110,13 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
     KuduTable table = createNewTable(
         String.format("test%sevents%s", eventCount, schemaLocation));
     String tableName = table.getName();
-    Context ctx = schemaLocation != SchemaLocation.GLOBAL ? new Context()
+    Context context = schemaLocation != SchemaLocation.GLOBAL ? new Context()
         : new Context(ImmutableMap.of(PRODUCER_PREFIX + SCHEMA_PROP, schemaUriString));
-    KuduSink sink = createSink(tableName, ctx);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    writeEventsToChannel(channel, eventCount, schemaLocation);
-    tx.commit();
-    tx.close();
-
-    Sink.Status status = sink.process();
-    if (eventCount == 0) {
-      assertEquals("incorrect status for empty channel", status, Sink.Status.BACKOFF);
-    } else {
-      assertEquals("incorrect status for non-empty channel", status, Sink.Status.READY);
-    }
+    context.put(PRODUCER, AvroKuduOperationsProducer.class.getName());
+
+    List<Event> events = generateEvents(eventCount, schemaLocation);
+
+    KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events);
 
     List<String> answers = makeAnswers(eventCount);
     List<String> rows = scanTableToStrings(table);
@@ -146,11 +125,12 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
   }
 
   private KuduTable createNewTable(String tableName) throws Exception {
-    ArrayList<ColumnSchema> columns = new ArrayList<>(5);
+    List<ColumnSchema> columns = new ArrayList<>(5);
     columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING).nullable(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING)
+        .nullable(true).build());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build());
     columns.add(new ColumnSchema.ColumnSchemaBuilder("decimalField", Type.DECIMAL)
         .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
@@ -160,21 +140,9 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
     return createTable(tableName, new Schema(columns), createOptions);
   }
 
-  private KuduSink createSink(String tableName, Context ctx) {
-    KuduSink sink = new KuduSink(syncClient);
-    HashMap<String, String> parameters = new HashMap<>();
-    parameters.put(TABLE_NAME, tableName);
-    parameters.put(MASTER_ADDRESSES, getMasterAddressesAsString());
-    parameters.put(PRODUCER, AvroKuduOperationsProducer.class.getName());
-    Context context = new Context(parameters);
-    context.putAll(ctx.getParameters());
-    Configurables.configure(sink, context);
-
-    return sink;
-  }
-
-  private void writeEventsToChannel(Channel channel, int eventCount,
-                                    SchemaLocation schemaLocation) throws Exception {
+  private List<Event> generateEvents(int eventCount,
+                                     SchemaLocation schemaLocation) throws Exception {
+    List<Event> events = new ArrayList<>();
     for (int i = 0; i < eventCount; i++) {
       AvroKuduOperationsProducerTestRecord record = new AvroKuduOperationsProducerTestRecord();
       record.setKey(10 * i);
@@ -195,8 +163,9 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
       } else if (schemaLocation == SchemaLocation.LITERAL) {
         e.setHeaders(ImmutableMap.of(SCHEMA_LITERAL_HEADER, schemaLiteral));
       }
-      channel.put(e);
+      events.add(e);
     }
+    return events;
   }
 
   private List<String> makeAnswers(int eventCount) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
index b16a209..1940369 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.kudu.flume.sink;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
 import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT;
 import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.OPERATION_PROP;
 import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.PAYLOAD_COLUMN_DEFAULT;
@@ -31,18 +30,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -69,7 +62,7 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
         new ColumnSchema.ColumnSchemaBuilder(PAYLOAD_COLUMN_DEFAULT, Type.BINARY)
             .key(false).build());
     CreateTableOptions createOptions =
-      new CreateTableOptions().setRangePartitionColumns(ImmutableList.of(KEY_COLUMN_DEFAULT))
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of(KEY_COLUMN_DEFAULT))
                               .setNumReplicas(1);
     KuduTable table = createTable(tableName, new Schema(columns), createOptions);
 
@@ -114,29 +107,22 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
 
     KuduTable table = createNewTable("testDupUpsertEvents");
     String tableName = table.getName();
-    Context ctx = new Context(ImmutableMap.of(PRODUCER_PREFIX + OPERATION_PROP, "upsert"));
-    KuduSink sink = createSink(tableName, ctx);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
+    Context ctx = new Context(ImmutableMap.of(
+        PRODUCER_PREFIX + OPERATION_PROP, "upsert",
+        PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()
+    ));
+    KuduSink sink = KuduSinkTestUtil.createSink(syncClient, tableName, ctx);
     sink.start();
 
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
     int numRows = 3;
+    List<Event> events = new ArrayList<>();
     for (int i = 0; i < numRows; i++) {
       Event e = EventBuilder.withBody(String.format("payload body %s", i), UTF_8);
       e.setHeaders(ImmutableMap.of(KEY_COLUMN_DEFAULT, String.format("key %s", i)));
-      channel.put(e);
+      events.add(e);
     }
 
-    tx.commit();
-    tx.close();
-
-    Sink.Status status = sink.process();
-    assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
+    KuduSinkTestUtil.processEvents(sink, events);
 
     List<String> rows = scanTableToStrings(table);
     assertEquals(numRows + " row(s) expected", numRows, rows.size());
@@ -145,18 +131,10 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
       assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
     }
 
-    Transaction utx = channel.getTransaction();
-    utx.begin();
-
     Event dup = EventBuilder.withBody("payload body upserted".getBytes(UTF_8));
     dup.setHeaders(ImmutableMap.of("key", String.format("key %s", 0)));
-    channel.put(dup);
 
-    utx.commit();
-    utx.close();
-
-    Sink.Status upStatus = sink.process();
-    assertTrue("incorrect status for non-empty channel", upStatus != Sink.Status.BACKOFF);
+    KuduSinkTestUtil.processEvents(sink, ImmutableList.of(dup));
 
     List<String> upRows = scanTableToStrings(table);
     assertEquals(numRows + " row(s) expected", numRows, upRows.size());
@@ -174,33 +152,14 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
 
     KuduTable table = createNewTable("test" + eventCount + "events" + operation);
     String tableName = table.getName();
-    Context ctx = new Context(ImmutableMap.of(PRODUCER_PREFIX + OPERATION_PROP, operation));
-    KuduSink sink = createSink(tableName, ctx);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
-    for (int i = 0; i < eventCount; i++) {
-      Event e = EventBuilder.withBody(String.format("payload body %s", i)
-          .getBytes(UTF_8));
-      e.setHeaders(ImmutableMap.of("key", String.format("key %s", i)));
-      channel.put(e);
-    }
+    Context context = new Context(ImmutableMap.of(
+        PRODUCER_PREFIX + OPERATION_PROP, operation,
+        PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()
+    ));
 
-    tx.commit();
-    tx.close();
+    List<Event> events = getEvents(eventCount);
 
-    Sink.Status status = sink.process();
-    if (eventCount == 0) {
-      assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
-    } else {
-      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
-    }
+    KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events);
 
     List<String> rows = scanTableToStrings(table);
     assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
@@ -212,20 +171,13 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
     LOG.info("Testing {} events finished successfully.", eventCount);
   }
 
-  private KuduSink createSink(String tableName, Context ctx) {
-    LOG.info("Creating Kudu sink for '{}' table...", tableName);
-
-    KuduSink sink = new KuduSink(syncClient);
-    HashMap<String, String> parameters = new HashMap<>();
-    parameters.put(TABLE_NAME, tableName);
-    parameters.put(MASTER_ADDRESSES, getMasterAddressesAsString());
-    parameters.put(PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName());
-    Context context = new Context(parameters);
-    context.putAll(ctx.getParameters());
-    Configurables.configure(sink, context);
-
-    LOG.info("Created Kudu sink for '{}' table.", tableName);
-
-    return sink;
+  private List<Event> getEvents(int eventCount) {
+    List<Event> events = new ArrayList<>();
+    for (int i = 0; i < eventCount; i++) {
+      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
+      e.setHeaders(ImmutableMap.of("key", String.format("key %s", i)));
+      events.add(e);
+    }
+    return events;
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
index d4dce94..eb5f7c8 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
@@ -16,11 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.kudu.flume.sink;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -35,8 +37,8 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Sink;
+import org.apache.flume.Sink.Status;
 import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
@@ -97,13 +99,10 @@ public class KuduSinkTest extends BaseKuduTest {
   }
 
   @Test(expected = FlumeException.class)
-  public void testMissingTable() throws Exception {
+  public void testMissingTable() {
     LOG.info("Testing missing table...");
 
-    KuduSink sink = createSink("missingTable");
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
+    KuduSink sink = KuduSinkTestUtil.createSink(syncClient, "missingTable", new Context());
     sink.start();
 
     LOG.info("Testing missing table finished successfully.");
@@ -140,12 +139,9 @@ public class KuduSinkTest extends BaseKuduTest {
     Context sinkContext = new Context();
     sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
                     Boolean.toString(ignoreDuplicateRows));
-    KuduSink sink = createSink(tableName, sinkContext);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
+    KuduSink sink = KuduSinkTestUtil.createSink(syncClient, tableName, sinkContext);
     sink.start();
+    Channel channel = sink.getChannel();
 
     Transaction tx = channel.getTransaction();
     tx.begin();
@@ -163,7 +159,7 @@ public class KuduSinkTest extends BaseKuduTest {
       if (!ignoreDuplicateRows) {
         fail("Incorrectly ignored duplicate rows!");
       }
-      assertTrue("incorrect status for empty channel", status == Sink.Status.READY);
+      assertSame("incorrect status for empty channel", status, Status.READY);
     } catch (EventDeliveryException e) {
       if (ignoreDuplicateRows) {
         throw new AssertionError("Failed to ignore duplicate rows!", e);
@@ -189,31 +185,15 @@ public class KuduSinkTest extends BaseKuduTest {
 
     KuduTable table = createNewTable("test" + eventCount + "events");
     String tableName = table.getName();
-    KuduSink sink = createSink(tableName);
-
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
 
-    Transaction tx = channel.getTransaction();
-    tx.begin();
+    List<Event> events = new ArrayList<>();
 
     for (int i = 0; i < eventCount; i++) {
-      Event e = EventBuilder.withBody(String.format("payload body %s", i)
-          .getBytes(UTF_8));
-      channel.put(e);
+      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
+      events.add(e);
     }
 
-    tx.commit();
-    tx.close();
-
-    Sink.Status status = sink.process();
-    if (eventCount == 0) {
-      assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
-    } else {
-      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
-    }
+    KuduSinkTestUtil.processEventsCreatingSink(syncClient, new Context(), tableName, events);
 
     List<String> rows = scanTableToStrings(table);
     assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
@@ -224,24 +204,4 @@ public class KuduSinkTest extends BaseKuduTest {
 
     LOG.info("Testing {} events finished successfully.", eventCount);
   }
-
-  private KuduSink createSink(String tableName) {
-    return createSink(tableName, new Context());
-  }
-
-  private KuduSink createSink(String tableName, Context ctx) {
-    LOG.info("Creating Kudu sink for '{}' table...", tableName);
-
-    KuduSink sink = new KuduSink(syncClient);
-    HashMap<String, String> parameters = new HashMap<>();
-    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
-    parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddressesAsString());
-    Context context = new Context(parameters);
-    context.putAll(ctx.getParameters());
-    Configurables.configure(sink, context);
-
-    LOG.info("Created Kudu sink for '{}' table.", tableName);
-
-    return sink;
-  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java
new file mode 100644
index 0000000..e24bfe0
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java
@@ -0,0 +1,85 @@
+package org.apache.kudu.flume.sink;
+
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_KEYTAB;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.KuduClient;
+
+/** Test helpers for creating a KuduSink and pushing events through it. */
+class KuduSinkTestUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTestUtil.class);
+
+  static KuduSink createSink(KuduClient client, String tableName, Context ctx) {
+    return createSink(tableName, client, ctx, client.getMasterAddressesAsString());
+  }
+
+  private static KuduSink createSink(
+      String tableName, KuduClient client, Context ctx, String masterAddresses) {
+    LOG.info("Creating Kudu sink for '{}' table...", tableName);
+    // Defaults are set first and the caller's parameters are applied on top of them.
+    Context context = new Context();
+    context.put(TABLE_NAME, tableName);
+    context.put(MASTER_ADDRESSES, masterAddresses);
+    context.putAll(ctx.getParameters());
+    KuduSink sink = new KuduSink(client);
+    Configurables.configure(sink, context);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    LOG.info("Created Kudu sink for '{}' table.", tableName);
+    return sink;
+  }
+
+  static KuduSink createSecureSink(String tableName, String masterAddresses, String clusterRoot) {
+    Context context = new Context();
+    context.put(KERBEROS_KEYTAB, clusterRoot + "/krb5kdc/test-user.keytab");
+    context.put(KERBEROS_PRINCIPAL, "test-user@KRBTEST.COM");
+    // No KuduClient is supplied (null); the sink only gets the keytab and principal settings.
+    return createSink(tableName, null, context, masterAddresses);
+  }
+
+  /** Creates and starts a sink for 'tableName', then runs 'events' through it once. */
+  static void processEventsCreatingSink(KuduClient syncClient, Context context,
+      String tableName, List<Event> events) throws EventDeliveryException {
+    KuduSink sink = createSink(syncClient, tableName, context);
+    sink.start();
+    processEvents(sink, events);
+  }
+
+  /** Delivers 'events' via the sink's channel and checks the status of one process() call. */
+  static void processEvents(KuduSink sink, List<Event> events) throws EventDeliveryException {
+    Channel channel = sink.getChannel();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (Event e : events) {
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+    // BACKOFF is expected exactly when there were no events; JUnit takes the expected value first.
+    Status status = sink.process();
+    if (events.isEmpty()) {
+      assertSame("incorrect status for empty channel", Status.BACKOFF, status);
+    } else {
+      assertNotSame("incorrect status for non-empty channel", Status.BACKOFF, status);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
index 8b4c3df..cadfa2e 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
@@ -16,34 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.kudu.flume.sink;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
 import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
 import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.OPERATION_PROP;
 import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.PATTERN_PROP;
 import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.util.DecimalUtil;
 import org.junit.Test;
 
 import org.apache.kudu.ColumnSchema;
@@ -52,6 +43,7 @@ import org.apache.kudu.Type;
 import org.apache.kudu.client.BaseKuduTest;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.util.DecimalUtil;
 
 public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
   private static final String TEST_REGEXP =
@@ -75,8 +67,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
         .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
     CreateTableOptions createOptions =
         new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
-    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
-    return table;
+    return createTable(tableName, new Schema(columns), createOptions);
   }
 
   @Test
@@ -117,16 +108,42 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
   private void testEvents(int eventCount, int perEventRowCount, String operation) throws Exception {
     String tableName = String.format("test%sevents%srowseach%s",
         eventCount, perEventRowCount, operation);
+    Context context = new Context();
+    context.put(PRODUCER_PREFIX + PATTERN_PROP, TEST_REGEXP);
+    context.put(PRODUCER_PREFIX + OPERATION_PROP, operation);
+    context.put(PRODUCER, RegexpKuduOperationsProducer.class.getName());
     KuduTable table = createNewTable(tableName);
-    KuduSink sink = createSink(tableName, operation);
 
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
+    List<Event> events = generateEvents(eventCount, perEventRowCount, operation);
 
-    Transaction tx = channel.getTransaction();
-    tx.begin();
+    KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount * perEventRowCount + " row(s) expected",
+        eventCount * perEventRowCount,
+        rows.size());
+
+    ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount);
+    for (int i = 0; i < eventCount; i++) {
+      for (int j = 0; j < perEventRowCount; j++) {
+        int value = operation.equals("upsert") && i == 0 ? 1 : i;
+        String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " +
+            "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " +
+            "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " +
+            "DOUBLE doubleFld=%1$d.%1$d, DECIMAL decimalFld(9, 1)=%1$d.%1$d";
+        String rightAnswer = String.format(baseAnswer, value, i, j);
+        rightAnswers.add(rightAnswer);
+      }
+    }
+    Collections.sort(rightAnswers);
+
+    for (int k = 0; k < eventCount * perEventRowCount; k++) {
+      assertEquals("incorrect row", rightAnswers.get(k), rows.get(k));
+    }
+  }
+
+  private List<Event> generateEvents(int eventCount, int perEventRowCount, String operation) {
+    List<Event> events = new ArrayList<>();
 
     for (int i = 0; i < eventCount; i++) {
       StringBuilder payload = new StringBuilder();
@@ -137,7 +154,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
         payload.append(row);
       }
       Event e = EventBuilder.withBody(payload.toString().getBytes(UTF_8));
-      channel.put(e);
+      events.add(e);
     }
 
     if (eventCount > 0) {
@@ -151,7 +168,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
           upserts.append(row);
         }
         Event e = EventBuilder.withBody(upserts.toString().getBytes(UTF_8));
-        channel.put(e);
+        events.add(e);
       }
 
       // Also check some bad/corner cases.
@@ -160,59 +177,9 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
       String[] testCases = {mismatchInInt, emptyString};
       for (String testCase : testCases) {
         Event e = EventBuilder.withBody(testCase.getBytes(UTF_8));
-        channel.put(e);
-      }
-    }
-
-    tx.commit();
-    tx.close();
-
-    Sink.Status status = sink.process();
-    if (eventCount == 0) {
-      assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
-    } else {
-      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
-    }
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals(eventCount * perEventRowCount + " row(s) expected",
-      eventCount * perEventRowCount,
-      rows.size());
-
-    ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount);
-    for (int i = 0; i < eventCount; i++) {
-      for (int j = 0; j < perEventRowCount; j++) {
-        int value = operation.equals("upsert") && i == 0 ? 1 : i;
-        String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " +
-            "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " +
-            "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " +
-            "DOUBLE doubleFld=%1$d.%1$d, DECIMAL decimalFld(9, 1)=%1$d.%1$d";
-        String rightAnswer = String.format(baseAnswer, value, i, j);
-        rightAnswers.add(rightAnswer);
+        events.add(e);
       }
     }
-    Collections.sort(rightAnswers);
-
-    for (int k = 0; k < eventCount * perEventRowCount; k++) {
-      assertEquals("incorrect row", rightAnswers.get(k), rows.get(k));
-    }
-  }
-
-  private KuduSink createSink(String tableName, String operation) {
-    return createSink(tableName, new Context(), operation);
-  }
-
-  private KuduSink createSink(String tableName, Context ctx, String operation) {
-    KuduSink sink = new KuduSink(syncClient);
-    HashMap<String, String> parameters = new HashMap<>();
-    parameters.put(TABLE_NAME, tableName);
-    parameters.put(MASTER_ADDRESSES, getMasterAddressesAsString());
-    parameters.put(PRODUCER, RegexpKuduOperationsProducer.class.getName());
-    parameters.put(PRODUCER_PREFIX + PATTERN_PROP, TEST_REGEXP);
-    parameters.put(PRODUCER_PREFIX + OPERATION_PROP, operation);
-    Context context = new Context(parameters);
-    context.putAll(ctx.getParameters());
-    Configurables.configure(sink, context);
-    return sink;
+    return events;
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
new file mode 100644
index 0000000..7fbfcef
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings;
+import static org.apache.kudu.util.SecurityUtil.KUDU_TICKETCACHE_PROPERTY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder;
+/** Runs a keytab-configured KuduSink against a mini cluster with short KDC ticket lifetimes. */
+public class SecureKuduSinkTest extends BaseKuduTest {
+  private static final Logger LOG = LoggerFactory.getLogger(SecureKuduSinkTest.class);
+  private static final int TICKET_LIFETIME_SECONDS = 10;
+  private static final int RENEWABLE_LIFETIME_SECONDS = 30;
+
+  @Before
+  public void clearTicketCacheProperty() {
+    // Clear the Kudu ticket cache property so that Flume does the authentication.
+    System.clearProperty(KUDU_TICKETCACHE_PROPERTY);
+  }
+
+  @Override
+  protected MiniKuduClusterBuilder getMiniClusterBuilder() {
+    return super.getMiniClusterBuilder()
+      .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s")
+      .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s")
+      .enableKerberos();
+  }
+
+  @Test
+  public void testEventsWithShortTickets() throws Exception {
+    LOG.info("Creating new table...");
+    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
+        .setNumReplicas(1);
+    String tableName = "test_long_lived_events";
+    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
+    LOG.info("Created new table.");
+    // The sink gets only master addresses, table name and keytab/principal settings.
+    KuduSink sink = KuduSinkTestUtil.createSecureSink(
+        tableName, getMasterAddressesAsString(), getClusterRoot());
+    sink.start();
+
+    LOG.info("Testing events at the beginning.");
+    int eventCount = 10;
+
+    processEvents(sink, 0, eventCount / 2);
+    // Sleep for twice the renewable lifetime that was configured for the KDC.
+    LOG.info("Waiting for tickets to expire");
+    TimeUnit.SECONDS.sleep(RENEWABLE_LIFETIME_SECONDS * 2);
+
+    LOG.info("Testing events after ticket renewal.");
+    processEvents(sink, eventCount / 2, eventCount);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+    // Row i of the scan is expected to carry the payload of event i.
+    for (int i = 0; i < eventCount; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+
+    LOG.info("Testing {} events finished successfully.", eventCount);
+  }
+
+  private void processEvents(KuduSink sink, int from, int to) throws EventDeliveryException {
+    List<Event> events = new ArrayList<>();
+    for (int i = from; i < to; i++) {
+      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
+      events.add(e);
+    }
+    // The channel transaction and the status check live in the shared test helper.
+    KuduSinkTestUtil.processEvents(sink, events);
+    LOG.info("Events flushed.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/src/kudu/mini-cluster/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index 76472fe..414956d 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -183,6 +183,9 @@ Status ExternalMiniCluster::Start() {
     RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("joe-interloper"),
                           "could not create unauthorized principal");
 
+    RETURN_NOT_OK_PREPEND(kdc_->CreateKeytabForExistingPrincipal("test-user"),
+                         "could not create client keytab");
+
     RETURN_NOT_OK_PREPEND(kdc_->Kinit("test-admin"),
                           "could not kinit as admin");
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/src/kudu/security/test/mini_kdc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/test/mini_kdc.cc b/src/kudu/security/test/mini_kdc.cc
index 904695a..f4151e6 100644
--- a/src/kudu/security/test/mini_kdc.cc
+++ b/src/kudu/security/test/mini_kdc.cc
@@ -272,6 +272,19 @@ Status MiniKdc::CreateServiceKeytab(const string& spn,
   return Status::OK();
 }
 
+Status MiniKdc::CreateKeytabForExistingPrincipal(const string& spn) {
+  SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("creating keytab for $0", spn));
+  string kt_path = spn;
+  StripString(&kt_path, "/", '_');
+  kt_path = JoinPathSegments(options_.data_root, kt_path) + ".keytab";
+  // kadmin.local 'xst' writes the keytab to kt_path; -norandkey keeps the existing keys.
+  string kadmin;
+  RETURN_NOT_OK(GetBinaryPath("kadmin.local", &kadmin));
+  RETURN_NOT_OK(Subprocess::Call(MakeArgv({
+          kadmin, "-q", Substitute("xst -norandkey -k $0 $1", kt_path, spn)})));
+  return Status::OK();
+}
+
 Status MiniKdc::Kinit(const string& username) {
   SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("kinit for $0", username));
   string kinit;

http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/src/kudu/security/test/mini_kdc.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/test/mini_kdc.h b/src/kudu/security/test/mini_kdc.h
index e282cc4..95e7848 100644
--- a/src/kudu/security/test/mini_kdc.h
+++ b/src/kudu/security/test/mini_kdc.h
@@ -92,6 +92,10 @@ class MiniKdc {
   // will be reset and a new keytab will be generated.
   Status CreateServiceKeytab(const std::string& spn, std::string* path);
 
+  // Creates a keytab for the existing principal 'spn' (e.g. "test-user" or
+  // "kudu/foo.example.com") at <data_root>/<spn>.keytab, with "/" in 'spn' turned into "_".
+  Status CreateKeytabForExistingPrincipal(const std::string& spn);
+
   // Kinit a user to the mini KDC.
   Status Kinit(const std::string& username) WARN_UNUSED_RESULT;