You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/09/28 18:44:01 UTC
kudu git commit: KUDU-2012 Kudu Flume sink auth support
Repository: kudu
Updated Branches:
refs/heads/master ad620415a -> 7db4d82f4
KUDU-2012 Kudu Flume sink auth support
Added a FlumeAuthenticator to KuduSink; the KuduClient is now created
inside a PrivilegedExecutor action.
Added an extra step to the mini cluster to create
a keytab for the client used for testing.
Added an automated test with a short KDC ticket lifetime
to test ticket reacquisition.
Manual testing was done on a secure cluster as well.
Change-Id: I11b5f08802883afa178d346af48d3bcd15281917
Reviewed-on: http://gerrit.cloudera.org:8080/11334
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7db4d82f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7db4d82f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7db4d82f
Branch: refs/heads/master
Commit: 7db4d82f4bcb98aba05853249fd4c84677c57227
Parents: ad62041
Author: Ferenc Szabó <sz...@apache.org>
Authored: Mon Aug 27 07:16:52 2018 +0200
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Sep 28 18:43:34 2018 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/BaseKuduTest.java | 7 ++
.../org/apache/kudu/client/MiniKuduCluster.java | 7 ++
.../org/apache/kudu/flume/sink/KuduSink.java | 25 +++-
.../sink/KuduSinkConfigurationConstants.java | 15 +++
.../sink/AvroKuduOperationsProducerTest.java | 63 +++-------
.../sink/KeyedKuduOperationsProducerTest.java | 98 ++++------------
.../apache/kudu/flume/sink/KuduSinkTest.java | 64 ++---------
.../kudu/flume/sink/KuduSinkTestUtil.java | 85 ++++++++++++++
.../sink/RegexpKuduOperationsProducerTest.java | 113 +++++++-----------
.../kudu/flume/sink/SecureKuduSinkTest.java | 115 +++++++++++++++++++
src/kudu/mini-cluster/external_mini_cluster.cc | 3 +
src/kudu/security/test/mini_kdc.cc | 13 +++
src/kudu/security/test/mini_kdc.h | 4 +
13 files changed, 365 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 959b92c..c410cf7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -300,6 +300,13 @@ public class BaseKuduTest {
}
/**
+ * @return path to the mini cluster root directory
+ */
+ protected String getClusterRoot() {
+ return miniCluster.getClusterRoot();
+ }
+
+ /**
* Kills all the master servers.
* Does nothing to the servers that are already dead.
*
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
index a372dfc..dfa5ea7 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/MiniKuduCluster.java
@@ -506,6 +506,13 @@ public class MiniKuduCluster implements AutoCloseable {
}
/**
+ * @return path to the mini cluster root directory
+ */
+ public String getClusterRoot() {
+ return clusterRoot;
+ }
+
+ /**
* Helper runnable that receives stderr and logs it along with the process' identifier.
*/
public static class ProcessInputStreamLogPrinterRunnable implements Runnable {
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
index 105bc1e..f63f941 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
@@ -21,11 +21,15 @@ package org.apache.kudu.flume.sink;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.BATCH_SIZE;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_KEYTAB;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PROXY_USER;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TIMEOUT_MILLIS;
+import java.security.PrivilegedAction;
import java.util.List;
import com.google.common.base.Preconditions;
@@ -35,6 +39,8 @@ import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Transaction;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
@@ -109,6 +115,7 @@ public class KuduSink extends AbstractSink implements Configurable {
private KuduClient client;
private KuduOperationsProducer operationsProducer;
private SinkCounter sinkCounter;
+ private PrivilegedExecutor privilegedExecutor;
public KuduSink() {
this(null);
@@ -127,7 +134,15 @@ public class KuduSink extends AbstractSink implements Configurable {
// Client is not null only inside tests.
if (client == null) {
- client = new KuduClient.KuduClientBuilder(masterAddresses).build();
+ // Creating client with FlumeAuthenticator.
+ client = privilegedExecutor.execute(
+ new PrivilegedAction<KuduClient>() {
+ @Override
+ public KuduClient run() {
+ return new KuduClient.KuduClientBuilder(masterAddresses).build();
+ }
+ }
+ );
}
session = client.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
@@ -194,6 +209,12 @@ public class KuduSink extends AbstractSink implements Configurable {
timeoutMillis = context.getLong(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
ignoreDuplicateRows = context.getBoolean(IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
String operationProducerType = context.getString(PRODUCER);
+ String kerberosPrincipal = context.getString(KERBEROS_PRINCIPAL);
+ String kerberosKeytab = context.getString(KERBEROS_KEYTAB);
+ String proxyUser = context.getString(PROXY_USER);
+
+ privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
+ kerberosPrincipal, kerberosKeytab).proxyAs(proxyUser);
// Check for operations producer, if null set default operations producer type.
if (operationProducerType == null || operationProducerType.isEmpty()) {
@@ -203,7 +224,7 @@ public class KuduSink extends AbstractSink implements Configurable {
Context producerContext = new Context();
producerContext.putAll(context.getSubProperties(
- KuduSinkConfigurationConstants.PRODUCER_PREFIX));
+ KuduSinkConfigurationConstants.PRODUCER_PREFIX));
try {
Class<? extends KuduOperationsProducer> clazz =
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
index d3b4fb6..dbb2f66 100644
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
+++ b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
@@ -64,4 +64,19 @@ public class KuduSinkConfigurationConstants {
* Whether to ignore duplicate primary key errors caused by inserts.
*/
public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
+
+ /**
+ * Path to the keytab file used for authentication
+ */
+ public static final String KERBEROS_KEYTAB = "kerberosKeytab";
+
+ /**
+ * Kerberos principal used for authentication
+ */
+ public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+
+ /**
+ * The effective user if different from the kerberos principal
+ */
+ public static final String PROXY_USER = "proxyUser";
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
index 7b20a2c..9f200b8 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
@@ -23,10 +23,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_LITERAL_HEADER;
import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_PROP;
import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_URL_HEADER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -38,7 +36,6 @@ import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import com.google.common.collect.ImmutableList;
@@ -49,15 +46,10 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.util.DecimalUtil;
+
import org.junit.Test;
import org.apache.kudu.ColumnSchema;
@@ -66,6 +58,7 @@ import org.apache.kudu.Type;
import org.apache.kudu.client.BaseKuduTest;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.util.DecimalUtil;
public class AvroKuduOperationsProducerTest extends BaseKuduTest {
private static String schemaUriString;
@@ -117,27 +110,13 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
KuduTable table = createNewTable(
String.format("test%sevents%s", eventCount, schemaLocation));
String tableName = table.getName();
- Context ctx = schemaLocation != SchemaLocation.GLOBAL ? new Context()
+ Context context = schemaLocation != SchemaLocation.GLOBAL ? new Context()
: new Context(ImmutableMap.of(PRODUCER_PREFIX + SCHEMA_PROP, schemaUriString));
- KuduSink sink = createSink(tableName, ctx);
-
- Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
- sink.setChannel(channel);
- sink.start();
-
- Transaction tx = channel.getTransaction();
- tx.begin();
- writeEventsToChannel(channel, eventCount, schemaLocation);
- tx.commit();
- tx.close();
-
- Sink.Status status = sink.process();
- if (eventCount == 0) {
- assertEquals("incorrect status for empty channel", status, Sink.Status.BACKOFF);
- } else {
- assertEquals("incorrect status for non-empty channel", status, Sink.Status.READY);
- }
+ context.put(PRODUCER, AvroKuduOperationsProducer.class.getName());
+
+ List<Event> events = generateEvents(eventCount, schemaLocation);
+
+ KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events);
List<String> answers = makeAnswers(eventCount);
List<String> rows = scanTableToStrings(table);
@@ -146,11 +125,12 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
}
private KuduTable createNewTable(String tableName) throws Exception {
- ArrayList<ColumnSchema> columns = new ArrayList<>(5);
+ List<ColumnSchema> columns = new ArrayList<>(5);
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING).nullable(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING)
+ .nullable(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("decimalField", Type.DECIMAL)
.typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
@@ -160,21 +140,9 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
return createTable(tableName, new Schema(columns), createOptions);
}
- private KuduSink createSink(String tableName, Context ctx) {
- KuduSink sink = new KuduSink(syncClient);
- HashMap<String, String> parameters = new HashMap<>();
- parameters.put(TABLE_NAME, tableName);
- parameters.put(MASTER_ADDRESSES, getMasterAddressesAsString());
- parameters.put(PRODUCER, AvroKuduOperationsProducer.class.getName());
- Context context = new Context(parameters);
- context.putAll(ctx.getParameters());
- Configurables.configure(sink, context);
-
- return sink;
- }
-
- private void writeEventsToChannel(Channel channel, int eventCount,
- SchemaLocation schemaLocation) throws Exception {
+ private List<Event> generateEvents(int eventCount,
+ SchemaLocation schemaLocation) throws Exception {
+ List<Event> events = new ArrayList<>();
for (int i = 0; i < eventCount; i++) {
AvroKuduOperationsProducerTestRecord record = new AvroKuduOperationsProducerTestRecord();
record.setKey(10 * i);
@@ -195,8 +163,9 @@ public class AvroKuduOperationsProducerTest extends BaseKuduTest {
} else if (schemaLocation == SchemaLocation.LITERAL) {
e.setHeaders(ImmutableMap.of(SCHEMA_LITERAL_HEADER, schemaLiteral));
}
- channel.put(e);
+ events.add(e);
}
+ return events;
}
private List<String> makeAnswers(int eventCount) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
index b16a209..1940369 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.kudu.flume.sink;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT;
import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.OPERATION_PROP;
import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.PAYLOAD_COLUMN_DEFAULT;
@@ -31,18 +30,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.junit.Test;
import org.slf4j.Logger;
@@ -69,7 +62,7 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
new ColumnSchema.ColumnSchemaBuilder(PAYLOAD_COLUMN_DEFAULT, Type.BINARY)
.key(false).build());
CreateTableOptions createOptions =
- new CreateTableOptions().setRangePartitionColumns(ImmutableList.of(KEY_COLUMN_DEFAULT))
+ new CreateTableOptions().setRangePartitionColumns(ImmutableList.of(KEY_COLUMN_DEFAULT))
.setNumReplicas(1);
KuduTable table = createTable(tableName, new Schema(columns), createOptions);
@@ -114,29 +107,22 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
KuduTable table = createNewTable("testDupUpsertEvents");
String tableName = table.getName();
- Context ctx = new Context(ImmutableMap.of(PRODUCER_PREFIX + OPERATION_PROP, "upsert"));
- KuduSink sink = createSink(tableName, ctx);
-
- Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
- sink.setChannel(channel);
+ Context ctx = new Context(ImmutableMap.of(
+ PRODUCER_PREFIX + OPERATION_PROP, "upsert",
+ PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()
+ ));
+ KuduSink sink = KuduSinkTestUtil.createSink(syncClient, tableName, ctx);
sink.start();
- Transaction tx = channel.getTransaction();
- tx.begin();
-
int numRows = 3;
+ List<Event> events = new ArrayList<>();
for (int i = 0; i < numRows; i++) {
Event e = EventBuilder.withBody(String.format("payload body %s", i), UTF_8);
e.setHeaders(ImmutableMap.of(KEY_COLUMN_DEFAULT, String.format("key %s", i)));
- channel.put(e);
+ events.add(e);
}
- tx.commit();
- tx.close();
-
- Sink.Status status = sink.process();
- assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
+ KuduSinkTestUtil.processEvents(sink, events);
List<String> rows = scanTableToStrings(table);
assertEquals(numRows + " row(s) expected", numRows, rows.size());
@@ -145,18 +131,10 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
}
- Transaction utx = channel.getTransaction();
- utx.begin();
-
Event dup = EventBuilder.withBody("payload body upserted".getBytes(UTF_8));
dup.setHeaders(ImmutableMap.of("key", String.format("key %s", 0)));
- channel.put(dup);
- utx.commit();
- utx.close();
-
- Sink.Status upStatus = sink.process();
- assertTrue("incorrect status for non-empty channel", upStatus != Sink.Status.BACKOFF);
+ KuduSinkTestUtil.processEvents(sink, ImmutableList.of(dup));
List<String> upRows = scanTableToStrings(table);
assertEquals(numRows + " row(s) expected", numRows, upRows.size());
@@ -174,33 +152,14 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
KuduTable table = createNewTable("test" + eventCount + "events" + operation);
String tableName = table.getName();
- Context ctx = new Context(ImmutableMap.of(PRODUCER_PREFIX + OPERATION_PROP, operation));
- KuduSink sink = createSink(tableName, ctx);
-
- Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
- sink.setChannel(channel);
- sink.start();
-
- Transaction tx = channel.getTransaction();
- tx.begin();
-
- for (int i = 0; i < eventCount; i++) {
- Event e = EventBuilder.withBody(String.format("payload body %s", i)
- .getBytes(UTF_8));
- e.setHeaders(ImmutableMap.of("key", String.format("key %s", i)));
- channel.put(e);
- }
+ Context context = new Context(ImmutableMap.of(
+ PRODUCER_PREFIX + OPERATION_PROP, operation,
+ PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()
+ ));
- tx.commit();
- tx.close();
+ List<Event> events = getEvents(eventCount);
- Sink.Status status = sink.process();
- if (eventCount == 0) {
- assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
- } else {
- assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
- }
+ KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events);
List<String> rows = scanTableToStrings(table);
assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
@@ -212,20 +171,13 @@ public class KeyedKuduOperationsProducerTest extends BaseKuduTest {
LOG.info("Testing {} events finished successfully.", eventCount);
}
- private KuduSink createSink(String tableName, Context ctx) {
- LOG.info("Creating Kudu sink for '{}' table...", tableName);
-
- KuduSink sink = new KuduSink(syncClient);
- HashMap<String, String> parameters = new HashMap<>();
- parameters.put(TABLE_NAME, tableName);
- parameters.put(MASTER_ADDRESSES, getMasterAddressesAsString());
- parameters.put(PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName());
- Context context = new Context(parameters);
- context.putAll(ctx.getParameters());
- Configurables.configure(sink, context);
-
- LOG.info("Created Kudu sink for '{}' table.", tableName);
-
- return sink;
+ private List<Event> getEvents(int eventCount) {
+ List<Event> events = new ArrayList<>();
+ for (int i = 0; i < eventCount; i++) {
+ Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
+ e.setHeaders(ImmutableMap.of("key", String.format("key %s", i)));
+ events.add(e);
+ }
+ return events;
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
index d4dce94..eb5f7c8 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.kudu.flume.sink;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -35,8 +37,8 @@ import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
+import org.apache.flume.Sink.Status;
import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.junit.Assert;
@@ -97,13 +99,10 @@ public class KuduSinkTest extends BaseKuduTest {
}
@Test(expected = FlumeException.class)
- public void testMissingTable() throws Exception {
+ public void testMissingTable() {
LOG.info("Testing missing table...");
- KuduSink sink = createSink("missingTable");
- Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
- sink.setChannel(channel);
+ KuduSink sink = KuduSinkTestUtil.createSink(syncClient, "missingTable", new Context());
sink.start();
LOG.info("Testing missing table finished successfully.");
@@ -140,12 +139,9 @@ public class KuduSinkTest extends BaseKuduTest {
Context sinkContext = new Context();
sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
Boolean.toString(ignoreDuplicateRows));
- KuduSink sink = createSink(tableName, sinkContext);
-
- Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
- sink.setChannel(channel);
+ KuduSink sink = KuduSinkTestUtil.createSink(syncClient, tableName, sinkContext);
sink.start();
+ Channel channel = sink.getChannel();
Transaction tx = channel.getTransaction();
tx.begin();
@@ -163,7 +159,7 @@ public class KuduSinkTest extends BaseKuduTest {
if (!ignoreDuplicateRows) {
fail("Incorrectly ignored duplicate rows!");
}
- assertTrue("incorrect status for empty channel", status == Sink.Status.READY);
+ assertSame("incorrect status for empty channel", status, Status.READY);
} catch (EventDeliveryException e) {
if (ignoreDuplicateRows) {
throw new AssertionError("Failed to ignore duplicate rows!", e);
@@ -189,31 +185,15 @@ public class KuduSinkTest extends BaseKuduTest {
KuduTable table = createNewTable("test" + eventCount + "events");
String tableName = table.getName();
- KuduSink sink = createSink(tableName);
-
- Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
- sink.setChannel(channel);
- sink.start();
- Transaction tx = channel.getTransaction();
- tx.begin();
+ List<Event> events = new ArrayList<>();
for (int i = 0; i < eventCount; i++) {
- Event e = EventBuilder.withBody(String.format("payload body %s", i)
- .getBytes(UTF_8));
- channel.put(e);
+ Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
+ events.add(e);
}
- tx.commit();
- tx.close();
-
- Sink.Status status = sink.process();
- if (eventCount == 0) {
- assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
- } else {
- assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
- }
+ KuduSinkTestUtil.processEventsCreatingSink(syncClient, new Context(), tableName, events);
List<String> rows = scanTableToStrings(table);
assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
@@ -224,24 +204,4 @@ public class KuduSinkTest extends BaseKuduTest {
LOG.info("Testing {} events finished successfully.", eventCount);
}
-
- private KuduSink createSink(String tableName) {
- return createSink(tableName, new Context());
- }
-
- private KuduSink createSink(String tableName, Context ctx) {
- LOG.info("Creating Kudu sink for '{}' table...", tableName);
-
- KuduSink sink = new KuduSink(syncClient);
- HashMap<String, String> parameters = new HashMap<>();
- parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
- parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddressesAsString());
- Context context = new Context(parameters);
- context.putAll(ctx.getParameters());
- Configurables.configure(sink, context);
-
- LOG.info("Created Kudu sink for '{}' table.", tableName);
-
- return sink;
- }
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java
new file mode 100644
index 0000000..e24bfe0
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java
@@ -0,0 +1,85 @@
+package org.apache.kudu.flume.sink;
+
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_KEYTAB;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
+import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.KuduClient;
+
+class KuduSinkTestUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTestUtil.class);
+
+ static KuduSink createSink(KuduClient client, String tableName, Context ctx) {
+ return createSink(tableName, client, ctx, client.getMasterAddressesAsString());
+ }
+
+ private static KuduSink createSink(
+ String tableName, KuduClient client, Context ctx, String masterAddresses) {
+ LOG.info("Creating Kudu sink for '{}' table...", tableName);
+
+ Context context = new Context();
+ context.put(TABLE_NAME, tableName);
+ context.put(MASTER_ADDRESSES, masterAddresses);
+ context.putAll(ctx.getParameters());
+ KuduSink sink = new KuduSink(client);
+ Configurables.configure(sink, context);
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
+ sink.setChannel(channel);
+
+ LOG.info("Created Kudu sink for '{}' table.", tableName);
+
+ return sink;
+ }
+
+ static KuduSink createSecureSink(String tableName, String masterAddresses, String clusterRoot) {
+ Context context = new Context();
+ context.put(KERBEROS_KEYTAB, clusterRoot + "/krb5kdc/test-user.keytab");
+ context.put(KERBEROS_PRINCIPAL, "test-user@KRBTEST.COM");
+
+ return createSink(tableName, null, context, masterAddresses);
+ }
+
+ static void processEventsCreatingSink(
+ KuduClient syncClient, Context context, String tableName, List<Event> events
+ ) throws EventDeliveryException {
+ KuduSink sink = createSink(syncClient, tableName, context);
+ sink.start();
+ processEvents(sink, events);
+ }
+
+ static void processEvents(KuduSink sink, List<Event> events) throws EventDeliveryException {
+ Channel channel = sink.getChannel();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ for (Event e : events) {
+ channel.put(e);
+ }
+ tx.commit();
+ tx.close();
+
+ Status status = sink.process();
+ if (events.isEmpty()) {
+ assertSame("incorrect status for empty channel", status, Status.BACKOFF);
+ } else {
+ assertNotSame("incorrect status for non-empty channel", status, Status.BACKOFF);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
index 8b4c3df..cadfa2e 100644
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
@@ -16,34 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.kudu.flume.sink;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.OPERATION_PROP;
import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.PATTERN_PROP;
import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import com.google.common.collect.ImmutableList;
-import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.util.DecimalUtil;
import org.junit.Test;
import org.apache.kudu.ColumnSchema;
@@ -52,6 +43,7 @@ import org.apache.kudu.Type;
import org.apache.kudu.client.BaseKuduTest;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.util.DecimalUtil;
public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
private static final String TEST_REGEXP =
@@ -75,8 +67,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
.typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
CreateTableOptions createOptions =
new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
- KuduTable table = createTable(tableName, new Schema(columns), createOptions);
- return table;
+ return createTable(tableName, new Schema(columns), createOptions);
}
@Test
@@ -117,16 +108,42 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
private void testEvents(int eventCount, int perEventRowCount, String operation) throws Exception {
String tableName = String.format("test%sevents%srowseach%s",
eventCount, perEventRowCount, operation);
+ Context context = new Context();
+ context.put(PRODUCER_PREFIX + PATTERN_PROP, TEST_REGEXP);
+ context.put(PRODUCER_PREFIX + OPERATION_PROP, operation);
+ context.put(PRODUCER, RegexpKuduOperationsProducer.class.getName());
KuduTable table = createNewTable(tableName);
- KuduSink sink = createSink(tableName, operation);
- Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
- sink.setChannel(channel);
- sink.start();
+ List<Event> events = generateEvents(eventCount, perEventRowCount, operation);
- Transaction tx = channel.getTransaction();
- tx.begin();
+ KuduSinkTestUtil.processEventsCreatingSink(syncClient, context, tableName, events);
+
+ List<String> rows = scanTableToStrings(table);
+ assertEquals(eventCount * perEventRowCount + " row(s) expected",
+ eventCount * perEventRowCount,
+ rows.size());
+
+ ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount);
+ for (int i = 0; i < eventCount; i++) {
+ for (int j = 0; j < perEventRowCount; j++) {
+ int value = operation.equals("upsert") && i == 0 ? 1 : i;
+ String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " +
+ "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " +
+ "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " +
+ "DOUBLE doubleFld=%1$d.%1$d, DECIMAL decimalFld(9, 1)=%1$d.%1$d";
+ String rightAnswer = String.format(baseAnswer, value, i, j);
+ rightAnswers.add(rightAnswer);
+ }
+ }
+ Collections.sort(rightAnswers);
+
+ for (int k = 0; k < eventCount * perEventRowCount; k++) {
+ assertEquals("incorrect row", rightAnswers.get(k), rows.get(k));
+ }
+ }
+
+ private List<Event> generateEvents(int eventCount, int perEventRowCount, String operation) {
+ List<Event> events = new ArrayList<>();
for (int i = 0; i < eventCount; i++) {
StringBuilder payload = new StringBuilder();
@@ -137,7 +154,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
payload.append(row);
}
Event e = EventBuilder.withBody(payload.toString().getBytes(UTF_8));
- channel.put(e);
+ events.add(e);
}
if (eventCount > 0) {
@@ -151,7 +168,7 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
upserts.append(row);
}
Event e = EventBuilder.withBody(upserts.toString().getBytes(UTF_8));
- channel.put(e);
+ events.add(e);
}
// Also check some bad/corner cases.
@@ -160,59 +177,9 @@ public class RegexpKuduOperationsProducerTest extends BaseKuduTest {
String[] testCases = {mismatchInInt, emptyString};
for (String testCase : testCases) {
Event e = EventBuilder.withBody(testCase.getBytes(UTF_8));
- channel.put(e);
- }
- }
-
- tx.commit();
- tx.close();
-
- Sink.Status status = sink.process();
- if (eventCount == 0) {
- assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
- } else {
- assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
- }
-
- List<String> rows = scanTableToStrings(table);
- assertEquals(eventCount * perEventRowCount + " row(s) expected",
- eventCount * perEventRowCount,
- rows.size());
-
- ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount);
- for (int i = 0; i < eventCount; i++) {
- for (int j = 0; j < perEventRowCount; j++) {
- int value = operation.equals("upsert") && i == 0 ? 1 : i;
- String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " +
- "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " +
- "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " +
- "DOUBLE doubleFld=%1$d.%1$d, DECIMAL decimalFld(9, 1)=%1$d.%1$d";
- String rightAnswer = String.format(baseAnswer, value, i, j);
- rightAnswers.add(rightAnswer);
+ events.add(e);
}
}
- Collections.sort(rightAnswers);
-
- for (int k = 0; k < eventCount * perEventRowCount; k++) {
- assertEquals("incorrect row", rightAnswers.get(k), rows.get(k));
- }
- }
-
- private KuduSink createSink(String tableName, String operation) {
- return createSink(tableName, new Context(), operation);
- }
-
- private KuduSink createSink(String tableName, Context ctx, String operation) {
- KuduSink sink = new KuduSink(syncClient);
- HashMap<String, String> parameters = new HashMap<>();
- parameters.put(TABLE_NAME, tableName);
- parameters.put(MASTER_ADDRESSES, getMasterAddressesAsString());
- parameters.put(PRODUCER, RegexpKuduOperationsProducer.class.getName());
- parameters.put(PRODUCER_PREFIX + PATTERN_PROP, TEST_REGEXP);
- parameters.put(PRODUCER_PREFIX + OPERATION_PROP, operation);
- Context context = new Context(parameters);
- context.putAll(ctx.getParameters());
- Configurables.configure(sink, context);
- return sink;
+ return events;
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
new file mode 100644
index 0000000..7fbfcef
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kudu.flume.sink;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.util.ClientTestUtil.scanTableToStrings;
+import static org.apache.kudu.util.SecurityUtil.KUDU_TICKETCACHE_PROPERTY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder;
+
+/**
+ * Checks that a Kerberos-enabled sink keeps delivering events after its tickets have
+ * had time to expire, using a mini cluster whose KDC issues very short-lived tickets.
+ */
+public class SecureKuduSinkTest extends BaseKuduTest {
+  private static final Logger LOG = LoggerFactory.getLogger(SecureKuduSinkTest.class);
+
+  // Deliberately short KDC lifetimes so the test can outlast them.
+  private static final int TICKET_LIFETIME_SECONDS = 10;
+  private static final int RENEWABLE_LIFETIME_SECONDS = 30;
+
+  @Before
+  public void clearTicketCacheProperty() {
+    // Let Flume authenticate. Presumably the base test points this property at a ticket
+    // cache; clearing it leaves the sink to log in with its own keytab (confirm).
+    System.clearProperty(KUDU_TICKETCACHE_PROPERTY);
+  }
+
+  /** Enables Kerberos on the mini cluster and applies the short ticket lifetimes. */
+  @Override
+  protected MiniKuduClusterBuilder getMiniClusterBuilder() {
+    return super.getMiniClusterBuilder()
+        .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s")
+        .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s")
+        .enableKerberos();
+  }
+
+  /**
+   * Delivers half of the events, sleeps for twice the renewable lifetime, delivers the
+   * other half and verifies that every payload reached the table.
+   */
+  @Test
+  public void testEventsWithShortTickets() throws Exception {
+    LOG.info("Creating new table...");
+    List<ColumnSchema> columns = new ArrayList<>(1);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
+            .setNumReplicas(1);
+    String tableName = "test_long_lived_events";
+    KuduTable table = createTable(tableName, new Schema(columns), createOptions);
+    LOG.info("Created new table.");
+
+    KuduSink sink = KuduSinkTestUtil.createSecureSink(
+        tableName, getMasterAddressesAsString(), getClusterRoot());
+    sink.start();
+    try {
+      LOG.info("Testing events at the beginning.");
+      int eventCount = 10;
+
+      processEvents(sink, 0, eventCount / 2);
+
+      LOG.info("Waiting for tickets to expire");
+      TimeUnit.SECONDS.sleep(RENEWABLE_LIFETIME_SECONDS * 2);
+
+      LOG.info("Testing events after ticket renewal.");
+      processEvents(sink, eventCount / 2, eventCount);
+
+      List<String> rows = scanTableToStrings(table);
+      assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+      for (int i = 0; i < eventCount; i++) {
+        assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+      }
+
+      LOG.info("Testing {} events finished successfully.", eventCount);
+    } finally {
+      // The sink was started above, so stop it even when the test fails; otherwise
+      // whatever it opened in start() stays alive for the rest of the test run.
+      sink.stop();
+    }
+  }
+
+  /**
+   * Builds one event per index in {@code [from, to)} with the body
+   * {@code "payload body <index>"} and pushes the batch through the sink.
+   */
+  private void processEvents(KuduSink sink, int from, int to) throws EventDeliveryException {
+    List<Event> events = new ArrayList<>();
+    for (int i = from; i < to; i++) {
+      events.add(EventBuilder.withBody(("payload body " + i).getBytes(UTF_8)));
+    }
+
+    KuduSinkTestUtil.processEvents(sink, events);
+    LOG.info("Events flushed.");
+  }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/src/kudu/mini-cluster/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index 76472fe..414956d 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -183,6 +183,9 @@ Status ExternalMiniCluster::Start() {
RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("joe-interloper"),
"could not create unauthorized principal");
+ RETURN_NOT_OK_PREPEND(kdc_->CreateKeytabForExistingPrincipal("test-user"),
+ "could not create client keytab");
+
RETURN_NOT_OK_PREPEND(kdc_->Kinit("test-admin"),
"could not kinit as admin");
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/src/kudu/security/test/mini_kdc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/test/mini_kdc.cc b/src/kudu/security/test/mini_kdc.cc
index 904695a..f4151e6 100644
--- a/src/kudu/security/test/mini_kdc.cc
+++ b/src/kudu/security/test/mini_kdc.cc
@@ -272,6 +272,19 @@ Status MiniKdc::CreateServiceKeytab(const string& spn,
return Status::OK();
}
+// Exports the key of the existing principal 'spn' into a keytab file under the KDC's
+// data root. Returns the status of locating or running 'kadmin.local'.
+Status MiniKdc::CreateKeytabForExistingPrincipal(const string& spn) {
+  SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("creating keytab for $0", spn));
+
+  // Derive the file name from the principal: any '/' is swapped for '_' so the name
+  // stays a single path component.
+  string keytab_name = spn;
+  StripString(&keytab_name, "/", '_');
+  const string keytab_path = JoinPathSegments(options_.data_root, keytab_name) + ".keytab";
+
+  string kadmin_bin;
+  RETURN_NOT_OK(GetBinaryPath("kadmin.local", &kadmin_bin));
+
+  // '-norandkey' keeps the principal's current key instead of generating a new one.
+  return Subprocess::Call(MakeArgv({
+      kadmin_bin, "-q", Substitute("xst -norandkey -k $0 $1", keytab_path, spn)}));
+}
+
Status MiniKdc::Kinit(const string& username) {
SCOPED_LOG_SLOW_EXECUTION(WARNING, 100, Substitute("kinit for $0", username));
string kinit;
http://git-wip-us.apache.org/repos/asf/kudu/blob/7db4d82f/src/kudu/security/test/mini_kdc.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/test/mini_kdc.h b/src/kudu/security/test/mini_kdc.h
index e282cc4..95e7848 100644
--- a/src/kudu/security/test/mini_kdc.h
+++ b/src/kudu/security/test/mini_kdc.h
@@ -92,6 +92,10 @@ class MiniKdc {
// will be reset and a new keytab will be generated.
Status CreateServiceKeytab(const std::string& spn, std::string* path);
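+ // Exports the current key of an already-existing principal to a keytab, without
+ // changing that key. 'spn' is the principal name (e.g. "test-user"); the keytab is
+ // written to '<data_root>/<spn>.keytab', with any '/' in 'spn' replaced by '_'.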
+ Status CreateKeytabForExistingPrincipal(const std::string& spn);
+
// Kinit a user to the mini KDC.
Status Kinit(const std::string& username) WARN_UNUSED_RESULT;