You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/10/17 20:43:06 UTC
[2/5] kudu git commit: KUDU-2411: (Part 1) Break out existing test
utilities into a seperate module
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
new file mode 100644
index 0000000..9b67a9b
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.AsyncKuduScanner;
+import org.apache.kudu.client.AsyncKuduSession;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.apache.kudu.util.DecimalUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Utilities useful for cluster testing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class ClientTestUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClientTestUtil.class);
+
+ public static final Callback<Object, Object> defaultErrorCB = new Callback<Object, Object>() {
+ @Override
+ public Object call(Object arg) throws Exception {
+ if (arg == null) {
+ return null;
+ }
+ if (arg instanceof Exception) {
+ LOG.warn("Got exception", (Exception) arg);
+ } else {
+ LOG.warn("Got an error response back {}", arg);
+ }
+ return new Exception("cannot recover from error: " + arg);
+ }
+ };
+
+  /**
+   * Drains {@code scanner} and returns how many rows it produced. The scanner does not have to
+   * be new, so this can also finish a scan that was started earlier; only the rows fetched by
+   * this call are counted.
+   *
+   * @param scanner the asynchronous scanner to exhaust
+   * @param timeoutMs time, in milliseconds, passed to {@code join} for every fetched batch
+   * @return the number of rows fetched by this call
+   * @throws Exception whatever joining on a batch throws
+   */
+  public static int countRowsInScan(AsyncKuduScanner scanner, long timeoutMs) throws Exception {
+    final AtomicInteger rowCount = new AtomicInteger();
+
+    // Adds the size of each non-null batch to the running total.
+    final Callback<Object, RowResultIterator> tally = new Callback<Object, RowResultIterator>() {
+      @Override
+      public Object call(RowResultIterator batch) throws Exception {
+        if (batch != null) {
+          rowCount.addAndGet(batch.getNumRows());
+        }
+        return null;
+      }
+    };
+
+    while (scanner.hasMoreRows()) {
+      final Deferred<RowResultIterator> pending = scanner.nextRows();
+      pending.addCallbacks(tally, defaultErrorCB);
+      // Block until this batch has been tallied before requesting the next one.
+      pending.join(timeoutMs);
+    }
+    return rowCount.get();
+  }
+
+ /**
+ * Counts the rows remaining in {@code scanner} by delegating to
+ * {@link #countRowsInScan(AsyncKuduScanner, long)} with a timeout of 60000 ms (60 seconds).
+ *
+ * @param scanner the asynchronous scanner to exhaust
+ * @return the row count reported by the delegate
+ * @throws Exception whatever the delegate throws
+ */
+ public static int countRowsInScan(AsyncKuduScanner scanner) throws Exception {
+ return countRowsInScan(scanner, 60000);
+ }
+
+  /**
+   * Drains the synchronous {@code scanner} and returns how many rows it produced.
+   *
+   * @param scanner the scanner to exhaust
+   * @return the sum of the row counts of every batch fetched by this call
+   * @throws KuduException if fetching a batch fails
+   */
+  public static int countRowsInScan(KuduScanner scanner) throws KuduException {
+    int total = 0;
+    while (scanner.hasMoreRows()) {
+      final RowResultIterator batch = scanner.nextRows();
+      total += batch.getNumRows();
+    }
+    return total;
+  }
+
+  /**
+   * Scans {@code table} with an empty column projection and returns the number of rows seen.
+   *
+   * @param table the table to scan
+   * @param predicates optional predicates to apply to the scan
+   * @return the number of rows in the table matching the predicates
+   * @throws KuduException if the scan fails
+   */
+  public static long countRowsInTable(KuduTable table, KuduPredicate... predicates) throws KuduException {
+    final KuduClient syncClient = table.getAsyncClient().syncClient();
+    final KuduScanner.KuduScannerBuilder builder = syncClient.newScannerBuilder(table);
+    for (KuduPredicate filter : predicates) {
+      builder.addPredicate(filter);
+    }
+    // Only the row count is needed, so project no columns at all.
+    builder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+    final KuduScanner scanner = builder.build();
+    return countRowsInScan(scanner);
+  }
+
+  /**
+   * Counts the rows in the provided scan tokens, scanning every token on its own thread.
+   *
+   * <p>Each worker builds a private {@link KuduClient} against {@code masterAddresses},
+   * deserializes its token into a scanner and, once that scan has been fully consumed, adds
+   * its row count to a shared total. A failure inside a worker is only logged, so a failed
+   * worker contributes nothing to the returned count.
+   *
+   * @param tokens the scan tokens to execute
+   * @param masterAddresses master addresses used to build each worker's client
+   * @param operationTimeoutMs admin operation timeout, in milliseconds, of each worker's client
+   * @return the total number of rows counted by the workers that completed
+   * @throws IOException if a token cannot be serialized
+   * @throws InterruptedException if the caller is interrupted while waiting for the workers
+   */
+  public static int countScanTokenRows(List<KuduScanToken> tokens, final String masterAddresses,
+                                       final long operationTimeoutMs)
+      throws IOException, InterruptedException {
+    final AtomicInteger count = new AtomicInteger(0);
+    List<Thread> threads = new ArrayList<>();
+    for (final KuduScanToken token : tokens) {
+      final byte[] serializedToken = token.serialize();
+      Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try (KuduClient contextClient = new KuduClient.KuduClientBuilder(masterAddresses)
+                   .defaultAdminOperationTimeoutMs(operationTimeoutMs)
+                   .build()) {
+            KuduScanner scanner = KuduScanToken.deserializeIntoScanner(serializedToken, contextClient);
+            try {
+              int localCount = 0;
+              while (scanner.hasMoreRows()) {
+                localCount += Iterators.size(scanner.nextRows());
+              }
+              count.addAndGet(localCount);
+            } finally {
+              scanner.close();
+            }
+          } catch (Exception e) {
+            LOG.error("exception in parallel token scanner", e);
+          }
+        }
+      });
+      // start(), not run(): run() would execute the scan synchronously on the calling thread,
+      // serializing the scans and turning the join() below into a no-op.
+      thread.start();
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    return count.get();
+  }
+
+  /**
+   * Scans {@code table} and returns the string form of every returned row, sorted in the
+   * natural order of the strings.
+   *
+   * @param table the table to scan
+   * @param predicates optional predicates to apply to the scan
+   * @return the sorted {@code rowToString()} values of the rows returned by the scan
+   * @throws Exception if the scan fails
+   */
+  public static List<String> scanTableToStrings(KuduTable table,
+                                                KuduPredicate... predicates) throws Exception {
+    final KuduScanner.KuduScannerBuilder builder =
+        table.getAsyncClient().syncClient().newScannerBuilder(table);
+    for (KuduPredicate filter : predicates) {
+      builder.addPredicate(filter);
+    }
+    final KuduScanner scanner = builder.build();
+
+    final List<String> lines = Lists.newArrayList();
+    while (scanner.hasMoreRows()) {
+      for (RowResult row : scanner.nextRows()) {
+        lines.add(row.rowToString());
+      }
+    }
+    // Sort so the result does not depend on the order in which rows came back.
+    Collections.sort(lines);
+    return lines;
+  }
+
+  /**
+   * Builds a thirteen-column schema that uses every column type listed below. {@code "int8"}
+   * is the only key column, {@code "null"} is the only nullable column, and {@code "decimal"}
+   * carries the type attributes produced by {@code DecimalUtil.typeAttributes(5, 3)}.
+   *
+   * @return the new schema
+   */
+  public static Schema getSchemaWithAllTypes() {
+    final ImmutableList.Builder<ColumnSchema> columns = ImmutableList.builder();
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("int8", Type.INT8).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("int16", Type.INT16).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("bool", Type.BOOL).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("double", Type.DOUBLE).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("binary-array", Type.BINARY).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("binary-bytebuffer", Type.BINARY).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("null", Type.STRING).nullable(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.UNIXTIME_MICROS).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("decimal", Type.DECIMAL)
+        .typeAttributes(DecimalUtil.typeAttributes(5, 3)).build());
+    return new Schema(columns.build());
+  }
+
+ /**
+ * Returns new table options that range-partition on the {@code "int8"} column, which is the
+ * key column of the schema built by {@link #getSchemaWithAllTypes()}.
+ *
+ * @return a fresh {@link CreateTableOptions} instance
+ */
+ public static CreateTableOptions getAllTypesCreateTableOptions() {
+ return new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("int8"));
+ }
+
+  /**
+   * Builds the five-column "basic" schema: an INT32 key named {@code "key"}, two INT32 columns,
+   * a nullable STRING column with explicit block size, encoding and compression settings, and
+   * a BOOL column.
+   *
+   * @return the new schema
+   */
+  public static Schema getBasicSchema() {
+    final ColumnSchema key =
+        new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build();
+    final ColumnSchema firstInt =
+        new ColumnSchema.ColumnSchemaBuilder("column1_i", Type.INT32).build();
+    final ColumnSchema secondInt =
+        new ColumnSchema.ColumnSchemaBuilder("column2_i", Type.INT32).build();
+    // The only column that overrides the storage attributes.
+    final ColumnSchema text = new ColumnSchema.ColumnSchemaBuilder("column3_s", Type.STRING)
+        .nullable(true)
+        .desiredBlockSize(4096)
+        .encoding(ColumnSchema.Encoding.DICT_ENCODING)
+        .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.LZ4)
+        .build();
+    final ColumnSchema flag =
+        new ColumnSchema.ColumnSchemaBuilder("column4_b", Type.BOOL).build();
+
+    final ArrayList<ColumnSchema> columns = new ArrayList<>(5);
+    columns.add(key);
+    columns.add(firstInt);
+    columns.add(secondInt);
+    columns.add(text);
+    columns.add(flag);
+    return new Schema(columns);
+  }
+
+ /**
+ * Returns new table options that range-partition on the {@code "key"} column, which is the
+ * key column of the schema built by {@link #getBasicSchema()}.
+ *
+ * @return a fresh {@link CreateTableOptions} instance
+ */
+ public static CreateTableOptions getBasicCreateTableOptions() {
+ return new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"));
+ }
+
+  /**
+   * Creates table options with non-covering range partitioning for a table with
+   * the basic schema. Two range partitions, [0, 100) and [200, 300), are added and the first
+   * one is split at 50, so the key ranges fall between the following values:
+   *
+   * <pre>
+   *   [  0,  50)
+   *   [ 50, 100)
+   *   [200, 300)
+   * </pre>
+   *
+   * @return the new table options
+   */
+  public static CreateTableOptions getBasicTableOptionsWithNonCoveredRange() {
+    final Schema basicSchema = getBasicSchema();
+
+    // Bounds of the first range, [0, 100).
+    final PartialRow firstLower = basicSchema.newPartialRow();
+    firstLower.addInt("key", 0);
+    final PartialRow firstUpper = basicSchema.newPartialRow();
+    firstUpper.addInt("key", 100);
+
+    // Bounds of the second range, [200, 300).
+    final PartialRow secondLower = basicSchema.newPartialRow();
+    secondLower.addInt("key", 200);
+    final PartialRow secondUpper = basicSchema.newPartialRow();
+    secondUpper.addInt("key", 300);
+
+    // Split point inside the first range.
+    final PartialRow splitAtFifty = basicSchema.newPartialRow();
+    splitAtFifty.addInt("key", 50);
+
+    final CreateTableOptions options = new CreateTableOptions();
+    options.setRangePartitionColumns(ImmutableList.of("key"));
+    options.addRangePartition(firstLower, firstUpper);
+    options.addRangePartition(secondLower, secondUpper);
+    options.addSplitRow(splitAtFifty);
+    return options;
+  }
+
+ /**
+ * A generic helper function to create a table with default test options: the schema from
+ * {@link #getBasicSchema()} and the options from {@link #getBasicCreateTableOptions()}.
+ *
+ * @param client the client used to create the table
+ * @param tableName the name of the table to create
+ * @return the table returned by {@code client.createTable}
+ * @throws KuduException if the table cannot be created
+ */
+ public static KuduTable createDefaultTable(KuduClient client, String tableName) throws KuduException {
+ return client.createTable(tableName, getBasicSchema(), getBasicCreateTableOptions());
+ }
+
+  /**
+   * Loads a table of default schema with the specified number of records, in ascending key
+   * order. Keys run from 0 (inclusive) to {@code numRows} (exclusive); every row is built by
+   * {@link #createBasicSchemaInsert(KuduTable, int)}.
+   *
+   * @param client the client used to open the table and create the session
+   * @param tableName the name of the table to load
+   * @param numRows how many rows to insert
+   * @throws KuduException if opening the table, applying an insert, flushing or closing fails
+   */
+  public static void loadDefaultTable(KuduClient client, String tableName, int numRows)
+      throws KuduException {
+    final KuduTable target = client.openTable(tableName);
+    final KuduSession writer = client.newSession();
+    for (int key = 0; key < numRows; key++) {
+      writer.apply(createBasicSchemaInsert(target, key));
+    }
+    writer.flush();
+    writer.close();
+  }
+
+  /**
+   * Creates an insert for {@code table} whose first five columns are filled by index:
+   * {@code key}, 2, 3, {@code "a string"} and {@code true}.
+   *
+   * @param table the table to create the insert for; the column indexes used here match the
+   *              layout built by {@link #getBasicSchema()}
+   * @param key the value for column 0
+   * @return the populated, not yet applied, insert
+   */
+  public static Insert createBasicSchemaInsert(KuduTable table, int key) {
+    final Insert operation = table.newInsert();
+    final PartialRow values = operation.getRow();
+    values.addInt(0, key);
+    // Every non-key column gets the same fixed value in every row.
+    values.addInt(1, 2);
+    values.addInt(2, 3);
+    values.addString(3, "a string");
+    values.addBoolean(4, true);
+    return operation;
+  }
+
+  /**
+   * Creates a table with the basic schema, split at keys 10, 20 and 30, and inserts nine rows
+   * through an asynchronous session.
+   *
+   * <p>The inserted keys are 11-13, 21-23 and 31-33, so the tablet below key 10 stays empty
+   * and each of the other three tablets receives three rows.
+   *
+   * @param client the asynchronous client used to create the table and the session
+   * @param tableName the name of the table to create
+   * @param timeoutMs time, in milliseconds, to wait for each insert and for the session close
+   * @return the created table
+   * @throws Exception if creating the table, an insert, or closing the session fails
+   */
+  public static KuduTable createFourTabletsTableWithNineRows(AsyncKuduClient client,
+                                                             String tableName,
+                                                             final long timeoutMs)
+      throws Exception {
+    final int[] splitKeys = new int[] { 10, 20, 30 };
+    final Schema schema = getBasicSchema();
+    final CreateTableOptions options = getBasicCreateTableOptions();
+    for (int splitKey : splitKeys) {
+      final PartialRow split = schema.newPartialRow();
+      split.addInt(0, splitKey);
+      options.addSplitRow(split);
+    }
+    final KuduTable table = client.syncClient().createTable(tableName, schema, options);
+    final AsyncKuduSession session = client.newSession();
+
+    // Three rows just above each split key: one empty tablet plus three tablets of three rows.
+    for (int base : splitKeys) {
+      for (int offset = 1; offset <= 3; offset++) {
+        final Insert insert = table.newInsert();
+        final PartialRow values = insert.getRow();
+        values.addInt(0, base + offset);
+        values.addInt(1, base);
+        values.addInt(2, offset);
+        values.addString(3, "a string");
+        values.addBoolean(4, true);
+        session.apply(insert).join(timeoutMs);
+      }
+    }
+    session.close().join(timeoutMs);
+    return table;
+  }
+
+  /**
+   * Builds a schema of five STRING columns: the key column {@code "key"}, the non-nullable
+   * columns {@code "c1"} and {@code "c2"}, and the nullable columns {@code "c3"} and
+   * {@code "c4"}.
+   *
+   * @return the new schema
+   */
+  public static Schema createManyStringsSchema() {
+    final ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(5);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING).key(true).build());
+    for (String required : new String[] { "c1", "c2" }) {
+      columns.add(new ColumnSchema.ColumnSchemaBuilder(required, Type.STRING).build());
+    }
+    for (String optional : new String[] { "c3", "c4" }) {
+      columns.add(new ColumnSchema.ColumnSchemaBuilder(optional, Type.STRING).nullable(true).build());
+    }
+    return new Schema(columns);
+  }
+
+  /**
+   * Builds a four-column schema with a BINARY key column {@code "key"}, a STRING column
+   * {@code "c1"}, a DOUBLE column {@code "c2"} and a nullable BINARY column {@code "c3"}.
+   *
+   * @return the new schema
+   */
+  public static Schema createSchemaWithBinaryColumns() {
+    final ColumnSchema binaryKey =
+        new ColumnSchema.ColumnSchemaBuilder("key", Type.BINARY).key(true).build();
+    final ColumnSchema text = new ColumnSchema.ColumnSchemaBuilder("c1", Type.STRING).build();
+    final ColumnSchema number = new ColumnSchema.ColumnSchemaBuilder("c2", Type.DOUBLE).build();
+    final ColumnSchema optionalBinary =
+        new ColumnSchema.ColumnSchemaBuilder("c3", Type.BINARY).nullable(true).build();
+
+    final ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>();
+    columns.add(binaryKey);
+    columns.add(text);
+    columns.add(number);
+    columns.add(optionalBinary);
+    return new Schema(columns);
+  }
+
+  /**
+   * Builds a two-column schema in which both the key column {@code "key"} and the nullable
+   * column {@code "c1"} are of type UNIXTIME_MICROS.
+   *
+   * @return the new schema
+   */
+  public static Schema createSchemaWithTimestampColumns() {
+    final ColumnSchema timestampKey =
+        new ColumnSchema.ColumnSchemaBuilder("key", Type.UNIXTIME_MICROS).key(true).build();
+    final ColumnSchema optionalTimestamp =
+        new ColumnSchema.ColumnSchemaBuilder("c1", Type.UNIXTIME_MICROS).nullable(true).build();
+
+    final ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>();
+    columns.add(timestampKey);
+    columns.add(optionalTimestamp);
+    return new Schema(columns);
+  }
+
+  /**
+   * Builds a two-column DECIMAL schema. The key column {@code "key"} uses a precision of
+   * {@code DecimalUtil.MAX_DECIMAL64_PRECISION}; the nullable column {@code "c1"} uses
+   * {@code DecimalUtil.MAX_DECIMAL128_PRECISION}.
+   *
+   * @return the new schema
+   */
+  public static Schema createSchemaWithDecimalColumns() {
+    final ColumnTypeAttributes keyAttributes =
+        new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+            .precision(DecimalUtil.MAX_DECIMAL64_PRECISION).build();
+    final ColumnTypeAttributes valueAttributes =
+        new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+            .precision(DecimalUtil.MAX_DECIMAL128_PRECISION).build();
+
+    final ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>();
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.DECIMAL).key(true)
+        .typeAttributes(keyAttributes).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.DECIMAL).nullable(true)
+        .typeAttributes(valueAttributes).build());
+    return new Schema(columns);
+  }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
new file mode 100644
index 0000000..172989e
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
@@ -0,0 +1,444 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test;
+
+import com.google.common.base.Stopwatch;
+import com.stumbleupon.async.Deferred;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.AsyncKuduClient.AsyncKuduClientBuilder;
+import org.apache.kudu.client.DeadlineTracker;
+import org.apache.kudu.client.HostAndPort;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.RemoteTablet;
+import org.apache.kudu.master.Master;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
+import org.apache.kudu.test.cluster.FakeDNS;
+import org.apache.kudu.test.junit.RetryRule;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
+/**
+ * A JUnit Rule that manages a Kudu cluster and clients for testing.
+ * This rule also includes utility methods for the cluster
+ * and clients.
+ *
+ * <pre>
+ * public static class TestFoo {
+ *
+ * {@literal @}Rule
+ * public KuduTestHarness harness = new KuduTestHarness();
+ *
+ * ...
+ * }
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class KuduTestHarness extends ExternalResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KuduTestHarness.class);
+
+ private static final int NUM_MASTER_SERVERS = 3;
+ private static final int NUM_TABLET_SERVERS = 3;
+
+ // Default timeout/sleep interval for various client operations,
+ // waiting for various jobs/threads to complete, etc.
+ public static final int DEFAULT_SLEEP = 50000;
+
+ private final Random randomForTSRestart = RandomUtils.getRandom();
+
+ private MiniKuduClusterBuilder clusterBuilder;
+ private MiniKuduCluster miniCluster;
+
+ // We create both versions of the asyncClient for ease of use.
+ private AsyncKuduClient asyncClient;
+ private KuduClient client;
+
+ /**
+ * Creates a harness whose mini cluster is built from the supplied builder.
+ * @param clusterBuilder the builder that {@link #before()} uses to build the cluster
+ */
+ public KuduTestHarness(final MiniKuduClusterBuilder clusterBuilder) {
+ this.clusterBuilder = clusterBuilder;
+ }
+
+ /**
+ * Creates a harness that uses the builder returned by {@link #getBaseClusterBuilder()}.
+ */
+ public KuduTestHarness() {
+ this.clusterBuilder = getBaseClusterBuilder();
+ }
+
+ /**
+ * Returns the base MiniKuduClusterBuilder used when creating a
+ * KuduTestHarness with the default constructor. This is useful
+ * if you want to add to the default cluster setup.
+ */
+ public static MiniKuduClusterBuilder getBaseClusterBuilder() {
+ return new MiniKuduClusterBuilder()
+ .numMasterServers(NUM_MASTER_SERVERS)
+ .numTabletServers(NUM_TABLET_SERVERS);
+ }
+
+  /**
+   * Adds the flags from any {@link MasterServerConfig} and {@link TabletServerConfig}
+   * annotations found on {@code description} to the cluster builder, then wraps {@code base}
+   * first with the statement produced by the superclass and then with a {@link RetryRule}.
+   *
+   * @param base the statement to wrap
+   * @param description the description whose annotations are inspected
+   * @return the wrapped statement
+   */
+  @Override
+  public Statement apply(Statement base, Description description) {
+    // Master server flags requested through the annotation, if it is present.
+    final MasterServerConfig masterAnnotation = description.getAnnotation(MasterServerConfig.class);
+    if (masterAnnotation != null) {
+      final String[] masterFlags = masterAnnotation.flags();
+      for (int i = 0; i < masterFlags.length; i++) {
+        clusterBuilder.addMasterServerFlag(masterFlags[i]);
+      }
+    }
+
+    // Tablet server flags requested through the annotation, if it is present.
+    final TabletServerConfig tserverAnnotation = description.getAnnotation(TabletServerConfig.class);
+    if (tserverAnnotation != null) {
+      final String[] tserverFlags = tserverAnnotation.flags();
+      for (int i = 0; i < tserverFlags.length; i++) {
+        clusterBuilder.addTabletServerFlag(tserverFlags[i]);
+      }
+    }
+
+    // Build the ExternalResource statement first, then put the retry logic around it.
+    final Statement managed = super.apply(base, description);
+    final RetryRule retry = new RetryRule();
+    return retry.apply(managed, description);
+  }
+
+  /**
+   * Installs the fake DNS, builds the mini cluster from the configured builder, and creates
+   * the asynchronous client and its synchronous counterpart against the cluster's masters.
+   *
+   * @throws Exception if the cluster or the client cannot be created
+   */
+  @Override
+  public void before() throws Exception {
+    FakeDNS.getInstance().install();
+
+    LOG.info("Creating a new MiniKuduCluster...");
+    miniCluster = clusterBuilder.build();
+
+    LOG.info("Creating a new Kudu client...");
+    final AsyncKuduClientBuilder clientBuilder =
+        new AsyncKuduClientBuilder(miniCluster.getMasterAddressesAsString());
+    asyncClient = clientBuilder.defaultAdminOperationTimeoutMs(DEFAULT_SLEEP).build();
+    client = asyncClient.syncClient();
+  }
+
+  /**
+   * Shuts down the client, if one was created, and then the mini cluster, if one was built.
+   * A failure to shut down the client is logged with its cause and does not prevent the
+   * cluster from being shut down.
+   */
+  @Override
+  public void after() {
+    try {
+      if (client != null) {
+        client.shutdown();
+        // No need to explicitly shutdown the async client,
+        // shutting down the sync client effectively does that.
+      }
+    } catch (KuduException e) {
+      // Pass the exception along so the cause and stack trace are not lost.
+      LOG.warn("Error while shutting down the test client", e);
+    } finally {
+      if (miniCluster != null) {
+        miniCluster.shutdown();
+      }
+    }
+  }
+
+ /**
+ * @return the synchronous client currently held by this harness; it is assigned in
+ * {@link #before()} and replaced by {@link #resetClients()}
+ */
+ public KuduClient getClient() {
+ return client;
+ }
+
+ /**
+ * @return the asynchronous client currently held by this harness; it is assigned in
+ * {@link #before()} and replaced by {@link #resetClients()}
+ */
+ public AsyncKuduClient getAsyncClient() {
+ return asyncClient;
+ }
+
+  /**
+   * Helper method to easily kill a tablet server that serves the given table's only tablet's
+   * leader. The currently running test case will be failed if the table does not have exactly
+   * one tablet, if that tablet has only one replica, or if no leader is found before
+   * {@link #findLeaderTabletServer(LocatedTablet)} gives up.
+   *
+   * This method is thread-safe.
+   * @param table a KuduTable which will get its single tablet's leader killed.
+   * @throws Exception if locating the tablets or killing the tablet server fails
+   */
+  public void killTabletLeader(KuduTable table) throws Exception {
+    List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
+    if (tablets.size() != 1) {
+      fail("Currently only support killing leaders for tables containing 1 tablet, table " +
+          table.getName() + " has " + tablets.size());
+    }
+    LocatedTablet tablet = tablets.get(0);
+    if (tablet.getReplicas().size() == 1) {
+      // The check is on the replica count, so the message must talk about replicas.
+      fail("Table " + table.getName() + " only has 1 replica, please enable replication");
+    }
+
+    HostAndPort hp = findLeaderTabletServer(tablet);
+    miniCluster.killTabletServer(hp);
+  }
+
+ /**
+ * Helper method to kill a tablet server that serves the given tablet's
+ * leader. The tablet is wrapped in a {@link LocatedTablet} and handed to
+ * {@link #killTabletLeader(LocatedTablet)}. The currently running test case will be failed
+ * if the tablet has no leader after some retries, or if the tablet server was already killed.
+ *
+ * This method is thread-safe.
+ * @param tablet a RemoteTablet which will get its leader killed
+ * @throws Exception whatever {@link #killTabletLeader(LocatedTablet)} throws
+ */
+ public void killTabletLeader(RemoteTablet tablet) throws Exception {
+ killTabletLeader(new LocatedTablet(tablet));
+ }
+
+  /**
+   * Helper method to kill a tablet server that serves the given tablet's
+   * leader. The leader is located with {@link #findLeaderTabletServer(LocatedTablet)}, which
+   * fails the currently running test case if no leader is found in time.
+   *
+   * This method is thread-safe.
+   * @param tablet a LocatedTablet which will get its leader killed
+   * @throws Exception if finding the leader or killing the tablet server fails
+   */
+  public void killTabletLeader(LocatedTablet tablet) throws Exception {
+    final HostAndPort leaderAddress = findLeaderTabletServer(tablet);
+    miniCluster.killTabletServer(leaderAddress);
+  }
+
+  /**
+   * Finds the RPC host and port of the given tablet's leader tserver, polling every 50 ms
+   * until a leader replica is reported. If {@code DEFAULT_SLEEP} ms pass without a leader,
+   * the currently running test case is failed.
+   *
+   * @param tablet a LocatedTablet
+   * @return the host and port of the given tablet's leader tserver
+   * @throws Exception if we are unable to find the leader tserver
+   */
+  public HostAndPort findLeaderTabletServer(LocatedTablet tablet)
+      throws Exception {
+    final DeadlineTracker deadline = new DeadlineTracker();
+    deadline.setDeadline(DEFAULT_SLEEP);
+    while (true) {
+      if (deadline.timedOut()) {
+        fail("Timed out while trying to find a leader for this table");
+      }
+
+      final LocatedTablet.Replica leader = tablet.getLeaderReplica();
+      if (leader != null) {
+        return new HostAndPort(leader.getRpcHost(), leader.getRpcPort());
+      }
+
+      // No leader yet: report how long we have waited and try again shortly.
+      LOG.info("Sleeping while waiting for a tablet LEADER to arise, currently slept {} ms",
+          deadline.getElapsedMillis());
+      Thread.sleep(50);
+    }
+  }
+
+  /**
+   * Helper method to easily kill the leader master, as located by
+   * {@link #findLeaderMasterServer()}.
+   *
+   * This method is thread-safe.
+   * @throws Exception if there is an error finding or killing the leader master.
+   */
+  public void killLeaderMasterServer() throws Exception {
+    final HostAndPort leaderMaster = findLeaderMasterServer();
+    miniCluster.killMasterServer(leaderMaster);
+  }
+
+  /**
+   * Find the host and port of the leader master.
+   *
+   * <p>For up to {@code DEFAULT_SLEEP} ms, repeatedly asks the asynchronous client for the
+   * master table locations and reads the first RPC address of the first replica of the first
+   * tablet location. An address whose port is -1 is ignored and the lookup is repeated.
+   *
+   * @return the host and port of the leader master
+   * @throws Exception if we are unable to find the leader master
+   */
+  public HostAndPort findLeaderMasterServer() throws Exception {
+    final Stopwatch timer = Stopwatch.createStarted();
+    while (timer.elapsed(TimeUnit.MILLISECONDS) < DEFAULT_SLEEP) {
+      final Deferred<Master.GetTableLocationsResponsePB> pending =
+          asyncClient.getMasterTableLocationsPB(null);
+      final Master.GetTableLocationsResponsePB response = pending.join(DEFAULT_SLEEP);
+      final Common.HostPortPB address = response.getTabletLocations(0)
+          .getReplicas(0)
+          .getTsInfo()
+          .getRpcAddresses(0);
+      if (address.getPort() == -1) {
+        // Not a usable address; ask again.
+        continue;
+      }
+      return new HostAndPort(address.getHost(), address.getPort());
+    }
+    throw new IOException(String.format("No leader master found after %d ms", DEFAULT_SLEEP));
+  }
+
+  /**
+   * Picks at random one replica of the first tablet of the passed table and restarts (kills,
+   * then starts) the tablet server hosting it. The currently running test case is failed if
+   * the table has no tablets.
+   *
+   * @param table table to query for a TS to restart
+   * @throws Exception if locating the tablets or restarting the tablet server fails
+   */
+  public void restartTabletServer(KuduTable table) throws Exception {
+    final List<LocatedTablet> tablets = table.getTabletsLocations(DEFAULT_SLEEP);
+    if (tablets.isEmpty()) {
+      fail("Table " + table.getName() + " doesn't have any tablets");
+    }
+
+    final List<LocatedTablet.Replica> replicas = tablets.get(0).getReplicas();
+    final int choice = randomForTSRestart.nextInt(replicas.size());
+    final LocatedTablet.Replica chosen = replicas.get(choice);
+    final HostAndPort address = new HostAndPort(chosen.getRpcHost(), chosen.getRpcPort());
+    miniCluster.killTabletServer(address);
+    miniCluster.startTabletServer(address);
+  }
+
+  /**
+   * Kills a tablet server that serves the given tablet's leader and restarts it. The leader is
+   * located with {@link #findLeaderTabletServer(LocatedTablet)}.
+   *
+   * @param tablet a RemoteTablet which will get its leader killed and restarted
+   * @throws Exception if finding the leader, or killing or starting the tablet server, fails
+   */
+  public void restartTabletServer(RemoteTablet tablet) throws Exception {
+    final LocatedTablet located = new LocatedTablet(tablet);
+    final HostAndPort leaderAddress = findLeaderTabletServer(located);
+    miniCluster.killTabletServer(leaderAddress);
+    miniCluster.startTabletServer(leaderAddress);
+  }
+
+  /**
+   * Kills and restarts the leader master, as located by {@link #findLeaderMasterServer()}.
+   *
+   * @throws Exception if finding the leader master, or killing or starting it, fails
+   */
+  public void restartLeaderMaster() throws Exception {
+    final HostAndPort leaderMaster = findLeaderMasterServer();
+    miniCluster.killMasterServer(leaderMaster);
+    miniCluster.startMasterServer(leaderMaster);
+  }
+
+ /**
+ * Return the comma-separated list of "host:port" pairs that describes the master
+ * config for this cluster, as reported by the mini cluster.
+ * @return The master config string.
+ */
+ public String getMasterAddressesAsString() {
+ return miniCluster.getMasterAddressesAsString();
+ }
+
+ /**
+ * @return the list of master servers, as reported by the mini cluster
+ */
+ public List<HostAndPort> getMasterServers() {
+ return miniCluster.getMasterServers();
+ }
+
+  /**
+   * @return the list of tablet servers, as reported by the mini cluster
+   */
+  public List<HostAndPort> getTabletServers() {
+    // Previously returned the master servers by mistake.
+    return miniCluster.getTabletServers();
+  }
+
+ /**
+ * @return path to the mini cluster root directory, as reported by the mini cluster
+ */
+ public String getClusterRoot() {
+ return miniCluster.getClusterRoot();
+ }
+
+ /**
+ * Kills all the master servers by delegating to the mini cluster.
+ * Does nothing to the servers that are already dead.
+ *
+ * @throws IOException if the mini cluster reports an error
+ */
+ public void killAllMasterServers() throws IOException {
+ miniCluster.killAllMasterServers();
+ }
+
+ /**
+ * Starts all the master servers by delegating to the mini cluster.
+ * Does nothing to the servers that are already running.
+ *
+ * @throws IOException if the mini cluster reports an error
+ */
+ public void startAllMasterServers() throws IOException {
+ miniCluster.startAllMasterServers();
+ }
+
+ /**
+ * Kills all the tablet servers by delegating to the mini cluster.
+ * Does nothing to the servers that are already dead.
+ *
+ * @throws IOException if the mini cluster reports an error
+ */
+ public void killAllTabletServers() throws IOException {
+ miniCluster.killAllTabletServers();
+ }
+
+ /**
+ * Starts all the tablet servers by delegating to the mini cluster.
+ * Does nothing to the servers that are already running.
+ *
+ * @throws IOException if the mini cluster reports an error
+ */
+ public void startAllTabletServers() throws IOException {
+ miniCluster.startAllTabletServers();
+ }
+
+ /**
+ * Removes all credentials for all principals from the Kerberos credential cache by
+ * delegating to the mini cluster.
+ *
+ * @throws IOException if the mini cluster reports an error
+ */
+ public void kdestroy() throws IOException {
+ miniCluster.kdestroy();
+ }
+
+ /**
+ * Re-initialize Kerberos credentials for the given username, writing them
+ * into the Kerberos credential cache. The work is delegated to the mini cluster.
+ * @param username the username to kinit as
+ * @throws IOException if the mini cluster reports an error
+ */
+ public void kinit(String username) throws IOException {
+ miniCluster.kinit(username);
+ }
+
+  /**
+   * Resets the clients so that their state is completely fresh, including meta
+   * cache, connections, open tables, sessions and scanners, and propagated timestamp.
+   * The current synchronous client is shut down and a new asynchronous/synchronous pair is
+   * built against the mini cluster's master addresses.
+   *
+   * @throws IOException if shutting down the current client fails
+   */
+  public void resetClients() throws IOException {
+    client.shutdown();
+
+    final AsyncKuduClientBuilder freshBuilder =
+        new AsyncKuduClientBuilder(miniCluster.getMasterAddressesAsString());
+    asyncClient = freshBuilder.defaultAdminOperationTimeoutMs(DEFAULT_SLEEP).build();
+    client = asyncClient.syncClient();
+  }
+
+ /**
+ * An annotation that can be added to each test method to
+ * define additional master server flags to be used when
+ * creating the test cluster. It is retained at runtime and read in
+ * {@code apply(Statement, Description)}, which adds every flag to the cluster builder.
+ *
+ * ex: @MasterServerConfig(flags = { "key1=valA", "key2=valB" })
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target({ElementType.METHOD})
+ public @interface MasterServerConfig {
+ // The master server flags to add.
+ String[] flags();
+ }
+
+ /**
+ * An annotation that can be added to each test method to
+ * define additional tablet server flags to be used when
+ * creating the test cluster. It is retained at runtime and read in
+ * {@code apply(Statement, Description)}, which adds every flag to the cluster builder.
+ *
+ * ex: @TabletServerConfig(flags = { "key1=valA", "key2=valB" })
+ */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target({ElementType.METHOD})
+ public @interface TabletServerConfig {
+ // The tablet server flags to add.
+ String[] flags();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ProtobufUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ProtobufUtils.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ProtobufUtils.java
new file mode 100644
index 0000000..5be17c2
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ProtobufUtils.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test;
+
+import com.google.protobuf.ByteString;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.Common;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
+
+/**
+ * Helpers that build fake protobuf messages for tests.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ProtobufUtils {
+
+  /**
+   * Get a PartitionPB builder whose start and end partition keys are both empty.
+   * @return a fake partition
+   */
+  public static Common.PartitionPB.Builder getFakePartitionPB() {
+    return Common.PartitionPB.newBuilder()
+        .setPartitionKeyStart(ByteString.EMPTY)
+        .setPartitionKeyEnd(ByteString.EMPTY);
+  }
+
+  /**
+   * Create a ReplicaPB builder based on the passed information. The host and port become the
+   * single RPC address of the replica's tablet server info.
+   * @param uuid server's identifier
+   * @param host server's hostname
+   * @param port server's port
+   * @param role server's role in the configuration
+   * @return a fake ReplicaPB
+   */
+  public static Master.TabletLocationsPB.ReplicaPB.Builder getFakeTabletReplicaPB(
+      String uuid, String host, int port, Metadata.RaftPeerPB.Role role) {
+    final Common.HostPortPB.Builder address = Common.HostPortPB.newBuilder()
+        .setHost(host)
+        .setPort(port);
+    final Master.TSInfoPB.Builder serverInfo = Master.TSInfoPB.newBuilder()
+        .addRpcAddresses(address)
+        .setPermanentUuid(ByteString.copyFromUtf8(uuid));
+    return Master.TabletLocationsPB.ReplicaPB.newBuilder()
+        .setTsInfo(serverInfo)
+        .setRole(role);
+  }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/RandomUtils.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/RandomUtils.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/RandomUtils.java
new file mode 100644
index 0000000..0328e15
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/RandomUtils.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RandomUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(RandomUtils.class);
+
+ private static final String TEST_RANDOM_SEED_PROP = "testRandomSeed";
+
+ /**
+ * Get an instance of Random for use in tests and logs the seed used.
+ *
+ * Uses a default seed of System.currentTimeMillis() with the option to
+ * override via the testRandomSeed system property.
+ */
+ public static Random getRandom() {
+ // First check the system property.
+ long seed = System.currentTimeMillis();
+ if (System.getProperty(TEST_RANDOM_SEED_PROP) != null) {
+ seed = Long.parseLong(System.getProperty(TEST_RANDOM_SEED_PROP));
+ LOG.info("System property {} is defined. Overriding random seed.", TEST_RANDOM_SEED_PROP, seed);
+ }
+ LOG.info("Using random seed: {}", seed);
+ return new Random(seed);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/FakeDNS.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/FakeDNS.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/FakeDNS.java
new file mode 100644
index 0000000..ca0fcbd
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/FakeDNS.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.test.cluster;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.base.Throwables;
+import com.google.common.net.InetAddresses;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Fake DNS resolver which allows our tests to work well even though we use
+ * strange loopback IP addresses (127.x.y.z) with no corresponding reverse
+ * DNS.
+ *
+ * This overrides the reverse lookups for such IPs to return the same address
+ * in String form.
+ *
+ * Without this class, reverse DNS lookups for such addresses often take
+ * 5 seconds to return, causing timeouts and overall test slowness.
+ *
+ * In the future this class might also be extended to test more interesting
+ * DNS-related scenarios.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FakeDNS {
+ static FakeDNS instance = new FakeDNS();
+
+ @GuardedBy("this")
+ private Map<String, InetAddress> forwardResolutions = new HashMap<>();
+
+ @GuardedBy("this")
+ private Map<InetAddress, String> reverseResolutions = new HashMap<>();
+
+ /** whether the fake resolver has been installed */
+ @GuardedBy("this")
+ private boolean installed = false;
+
+ // Private constructor: the only instance is the static one returned by getInstance().
+ private FakeDNS() {}
+ /**
+ * @return the shared FakeDNS instance held in the static 'instance' field
+ */
+ public static FakeDNS getInstance() {
+ return instance;
+ }
+
+ /**
+ * Registers a forward (hostname to address) mapping, replacing any existing
+ * mapping for the same hostname.
+ * @param hostname the hostname to map
+ * @param ip the address that lookups of the hostname should return
+ */
+ public synchronized void addForwardResolution(String hostname, InetAddress ip) {
+ forwardResolutions.put(hostname, ip);
+ }
+
+ /**
+ * Registers a reverse (address to hostname) mapping, replacing any existing
+ * mapping for the same address.
+ * @param ip the address to map
+ * @param hostname the hostname that reverse lookups of the address should return
+ */
+ public synchronized void addReverseResolution(InetAddress ip, String hostname) {
+ reverseResolutions.put(ip, hostname);
+ }
+
+ /**
+ * Install the fake DNS resolver into the Java runtime.
+ *
+ * Does nothing if a previous call already completed the installation. The
+ * java.net.InetAddress$NameService interface and the 'nameService' field are
+ * tried first; if that class or field does not exist, the
+ * sun.net.spi.nameservice.NameService interface and the 'nameServices' field
+ * are used instead. In both cases the field is replaced with a proxy backed
+ * by a NameServiceListener that wraps the runtime's default name service.
+ *
+ * @throws RuntimeException wrapping any exception raised during installation
+ */
+ public synchronized void install() {
+ if (installed) return;
+ try {
+ try {
+ // Override the NameService in Java 9 or later.
+ final Class<?> nameServiceInterface = Class.forName("java.net.InetAddress$NameService");
+ Field field = InetAddress.class.getDeclaredField("nameService");
+ // Get the default NameService to fallback to.
+ Method method = InetAddress.class.getDeclaredMethod("createNameService");
+ method.setAccessible(true);
+ Object fallbackNameService = method.invoke(null);
+ // Create a proxy instance to set on the InetAddress field which will handle
+ // all NameService calls.
+ Object proxy = Proxy.newProxyInstance(nameServiceInterface.getClassLoader(),
+ new Class<?>[]{nameServiceInterface}, new NameServiceListener(fallbackNameService));
+ field.setAccessible(true);
+ field.set(InetAddress.class, proxy);
+ } catch (final ClassNotFoundException | NoSuchFieldException e) {
+ // Only a missing class or field triggers this fallback; any other failure
+ // above propagates to the outer catch.
+ // Override the NameService in Java 8 or earlier.
+ final Class<?> nameServiceInterface = Class.forName("sun.net.spi.nameservice.NameService");
+ Field field = InetAddress.class.getDeclaredField("nameServices");
+ // Get the default NameService to fallback to.
+ Method method = InetAddress.class.getDeclaredMethod("createNSProvider", String.class);
+ method.setAccessible(true);
+ Object fallbackNameService = method.invoke(null, "default");
+ // Create a proxy instance to set on the InetAddress field which will handle
+ // all NameService calls.
+ Object proxy = Proxy.newProxyInstance(nameServiceInterface.getClassLoader(),
+ new Class<?>[]{nameServiceInterface}, new NameServiceListener(fallbackNameService));
+ field.setAccessible(true);
+ // Java 8 or earlier takes a list of NameServices
+ field.set(InetAddress.class, Arrays.asList(proxy));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ // Reached only when one of the two installation paths succeeded.
+ installed = true;
+ }
+
+ /**
+ * The NameService in all versions of Java has the same interface, so we
+ * can use the same InvocationHandler as our proxy instance for both
+ * java.net.InetAddress$NameService and sun.net.spi.nameservice.NameService.
+ */
+ private class NameServiceListener implements InvocationHandler {
+
+ private final Object fallbackNameService;
+
+ // Creates a NameServiceListener with a NameService implementation to
+ // fallback to. The parameter is untyped so we can handle the NameService
+ // type in all versions of Java with reflection.
+ NameServiceListener(Object fallbackNameService) {
+ this.fallbackNameService = fallbackNameService;
+ }
+
+ private InetAddress[] lookupAllHostAddr(String host) throws UnknownHostException {
+ InetAddress inetAddress;
+ synchronized(FakeDNS.this) {
+ inetAddress = forwardResolutions.get(host);
+ }
+ if (inetAddress != null) {
+ return new InetAddress[]{inetAddress};
+ }
+
+ try {
+ Method method = fallbackNameService.getClass()
+ .getDeclaredMethod("lookupAllHostAddr", String.class);
+ method.setAccessible(true);
+ return (InetAddress[]) method.invoke(fallbackNameService, host);
+ } catch (ReflectiveOperationException e) {
+ Throwables.propagateIfPossible(e.getCause(), UnknownHostException.class);
+ throw new AssertionError("unexpected reflection issue", e);
+ }
+ }
+
+ private String getHostByAddr(byte[] addr) throws UnknownHostException {
+ if (addr[0] == 127) {
+ return InetAddresses.toAddrString(InetAddress.getByAddress(addr));
+ }
+
+ String hostname;
+ synchronized (FakeDNS.this) {
+ hostname = reverseResolutions.get(InetAddress.getByAddress(addr));
+ }
+ if (hostname != null) {
+ return hostname;
+ }
+
+ try {
+ Method method = fallbackNameService.getClass()
+ .getDeclaredMethod("getHostByAddr", byte[].class);
+ method.setAccessible(true);
+ return (String) method.invoke(fallbackNameService, (Object) addr);
+ } catch (ReflectiveOperationException e) {
+ Throwables.propagateIfPossible(e.getCause(), UnknownHostException.class);
+ throw new AssertionError("unexpected reflection issue", e);
+ }
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ switch (method.getName()) {
+ case "lookupAllHostAddr":
+ return lookupAllHostAddr((String) args[0]);
+ case "getHostByAddr":
+ return getHostByAddr((byte[]) args[0]);
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/KuduBinaryLocator.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/KuduBinaryLocator.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/KuduBinaryLocator.java
new file mode 100644
index 0000000..9c5597f
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/KuduBinaryLocator.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test.cluster;
+
+import com.google.common.io.CharStreams;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class KuduBinaryLocator {
+ private static final Logger LOG = LoggerFactory.getLogger(KuduBinaryLocator.class);
+
+ private static final String KUDU_BIN_DIR_PROP = "kuduBinDir";
+
+ /**
+ * Find the binary directory within the build tree.
+ *
+ * Uses the following priority:
+ * - If kuduBinDir system property is set, use that.
+ * - If the `kudu` binary is found on the PATH using `which kudu`,
+ * use its parent directory.
+ */
+ private static String findBinaryDir() {
+ // If kuduBinDir system property is set, use that.
+ String kuduBinDirProp = System.getProperty(KUDU_BIN_DIR_PROP);
+ if (kuduBinDirProp != null) {
+ LOG.info("Using Kudu binary directory specified by system property '{}': {}",
+ KUDU_BIN_DIR_PROP, kuduBinDirProp);
+ return kuduBinDirProp;
+ }
+
+ // If the `kudu` binary is found on the PATH using `which kudu`, use its parent directory.
+ try {
+ Runtime runtime = Runtime.getRuntime();
+ Process process = runtime.exec("which kudu");
+ int errorCode = process.waitFor();
+ if (errorCode == 0) {
+ try(Reader reader = new InputStreamReader(process.getInputStream(), UTF_8)) {
+ String kuduBinary = CharStreams.toString(reader);
+ String kuduBinDir = new File(kuduBinary).getParent();
+ LOG.info("Using Kudu binary directory found on path with 'which kudu': {}", kuduBinDir);
+ return kuduBinDir;
+ }
+ }
+ } catch (IOException | InterruptedException ex) {
+ throw new RuntimeException("Error while locating kudu binary", ex);
+ }
+
+ throw new RuntimeException("Could not locate the kudu binary directory. " +
+ "Set the system variable " + KUDU_BIN_DIR_PROP +
+ " or ensure the `kudu` binary is on your path.");
+ }
+
+  /**
+   * Resolves a binary by name inside the directory returned by findBinaryDir().
+   * @param binName the binary to look for (eg 'kudu-tserver')
+   * @return the absolute path of that binary
+   * @throws FileNotFoundException if the candidate file is not executable
+   */
+  public static String findBinary(String binName) throws FileNotFoundException {
+    final String binDir = findBinaryDir();
+    final File candidate = new File(binDir, binName);
+
+    if (!candidate.canExecute()) {
+      throw new FileNotFoundException("Cannot find binary " + binName +
+          " in binary directory " + binDir);
+    }
+    return candidate.getAbsolutePath();
+  }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
new file mode 100644
index 0000000..dae4344
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
@@ -0,0 +1,643 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test.cluster;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.kudu.Common;
+import org.apache.kudu.client.HostAndPort;
+import org.apache.kudu.client.ProtobufHelper;
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.tools.Tool.ControlShellRequestPB;
+import org.apache.kudu.tools.Tool.ControlShellResponsePB;
+import org.apache.kudu.tools.Tool.CreateClusterRequestPB.MiniKdcOptionsPB;
+import org.apache.kudu.tools.Tool.CreateClusterRequestPB;
+import org.apache.kudu.tools.Tool.DaemonIdentifierPB;
+import org.apache.kudu.tools.Tool.DaemonInfoPB;
+import org.apache.kudu.tools.Tool.GetKDCEnvVarsRequestPB;
+import org.apache.kudu.tools.Tool.GetMastersRequestPB;
+import org.apache.kudu.tools.Tool.GetTServersRequestPB;
+import org.apache.kudu.tools.Tool.KdestroyRequestPB;
+import org.apache.kudu.tools.Tool.KinitRequestPB;
+import org.apache.kudu.tools.Tool.StartClusterRequestPB;
+import org.apache.kudu.tools.Tool.StartDaemonRequestPB;
+import org.apache.kudu.tools.Tool.StopDaemonRequestPB;
+import org.apache.kudu.util.SecurityUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to start and manipulate Kudu clusters. Depends on precompiled
+ * kudu, kudu-master, and kudu-tserver binaries. {@link KuduTestHarness}
+ * should be used instead of directly using this class in almost all cases.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MiniKuduCluster implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MiniKuduCluster.class);
+
+ // Control shell process.
+ private Process miniCluster;
+
+ // Request channel to the control shell.
+ private DataOutputStream miniClusterStdin;
+
+ // Response channel from the control shell.
+ private DataInputStream miniClusterStdout;
+
+ // Thread that reads and logs stderr from the control shell.
+ private Thread miniClusterErrorPrinter;
+
+ // Bookkeeping for a single master or tablet server daemon of the mini cluster.
+ private static class DaemonInfo {
+ // Identifier sent to the control shell in start/stop daemon requests.
+ DaemonIdentifierPB id;
+ // Locally tracked state: set to true by start() and the start*Server methods,
+ // and to false by the kill*Server methods.
+ boolean isRunning;
+ }
+
+ // Map of master addresses to daemon information.
+ private final Map<HostAndPort, DaemonInfo> masterServers = Maps.newHashMap();
+
+ // Map of tserver addresses to daemon information.
+ private final Map<HostAndPort, DaemonInfo> tabletServers = Maps.newHashMap();
+
+ // Builder-provided cluster configuration state.
+ private final boolean enableKerberos;
+ private final int numMasters;
+ private final int numTservers;
+ private final ImmutableList<String> extraTserverFlags;
+ private final ImmutableList<String> extraMasterFlags;
+ private final String clusterRoot;
+
+ private MiniKdcOptionsPB kdcOptionsPb;
+ private final Common.HmsMode hmsMode;
+
+  // Captures the builder-provided configuration. The flag lists are copied into
+  // immutable lists. When no cluster root is given, a fresh temp directory is
+  // created and used instead.
+  private MiniKuduCluster(boolean enableKerberos,
+      int numMasters,
+      int numTservers,
+      List<String> extraTserverFlags,
+      List<String> extraMasterFlags,
+      MiniKdcOptionsPB kdcOptionsPb,
+      String clusterRoot,
+      Common.HmsMode hmsMode) {
+    this.enableKerberos = enableKerberos;
+    this.numMasters = numMasters;
+    this.numTservers = numTservers;
+    this.extraTserverFlags = ImmutableList.copyOf(extraTserverFlags);
+    this.extraMasterFlags = ImmutableList.copyOf(extraMasterFlags);
+    this.kdcOptionsPb = kdcOptionsPb;
+    this.hmsMode = hmsMode;
+
+    String root = clusterRoot;
+    if (root == null) {
+      // If a cluster root was not set, create a unique temp directory to use.
+      // The mini cluster will clean this directory up on exit.
+      try {
+        root = getTempDirectory("mini-kudu-cluster").toString();
+      } catch (IOException ex) {
+        throw new RuntimeException("Could not create cluster root directory", ex);
+      }
+    }
+    this.clusterRoot = root;
+  }
+
+ // Match the C++ MiniCluster test functionality for overriding the tmp directory used.
+ // See MakeClusterRoot in src/kudu/tools/tool_action_test.cc.
+ // If the TEST_TMPDIR environment variable is defined that directory will be used
+ // instead of the default temp directory.
+ private File getTempDirectory(String prefix) throws IOException {
+ String testTmpdir = System.getenv("TEST_TMPDIR");
+ if (testTmpdir != null) {
+ LOG.info("Using the temp directory defined by TEST_TMPDIR: " + testTmpdir);
+ return Files.createTempDirectory(Paths.get(testTmpdir), prefix).toFile();
+ } else {
+ return Files.createTempDirectory(prefix).toFile();
+ }
+ }
+
+ /**
+ * Sends a command to the control shell and receives its response.
+ * <p>
+ * The method is synchronized to prevent interleaving of requests and responses.
+ * @param req control shell request
+ * @return control shell response
+ * @throws IOException if there was some kind of transport error, or if the
+ * response indicates an error
+ */
+ private synchronized ControlShellResponsePB sendRequestToCluster(ControlShellRequestPB req)
+ throws IOException {
+ // Send the request's size (4 bytes, big endian) followed by the request.
+ LOG.debug("Request: {}", req);
+ miniClusterStdin.writeInt(req.getSerializedSize());
+ miniClusterStdin.write(req.toByteArray());
+ miniClusterStdin.flush();
+
+ // Read the response's size (4 bytes, big endian) followed by the response.
+ int respLength = miniClusterStdout.readInt();
+ byte[] respBody = new byte[respLength];
+ miniClusterStdout.readFully(respBody);
+ ControlShellResponsePB resp = ControlShellResponsePB.parseFrom(respBody);
+ LOG.debug("Response: {}", resp);
+
+ // Convert any error into an exception.
+ if (resp.hasError()) {
+ throw new IOException(resp.getError().getMessage());
+ }
+ return resp;
+ }
+
+ /**
+ * Starts this Kudu cluster.
+ *
+ * Launches the 'kudu test mini_cluster' control shell, starts a daemon thread
+ * that logs the shell's stderr, asks the shell to create and start the cluster,
+ * and then records the masters and tablet servers it reports. When Kerberos is
+ * enabled, the KDC's KRB5_CONFIG and KRB5CCNAME values are copied into system
+ * properties of this JVM.
+ *
+ * @throws IOException if something went wrong in transit
+ * @throws IllegalArgumentException if fewer than one master was requested
+ */
+ private void start() throws IOException {
+ Preconditions.checkArgument(numMasters > 0, "Need at least one master");
+
+ // Start the control shell and the communication channel to it.
+ List<String> commandLine = Lists.newArrayList(
+ KuduBinaryLocator.findBinary("kudu"),
+ "test",
+ "mini_cluster",
+ "--serialization=pb");
+ LOG.info("Starting process: {}", commandLine);
+ ProcessBuilder processBuilder = new ProcessBuilder(commandLine);
+ miniCluster = processBuilder.start();
+ miniClusterStdin = new DataOutputStream(miniCluster.getOutputStream());
+ miniClusterStdout = new DataInputStream(miniCluster.getInputStream());
+
+ // Set up a thread that logs stderr from the control shell; this will
+ // include all cluster logging.
+ ProcessInputStreamLogPrinterRunnable printer =
+ new ProcessInputStreamLogPrinterRunnable(miniCluster.getErrorStream());
+ miniClusterErrorPrinter = new Thread(printer);
+ miniClusterErrorPrinter.setDaemon(true);
+ miniClusterErrorPrinter.setName("cluster stderr printer");
+ miniClusterErrorPrinter.start();
+
+ // Create and start the cluster.
+ sendRequestToCluster(
+ ControlShellRequestPB.newBuilder()
+ .setCreateCluster(CreateClusterRequestPB.newBuilder()
+ .setNumMasters(numMasters)
+ .setNumTservers(numTservers)
+ .setEnableKerberos(enableKerberos)
+ .setHmsMode(hmsMode)
+ .addAllExtraMasterFlags(extraMasterFlags)
+ .addAllExtraTserverFlags(extraTserverFlags)
+ .setMiniKdcOptions(kdcOptionsPb)
+ .setClusterRoot(clusterRoot)
+ .build())
+ .build());
+ sendRequestToCluster(
+ ControlShellRequestPB.newBuilder()
+ .setStartCluster(StartClusterRequestPB.newBuilder().build())
+ .build());
+
+ // If the cluster is Kerberized, retrieve the KDC's environment variables
+ // and adapt them into certain security-related system properties.
+ if (enableKerberos) {
+ ControlShellResponsePB resp = sendRequestToCluster(
+ ControlShellRequestPB.newBuilder()
+ .setGetKdcEnvVars(GetKDCEnvVarsRequestPB.newBuilder().build())
+ .build());
+ // Only these two variables are mapped; any others are ignored.
+ for (Map.Entry<String, String> e : resp.getGetKdcEnvVars().getEnvVarsMap().entrySet()) {
+ if (e.getKey().equals("KRB5_CONFIG")) {
+ System.setProperty("java.security.krb5.conf", e.getValue());
+ } else if (e.getKey().equals("KRB5CCNAME")) {
+ System.setProperty(SecurityUtil.KUDU_TICKETCACHE_PROPERTY, e.getValue());
+ }
+ }
+ }
+
+ // Initialize the maps of master and tablet servers.
+ // Every daemon reported by the control shell is recorded as running,
+ // keyed by its bound RPC address.
+ ControlShellResponsePB resp = sendRequestToCluster(
+ ControlShellRequestPB.newBuilder()
+ .setGetMasters(GetMastersRequestPB.newBuilder().build())
+ .build());
+ for (DaemonInfoPB info : resp.getGetMasters().getMastersList()) {
+ DaemonInfo d = new DaemonInfo();
+ d.id = info.getId();
+ d.isRunning = true;
+ masterServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()), d);
+ }
+ resp = sendRequestToCluster(
+ ControlShellRequestPB.newBuilder()
+ .setGetTservers(GetTServersRequestPB.newBuilder().build())
+ .build());
+ for (DaemonInfoPB info : resp.getGetTservers().getTserversList()) {
+ DaemonInfo d = new DaemonInfo();
+ d.id = info.getId();
+ d.isRunning = true;
+ tabletServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()), d);
+ }
+ }
+
+  /**
+   * Joins the addresses of all known masters with commas.
+   * @return comma-separated list of master server addresses
+   */
+  public String getMasterAddressesAsString() {
+    final Joiner commaJoiner = Joiner.on(',');
+    return commaJoiner.join(masterServers.keySet());
+  }
+
+  /**
+   * Returns a snapshot of the master addresses; later changes to the returned
+   * list do not affect this cluster.
+   * @return the list of master servers
+   */
+  public List<HostAndPort> getMasterServers() {
+    // Diamond instead of a raw ArrayList, so the copy is type-checked.
+    return new ArrayList<>(masterServers.keySet());
+  }
+
+  /**
+   * Returns a snapshot of the tablet server addresses; later changes to the
+   * returned list do not affect this cluster.
+   * @return the list of tablet servers
+   */
+  public List<HostAndPort> getTabletServers() {
+    // Diamond instead of a raw ArrayList, so the copy is type-checked.
+    return new ArrayList<>(tabletServers.keySet());
+  }
+
+  /**
+   * Asks the control shell to start the master bound to the given address.
+   * Does nothing if the master is already tracked as running.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void startMasterServer(HostAndPort hp) throws IOException {
+    final DaemonInfo master = getMasterServer(hp);
+    if (!master.isRunning) {
+      LOG.info("Starting master server {}", hp);
+      StartDaemonRequestPB startRequest =
+          StartDaemonRequestPB.newBuilder().setId(master.id).build();
+      sendRequestToCluster(
+          ControlShellRequestPB.newBuilder().setStartDaemon(startRequest).build());
+      master.isRunning = true;
+    }
+  }
+
+  /**
+   * Asks the control shell to stop the master bound to the given address.
+   * Does nothing if the master is already tracked as dead.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void killMasterServer(HostAndPort hp) throws IOException {
+    final DaemonInfo master = getMasterServer(hp);
+    if (master.isRunning) {
+      LOG.info("Killing master server {}", hp);
+      StopDaemonRequestPB stopRequest =
+          StopDaemonRequestPB.newBuilder().setId(master.id).build();
+      sendRequestToCluster(
+          ControlShellRequestPB.newBuilder().setStopDaemon(stopRequest).build());
+      master.isRunning = false;
+    }
+  }
+
+  /**
+   * Asks the control shell to start the tablet server bound to the given address.
+   * Does nothing if the tablet server is already tracked as running.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void startTabletServer(HostAndPort hp) throws IOException {
+    final DaemonInfo tserver = getTabletServer(hp);
+    if (!tserver.isRunning) {
+      LOG.info("Starting tablet server {}", hp);
+      StartDaemonRequestPB startRequest =
+          StartDaemonRequestPB.newBuilder().setId(tserver.id).build();
+      sendRequestToCluster(
+          ControlShellRequestPB.newBuilder().setStartDaemon(startRequest).build());
+      tserver.isRunning = true;
+    }
+  }
+
+  /**
+   * Asks the control shell to stop the tablet server bound to the given address.
+   * Does nothing if the tablet server is already tracked as dead.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void killTabletServer(HostAndPort hp) throws IOException {
+    final DaemonInfo tserver = getTabletServer(hp);
+    if (tserver.isRunning) {
+      LOG.info("Killing tablet server {}", hp);
+      StopDaemonRequestPB stopRequest =
+          StopDaemonRequestPB.newBuilder().setId(tserver.id).build();
+      sendRequestToCluster(
+          ControlShellRequestPB.newBuilder().setStopDaemon(stopRequest).build());
+      tserver.isRunning = false;
+    }
+  }
+
+  /**
+   * Kills every known master server, one address at a time.
+   * Servers that are already dead are left untouched.
+   *
+   * @throws IOException if something went wrong in transit
+   */
+  public void killAllMasterServers() throws IOException {
+    for (HostAndPort address : masterServers.keySet()) {
+      killMasterServer(address);
+    }
+  }
+
+  /**
+   * Starts every known master server, one address at a time.
+   * Servers that are already running are left untouched.
+   *
+   * @throws IOException if something went wrong in transit
+   */
+  public void startAllMasterServers() throws IOException {
+    for (HostAndPort address : masterServers.keySet()) {
+      startMasterServer(address);
+    }
+  }
+
+  /**
+   * Kills every known tablet server, one address at a time.
+   * Servers that are already dead are left untouched.
+   *
+   * @throws IOException if something went wrong in transit
+   */
+  public void killAllTabletServers() throws IOException {
+    for (HostAndPort address : tabletServers.keySet()) {
+      killTabletServer(address);
+    }
+  }
+
+  /**
+   * Starts every known tablet server, one address at a time.
+   * Servers that are already running are left untouched.
+   *
+   * @throws IOException if something went wrong in transit
+   */
+  public void startAllTabletServers() throws IOException {
+    for (HostAndPort address : tabletServers.keySet()) {
+      startTabletServer(address);
+    }
+  }
+
+  /**
+   * Removes all credentials for all principals from the Kerberos credential cache
+   * by sending a kdestroy request to the control shell.
+   * @throws IOException if the request fails or the control shell reports an error
+   */
+  public void kdestroy() throws IOException {
+    LOG.info("Destroying all Kerberos credentials");
+    final ControlShellRequestPB request = ControlShellRequestPB.newBuilder()
+        .setKdestroy(KdestroyRequestPB.getDefaultInstance())
+        .build();
+    sendRequestToCluster(request);
+  }
+
+  /**
+   * Re-initializes Kerberos credentials for the given username, writing them
+   * into the Kerberos credential cache, by sending a kinit request to the
+   * control shell.
+   * @param username the username to kinit as
+   * @throws IOException if the request fails or the control shell reports an error
+   */
+  public void kinit(String username) throws IOException {
+    LOG.info("Running kinit for user {}", username);
+    final KinitRequestPB kinitRequest =
+        KinitRequestPB.newBuilder().setUsername(username).build();
+    sendRequestToCluster(
+        ControlShellRequestPB.newBuilder().setKinit(kinitRequest).build());
+  }
+
+
+ /**
+ * Shuts the cluster down by delegating to {@link #shutdown()}.
+ */
+ @Override
+ public void close() {
+ shutdown();
+ }
+
+ /**
+ * Shuts down a Kudu cluster.
+ */
+ public void shutdown() {
+ // Closing stdin should cause the control shell process to terminate.
+ if (miniClusterStdin != null) {
+ try {
+ miniClusterStdin.close();
+ } catch (IOException e) {
+ LOG.info("Caught exception while closing minicluster stdin", e);
+ }
+ }
+ if (miniClusterStdout != null) {
+ try {
+ miniClusterStdout.close();
+ } catch (IOException e) {
+ LOG.info("Caught exception while closing minicluster stdout", e);
+ }
+ }
+ if (miniClusterErrorPrinter != null) {
+ try {
+ miniClusterErrorPrinter.join();
+ } catch (InterruptedException e) {
+ LOG.info("Caught exception while closing minicluster stderr", e);
+ }
+ }
+ if (miniCluster != null) {
+ try {
+ miniCluster.waitFor();
+ } catch (InterruptedException e) {
+ LOG.warn("Minicluster process did not exit, destroying");
+ miniCluster.destroy();
+ }
+ }
+ }
+
+ /**
+ * Returns a master server identified by an address.
+ *
+ * @param hp unique host and port identifying the server
+ * @return the DaemonInfo of the server
+ * @throws RuntimeException if the server is not found
+ */
+ private DaemonInfo getMasterServer(HostAndPort hp) throws RuntimeException {
+ DaemonInfo d = masterServers.get(hp);
+ if (d == null) {
+ throw new RuntimeException(String.format("Master server %s not found", hp));
+ }
+ return d;
+ }
+
+ /**
+ * Returns a tablet server identified by an address.
+ *
+ * @param hp unique host and port identifying the server
+ * @return the DaemonInfo of the server
+ * @throws RuntimeException if the server is not found
+ */
+ private DaemonInfo getTabletServer(HostAndPort hp) throws RuntimeException {
+ DaemonInfo d = tabletServers.get(hp);
+ if (d == null) {
+ throw new RuntimeException(String.format("Tablet server %s not found", hp));
+ }
+ return d;
+ }
+
+ /**
+ * Returns the cluster root chosen at construction time: either the value
+ * supplied through the builder or the temp directory created when none was given.
+ * @return path to the mini cluster root directory
+ */
+ public String getClusterRoot() {
+ return clusterRoot;
+ }
+
+  /**
+   * Helper runnable that reads a process' output stream line by line (decoded as
+   * UTF-8) and logs every line at INFO level until the stream ends.
+   */
+  public static class ProcessInputStreamLogPrinterRunnable implements Runnable {
+
+    private final InputStream is;
+
+    /**
+     * @param is the stream to drain; it is closed when {@link #run()} finishes
+     */
+    public ProcessInputStreamLogPrinterRunnable(InputStream is) {
+      this.is = is;
+    }
+
+    /**
+     * Logs the stream's lines until end of stream. Failures whose message contains
+     * "Stream closed" are ignored; every other failure is logged as an error.
+     */
+    @Override
+    public void run() {
+      // try-with-resources closes the reader even when reading fails part-way.
+      try (BufferedReader in = new BufferedReader(new InputStreamReader(is, UTF_8))) {
+        String line;
+        while ((line = in.readLine()) != null) {
+          LOG.info(line);
+        }
+      } catch (Exception e) {
+        // getMessage() may be null; such an exception is not the "Stream closed"
+        // case and must be reported rather than crash this handler with an NPE.
+        String message = e.getMessage();
+        if (message == null || !message.contains("Stream closed")) {
+          LOG.error("Caught error while reading a process' output", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builder for {@link MiniKuduCluster}. Defaults: one master, three tablet
+   * servers, Kerberos disabled, no extra flags, no explicit cluster root, and
+   * no Hive Metastore integration.
+   */
+  public static class MiniKuduClusterBuilder {
+
+    private int numMasterServers = 1;
+    private int numTabletServers = 3;
+    private boolean enableKerberos = false;
+    private final List<String> extraTabletServerFlags = new ArrayList<>();
+    private final List<String> extraMasterServerFlags = new ArrayList<>();
+    private String clusterRoot = null;
+
+    private MiniKdcOptionsPB.Builder kdcOptionsPb = MiniKdcOptionsPB.newBuilder();
+    private Common.HmsMode hmsMode = Common.HmsMode.NONE;
+
+    /**
+     * Sets the number of master servers to create.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder numMasterServers(int numMasterServers) {
+      this.numMasterServers = numMasterServers;
+      return this;
+    }
+
+    /**
+     * Sets the number of tablet servers to create.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder numTabletServers(int numTabletServers) {
+      this.numTabletServers = numTabletServers;
+      return this;
+    }
+
+    /**
+     * Enables Kerberos on the mini cluster and acquire client credentials for this process.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder enableKerberos() {
+      this.enableKerberos = true;
+      return this;
+    }
+
+    /**
+     * Switches the cluster's HMS mode to ENABLE_METASTORE_INTEGRATION.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder enableHiveMetastoreIntegration() {
+      this.hmsMode = Common.HmsMode.ENABLE_METASTORE_INTEGRATION;
+      return this;
+    }
+
+    /**
+     * Adds a new flag to be passed to the Tablet Server daemons on start.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder addTabletServerFlag(String flag) {
+      extraTabletServerFlags.add(flag);
+      return this;
+    }
+
+    /**
+     * Adds a new flag to be passed to the Master daemons on start.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder addMasterServerFlag(String flag) {
+      extraMasterServerFlags.add(flag);
+      return this;
+    }
+
+    /**
+     * Sets the ticket lifetime in the mini KDC options.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder kdcTicketLifetime(String lifetime) {
+      kdcOptionsPb.setTicketLifetime(lifetime);
+      return this;
+    }
+
+    /**
+     * Sets the renew lifetime in the mini KDC options.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder kdcRenewLifetime(String lifetime) {
+      kdcOptionsPb.setRenewLifetime(lifetime);
+      return this;
+    }
+
+    /**
+     * Sets the directory where the cluster's data and logs should be placed.
+     * @return this instance
+     */
+    public MiniKuduClusterBuilder clusterRoot(String clusterRoot) {
+      this.clusterRoot = clusterRoot;
+      return this;
+    }
+
+    /**
+     * Builds and starts a new {@link MiniKuduCluster} using builder state.
+     * If starting fails with an IOException, the partially started cluster is
+     * closed before the exception is rethrown.
+     * @return the newly started {@link MiniKuduCluster}
+     * @throws IOException if something went wrong starting the cluster
+     */
+    public MiniKuduCluster build() throws IOException {
+      final MiniKuduCluster cluster = new MiniKuduCluster(
+          enableKerberos,
+          numMasterServers,
+          numTabletServers,
+          extraTabletServerFlags,
+          extraMasterServerFlags,
+          kdcOptionsPb.build(),
+          clusterRoot,
+          hmsMode);
+      try {
+        cluster.start();
+        return cluster;
+      } catch (IOException e) {
+        // MiniKuduCluster.close should not throw, so no need for a nested try/catch.
+        cluster.close();
+        throw e;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/AssertHelpers.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/AssertHelpers.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/AssertHelpers.java
new file mode 100644
index 0000000..9fc2c0c
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/AssertHelpers.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test.junit;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import static org.junit.Assert.assertTrue;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class AssertHelpers {
+
+  /** Pause between two successive evaluations of the expression. */
+  private static final long POLL_INTERVAL_MILLIS = 50;
+
+  /**
+   * A boolean-valued condition whose evaluation is allowed to throw.
+   */
+  public interface BooleanExpression {
+    boolean get() throws Exception;
+  }
+
+  /**
+   * A looping check. It's mainly useful for scanners, since writes may take a
+   * little time to show up.
+   * <p>
+   * The expression is always evaluated at least once. While it returns false it
+   * is re-evaluated every 50ms until the timeout elapses; the assertion fails if
+   * the last evaluation still returned false.
+   *
+   * @param description the message reported if the assertion fails
+   * @param expression the condition to poll
+   * @param timeoutMillis how long to keep polling, in milliseconds
+   * @throws Exception if the expression throws, or if the sleep between
+   *                   evaluations is interrupted
+   */
+  public static void assertEventuallyTrue(String description, BooleanExpression expression,
+                                          long timeoutMillis) throws Exception {
+    long deadlineNanos = System.nanoTime() + timeoutMillis * 1000000L;
+    boolean success;
+
+    do {
+      success = expression.get();
+      if (success) {
+        break;
+      }
+      Thread.sleep(POLL_INTERVAL_MILLIS);
+      // System.nanoTime() values must be compared by subtraction, not with '<':
+      // the counter has an arbitrary origin and may numerically overflow.
+    } while (System.nanoTime() - deadlineNanos < 0);
+
+    assertTrue(description, success);
+  }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/RetryRule.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/RetryRule.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/RetryRule.java
new file mode 100644
index 0000000..b096a3e
--- /dev/null
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/junit/RetryRule.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.test.junit;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JUnit rule to retry failed tests.
+ * We use this with Gradle because it doesn't support
+ * Surefire/Failsafe rerunFailingTestsCount like Maven does. We use the system
+ * property rerunFailingTestsCount to mimic the maven arguments closely.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RetryRule implements TestRule {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RetryRule.class);
+ private static final int RETRY_COUNT = Integer.getInteger("rerunFailingTestsCount", 0);
+
+ public RetryRule () {}
+
+ @Override
+ public Statement apply(Statement base, Description description) {
+ return new RetryStatement(base, description, RETRY_COUNT);
+ }
+
+ private static class RetryStatement extends Statement {
+
+ private final Statement base;
+ private final Description description;
+ private final int retryCount;
+
+ RetryStatement(final Statement base, final Description description, final int retryCount) {
+ this.base = base;
+ this.description = description;
+ this.retryCount = retryCount;
+ }
+
+ @Override
+ public void evaluate() throws Throwable {
+ // If there are no retries, just pass through to evaluate as usual.
+ if (retryCount == 0) {
+ base.evaluate();
+ return;
+ }
+
+ // To retry we catch the exception for the evaluate, log a message, and retry.
+ // We track and throw the last failure if all tries fail.
+ Throwable lastException = null;
+ for (int i = 0; i < retryCount; i++) {
+ try {
+ base.evaluate();
+ return;
+ } catch (Throwable t) {
+ lastException = t;
+ LOG.error(description.getDisplayName() + ": failed run " + (i + 1), t);
+ }
+ }
+ LOG.error(description.getDisplayName() + ": giving up after " + retryCount + " failures");
+ throw lastException;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
new file mode 100644
index 0000000..95a3843
--- /dev/null
+++ b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package org.apache.kudu.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.kudu.client.DeadlineTracker;
+import org.apache.kudu.client.HostAndPort;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduClient.KuduClientBuilder;
+import org.apache.kudu.client.ListTablesResponse;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+import org.apache.kudu.test.junit.RetryRule;
+import org.apache.kudu.test.cluster.FakeDNS;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests that a {@link MiniKuduCluster} can be started in its various modes and
+ * that individual master and tablet servers can be killed and restarted.
+ */
+public class TestMiniKuduCluster {
+
+  private static final int NUM_TABLET_SERVERS = 3;
+  private static final int NUM_MASTERS = 1;
+  // Maximum time testHostPort() waits for a port to reach the expected state.
+  private static final long SLEEP_TIME_MS = 10000;
+
+  @Rule
+  public RetryRule retryRule = new RetryRule();
+
+  @Test(timeout = 50000)
+  public void test() throws Exception {
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+                                                      .numMasterServers(NUM_MASTERS)
+                                                      .numTabletServers(NUM_TABLET_SERVERS)
+                                                      .build()) {
+      assertEquals(NUM_MASTERS, cluster.getMasterServers().size());
+      assertEquals(NUM_TABLET_SERVERS, cluster.getTabletServers().size());
+
+      {
+        // Kill the master.
+        HostAndPort masterHostPort = cluster.getMasterServers().get(0);
+        testHostPort(masterHostPort, true);
+        cluster.killMasterServer(masterHostPort);
+
+        testHostPort(masterHostPort, false);
+
+        // Restart the master.
+        cluster.startMasterServer(masterHostPort);
+
+        // Test we can reach it.
+        testHostPort(masterHostPort, true);
+      }
+
+      {
+        // Kill the first TS.
+        HostAndPort tsHostPort = cluster.getTabletServers().get(0);
+        testHostPort(tsHostPort, true);
+        cluster.killTabletServer(tsHostPort);
+
+        testHostPort(tsHostPort, false);
+
+        // Restart it.
+        cluster.startTabletServer(tsHostPort);
+
+        testHostPort(tsHostPort, true);
+      }
+    }
+  }
+
+  @Test(timeout = 50000)
+  public void testKerberos() throws Exception {
+    FakeDNS.getInstance().install();
+    // The client is declared after the cluster so that it is closed first.
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+                                                      .numMasterServers(NUM_MASTERS)
+                                                      .numTabletServers(NUM_TABLET_SERVERS)
+                                                      .enableKerberos()
+                                                      .build();
+         KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) {
+      ListTablesResponse resp = client.getTablesList();
+      assertTrue(resp.getTablesList().isEmpty());
+      assertNull(client.getHiveMetastoreConfig());
+    }
+  }
+
+  @Test(timeout = 50000)
+  public void testHiveMetastoreIntegration() throws Exception {
+    // The client is declared after the cluster so that it is closed first.
+    try (MiniKuduCluster cluster = new MiniKuduCluster.MiniKuduClusterBuilder()
+                                                      .numMasterServers(NUM_MASTERS)
+                                                      .numTabletServers(NUM_TABLET_SERVERS)
+                                                      .enableHiveMetastoreIntegration()
+                                                      .build();
+         KuduClient client = new KuduClientBuilder(cluster.getMasterAddressesAsString()).build()) {
+      assertNotNull(client.getHiveMetastoreConfig());
+    }
+  }
+
+  /**
+   * Test whether the specified host and port is open or closed, waiting up to a certain time.
+   * Fails the test if the expected state is not observed within {@link #SLEEP_TIME_MS}.
+   * @param hp the host and port to test
+   * @param testIsOpen true if we should want it to be open, false if we want it closed
+   * @throws InterruptedException if interrupted while sleeping between attempts
+   */
+  private static void testHostPort(HostAndPort hp,
+                                   boolean testIsOpen) throws InterruptedException {
+    DeadlineTracker tracker = new DeadlineTracker();
+    while (tracker.getElapsedMillis() < SLEEP_TIME_MS) {
+      // try-with-resources guarantees the probe socket is released on every path.
+      try (Socket socket = new Socket(hp.getHost(), hp.getPort())) {
+        if (testIsOpen) {
+          return;
+        }
+      } catch (IOException e) {
+        // Failing to connect is the expected outcome when waiting for the port to close.
+        if (!testIsOpen) {
+          return;
+        }
+      }
+      Thread.sleep(200);
+    }
+    fail("HostAndPort " + hp + " is still " + (testIsOpen ? "closed" : "open"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/15f1416f/java/settings.gradle
----------------------------------------------------------------------
diff --git a/java/settings.gradle b/java/settings.gradle
index 6a60213..96ca03f 100644
--- a/java/settings.gradle
+++ b/java/settings.gradle
@@ -28,3 +28,4 @@ include "kudu-jepsen"
include "kudu-mapreduce"
include "kudu-spark"
include "kudu-spark-tools"
+include "kudu-test-utils"