You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2018/12/21 08:51:26 UTC
[kafka] branch trunk updated: MINOR: Switch anonymous classes to
lambda expressions in tools module
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 85906d3 MINOR: Switch anonymous classes to lambda expressions in tools module
85906d3 is described below
commit 85906d3d2ba49e593623de1484a9946224ce71cc
Author: Srinivas Reddy <sr...@gmail.com>
AuthorDate: Fri Dec 21 14:20:57 2018 +0530
MINOR: Switch anonymous classes to lambda expressions in tools module
Switch to lambda expressions whenever possible instead of the old
anonymous-class style in the tools module
Author: Srinivas Reddy <sr...@gmail.com>
Author: Srinivas Reddy <mr...@users.noreply.github.com>
Reviewers: Ryanne Dolan <ry...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Manikumar Reddy <ma...@gmail.com>
Closes #6013 from mrsrinivas/tools-switch-to-java8
---
.../kafka/tools/ClientCompatibilityTest.java | 109 +++++++++------------
.../java/org/apache/kafka/tools/ToolsUtils.java | 8 +-
.../kafka/tools/TransactionalMessageCopier.java | 19 ++--
.../org/apache/kafka/tools/VerifiableConsumer.java | 7 +-
.../kafka/tools/VerifiableLog4jAppender.java | 11 +--
.../org/apache/kafka/tools/VerifiableProducer.java | 23 ++---
.../java/org/apache/kafka/trogdor/agent/Agent.java | 19 ++--
.../apache/kafka/trogdor/agent/WorkerManager.java | 25 +++--
.../kafka/trogdor/coordinator/Coordinator.java | 19 ++--
.../apache/kafka/trogdor/rest/JsonRestServer.java | 27 +++--
.../trogdor/workload/ConnectionStressSpec.java | 9 +-
.../kafka/trogdor/workload/ProduceBenchSpec.java | 9 +-
.../kafka/trogdor/workload/RoundTripWorker.java | 19 ++--
.../trogdor/workload/RoundTripWorkloadSpec.java | 9 +-
.../org/apache/kafka/trogdor/agent/AgentTest.java | 4 +-
.../apache/kafka/trogdor/common/ExpectedTasks.java | 107 ++++++++++----------
.../kafka/trogdor/common/MiniTrogdorCluster.java | 35 +++----
.../kafka/trogdor/coordinator/CoordinatorTest.java | 11 +--
.../kafka/trogdor/task/SampleTaskWorker.java | 10 +-
19 files changed, 196 insertions(+), 284 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 6182744..5b7e228 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -261,65 +261,68 @@ public class ClientCompatibilityTest {
nodes.size(), testConfig.numClusterNodes);
}
tryFeature("createTopics", testConfig.createTopicsSupported,
- new Invoker() {
- @Override
- public void invoke() throws Throwable {
- try {
- client.createTopics(Collections.singleton(
- new NewTopic("newtopic", 1, (short) 1))).all().get();
- } catch (ExecutionException e) {
- throw e.getCause();
- }
+ () -> {
+ try {
+ client.createTopics(Collections.singleton(
+ new NewTopic("newtopic", 1, (short) 1))).all().get();
+ } catch (ExecutionException e) {
+ throw e.getCause();
}
},
- new ResultTester() {
- @Override
- public void test() throws Throwable {
- while (true) {
- try {
- client.describeTopics(Collections.singleton("newtopic")).all().get();
- break;
- } catch (ExecutionException e) {
- if (e.getCause() instanceof UnknownTopicOrPartitionException)
- continue;
- throw e;
- }
- }
- }
- });
+ () -> createTopicsResultTest(client, Collections.singleton("newtopic"))
+ );
+
while (true) {
Collection<TopicListing> listings = client.listTopics().listings().get();
if (!testConfig.createTopicsSupported)
break;
- boolean foundNewTopic = false;
- for (TopicListing listing : listings) {
- if (listing.name().equals("newtopic")) {
- if (listing.isInternal())
- throw new KafkaException("Did not expect newtopic to be an internal topic.");
- foundNewTopic = true;
- }
- }
- if (foundNewTopic)
+
+ if (topicExists(listings, "newtopic"))
break;
+
Thread.sleep(1);
log.info("Did not see newtopic. Retrying listTopics...");
}
+
tryFeature("describeAclsSupported", testConfig.describeAclsSupported,
- new Invoker() {
- @Override
- public void invoke() throws Throwable {
- try {
- client.describeAcls(AclBindingFilter.ANY).values().get();
- } catch (ExecutionException e) {
- if (e.getCause() instanceof SecurityDisabledException)
- return;
- throw e.getCause();
- }
+ () -> {
+ try {
+ client.describeAcls(AclBindingFilter.ANY).values().get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof SecurityDisabledException)
+ return;
+ throw e.getCause();
}
});
}
}
+ private void createTopicsResultTest(AdminClient client, Collection<String> topics)
+ throws InterruptedException, ExecutionException {
+ while (true) {
+ try {
+ client.describeTopics(topics).all().get();
+ break;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof UnknownTopicOrPartitionException)
+ continue;
+ throw e;
+ }
+ }
+ }
+
+ private boolean topicExists(Collection<TopicListing> listings, String topicName) {
+ boolean foundTopic = false;
+ for (TopicListing listing : listings) {
+ if (listing.name().equals(topicName)) {
+ if (listing.isInternal())
+ throw new KafkaException(String.format("Did not expect %s to be an internal topic.", topicName));
+ foundTopic = true;
+ }
+ }
+ return foundTopic;
+ }
+
private static class OffsetsForTime {
Map<TopicPartition, OffsetAndTimestamp> result;
@@ -384,18 +387,8 @@ public class ClientCompatibilityTest {
}
final OffsetsForTime offsetsForTime = new OffsetsForTime();
tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
- new Invoker() {
- @Override
- public void invoke() {
- offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
- }
- },
- new ResultTester() {
- @Override
- public void test() {
- log.info("offsetsForTime = {}", offsetsForTime.result);
- }
- });
+ () -> offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch),
+ () -> log.info("offsetsForTime = {}", offsetsForTime.result));
// Whether or not offsetsForTimes works, beginningOffsets and endOffsets
// should work.
consumer.beginningOffsets(timestampsToSearch.keySet());
@@ -486,11 +479,7 @@ public class ClientCompatibilityTest {
}
private void tryFeature(String featureName, boolean supported, Invoker invoker) throws Throwable {
- tryFeature(featureName, supported, invoker, new ResultTester() {
- @Override
- public void test() {
- }
- });
+ tryFeature(featureName, supported, invoker, () -> { });
}
private void tryFeature(String featureName, boolean supported, Invoker invoker, ResultTester resultTester)
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
index 0e5d130..3a80b58 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
@@ -19,7 +19,6 @@ package org.apache.kafka.tools;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
-import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
@@ -32,12 +31,7 @@ public class ToolsUtils {
public static void printMetrics(Map<MetricName, ? extends Metric> metrics) {
if (metrics != null && !metrics.isEmpty()) {
int maxLengthOfDisplayName = 0;
- TreeMap<String, Object> sortedMetrics = new TreeMap<>(new Comparator<String>() {
- @Override
- public int compare(String o1, String o2) {
- return o1.compareTo(o2);
- }
- });
+ TreeMap<String, Object> sortedMetrics = new TreeMap<>();
for (Metric metric : metrics.values()) {
MetricName mName = metric.metricName();
String mergedName = mName.group() + ":" + mName.name() + ":" + mName.tags();
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 27e7c7f..a0ac1f1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -263,18 +263,15 @@ public class TransactionalMessageCopier {
final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
final AtomicLong remainingMessages = new AtomicLong(maxMessages);
final AtomicLong numMessagesProcessed = new AtomicLong(0);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- isShuttingDown.set(true);
- // Flush any remaining messages
- producer.close();
- synchronized (consumer) {
- consumer.close();
- }
- System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ isShuttingDown.set(true);
+ // Flush any remaining messages
+ producer.close();
+ synchronized (consumer) {
+ consumer.close();
}
- });
+ System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+ }));
try {
Random random = new Random();
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 58f3471..1297841 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -620,12 +620,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
try {
final VerifiableConsumer consumer = createFromArgs(parser, args);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- consumer.close();
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.close()));
consumer.run();
} catch (ArgumentParserException e) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
index 9d23bf3..12aa4f4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
@@ -241,13 +241,10 @@ public class VerifiableLog4jAppender {
final VerifiableLog4jAppender appender = createFromArgs(args);
boolean infinite = appender.maxMessages < 0;
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- // Trigger main thread to stop producing messages
- appender.stopLogging = true;
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ // Trigger main thread to stop producing messages
+ appender.stopLogging = true;
+ }));
long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages;
for (long i = 0; i < maxMessages; i++) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index f0a991f..3e6f3f1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -517,22 +517,19 @@ public class VerifiableProducer implements AutoCloseable {
final long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- // Trigger main thread to stop producing messages
- producer.stopProducing = true;
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ // Trigger main thread to stop producing messages
+ producer.stopProducing = true;
- // Flush any remaining messages
- producer.close();
+ // Flush any remaining messages
+ producer.close();
- // Print a summary
- long stopMs = System.currentTimeMillis();
- double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
+ // Print a summary
+ long stopMs = System.currentTimeMillis();
+ double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
- producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
- }
- });
+ producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
+ }));
producer.run(throttler);
} catch (ArgumentParserException e) {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 20d34b7..c76ef26 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -147,18 +147,15 @@ public final class Agent {
log.info("Starting agent process.");
final Agent agent = new Agent(platform, Scheduler.SYSTEM, restServer, resource);
restServer.start(resource);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- log.warn("Running agent shutdown hook.");
- try {
- agent.beginShutdown();
- agent.waitForShutdown();
- } catch (Exception e) {
- log.error("Got exception while running agent shutdown hook.", e);
- }
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ log.warn("Running agent shutdown hook.");
+ try {
+ agent.beginShutdown();
+ agent.waitForShutdown();
+ } catch (Exception e) {
+ log.error("Got exception while running agent shutdown hook.", e);
}
- });
+ }));
agent.waitForShutdown();
}
};
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index 59d34c9..ef02716 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -317,21 +317,18 @@ public final class WorkerManager {
return;
}
KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
- haltFuture.thenApply(new KafkaFuture.BaseFunction<String, Void>() {
- @Override
- public Void apply(String errorString) {
- if (errorString == null)
- errorString = "";
- if (errorString.isEmpty()) {
- log.info("{}: Worker {} is halting.", nodeName, worker);
- } else {
- log.info("{}: Worker {} is halting with error {}",
- nodeName, worker, errorString);
- }
- stateChangeExecutor.submit(
- new HandleWorkerHalting(worker, errorString, false));
- return null;
+ haltFuture.thenApply((KafkaFuture.BaseFunction<String, Void>) errorString -> {
+ if (errorString == null)
+ errorString = "";
+ if (errorString.isEmpty()) {
+ log.info("{}: Worker {} is halting.", nodeName, worker);
+ } else {
+ log.info("{}: Worker {} is halting with error {}",
+ nodeName, worker, errorString);
}
+ stateChangeExecutor.submit(
+ new HandleWorkerHalting(worker, errorString, false));
+ return null;
});
try {
worker.taskWorker.start(platform, worker.status, haltFuture);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index cd3da90..a41a6f2 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -162,18 +162,15 @@ public final class Coordinator {
final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM,
restServer, resource, ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
restServer.start(resource);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- log.warn("Running coordinator shutdown hook.");
- try {
- coordinator.beginShutdown(false);
- coordinator.waitForShutdown();
- } catch (Exception e) {
- log.error("Got exception while running coordinator shutdown hook.", e);
- }
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ log.warn("Running coordinator shutdown hook.");
+ try {
+ coordinator.beginShutdown(false);
+ coordinator.waitForShutdown();
+ } catch (Exception e) {
+ log.error("Got exception while running coordinator shutdown hook.", e);
}
- });
+ }));
coordinator.waitForShutdown();
}
};
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
index ee8643b..196ec82 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
@@ -132,22 +132,19 @@ public class JsonRestServer {
*/
public void beginShutdown() {
if (!shutdownExecutor.isShutdown()) {
- shutdownExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- try {
- log.info("Stopping REST server");
- jettyServer.stop();
- jettyServer.join();
- log.info("REST server stopped");
- } catch (Exception e) {
- log.error("Unable to stop REST server", e);
- } finally {
- jettyServer.destroy();
- }
- shutdownExecutor.shutdown();
- return null;
+ shutdownExecutor.submit((Callable<Void>) () -> {
+ try {
+ log.info("Stopping REST server");
+ jettyServer.stop();
+ jettyServer.join();
+ log.info("REST server stopped");
+ } catch (Exception e) {
+ log.error("Unable to stop REST server", e);
+ } finally {
+ jettyServer.destroy();
}
+ shutdownExecutor.shutdown();
+ return null;
});
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
index c22396f..6141d30 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
@@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.Topology;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
@@ -28,7 +27,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeSet;
/**
@@ -98,12 +96,7 @@ public class ConnectionStressSpec extends TaskSpec {
}
public TaskController newController(String id) {
- return new TaskController() {
- @Override
- public Set<String> targetNodes(Topology topology) {
- return new TreeSet<>(clientNodes);
- }
- };
+ return topology -> new TreeSet<>(clientNodes);
}
@Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index d15172f..34b5393 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.Topology;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
@@ -27,7 +26,6 @@ import org.apache.kafka.trogdor.task.TaskWorker;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
/**
* The specification for a benchmark that produces messages to a set of topics.
@@ -170,12 +168,7 @@ public class ProduceBenchSpec extends TaskSpec {
@Override
public TaskController newController(String id) {
- return new TaskController() {
- @Override
- public Set<String> targetNodes(Topology topology) {
- return Collections.singleton(producerNode);
- }
- };
+ return topology -> Collections.singleton(producerNode);
}
@Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 669fafc..b22292a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -25,11 +25,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -248,16 +246,13 @@ public class RoundTripWorker implements TaskWorker {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(partition.topic(),
partition.partition(), KEY_GENERATOR.generate(messageIndex),
spec.valueGenerator().generate(messageIndex));
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- unackedSends.countDown();
- } else {
- log.info("{}: Got exception when sending message {}: {}",
- id, messageIndex, exception.getMessage());
- toSendTracker.addFailed(messageIndex);
- }
+ producer.send(record, (metadata, exception) -> {
+ if (exception == null) {
+ unackedSends.countDown();
+ } else {
+ log.info("{}: Got exception when sending message {}: {}",
+ id, messageIndex, exception.getMessage());
+ toSendTracker.addFailed(messageIndex);
}
});
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 9522e0a..42e09ee 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -19,14 +19,12 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.Topology;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import java.util.Collections;
import java.util.Map;
-import java.util.Set;
/**
* The specification for a workload that sends messages to a broker and then
@@ -124,12 +122,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
@Override
public TaskController newController(String id) {
- return new TaskController() {
- @Override
- public Set<String> targetNodes(Topology topology) {
- return Collections.singleton(clientNode);
- }
- };
+ return topology -> Collections.singleton(clientNode);
}
@Override
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 158e690..f0ea475 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -304,7 +304,7 @@ public class AgentTest {
try (MockKibosh mockKibosh = new MockKibosh()) {
Assert.assertEquals(KiboshControlFile.EMPTY, mockKibosh.read());
FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(0, 900000,
- Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/foo", 123);
+ Collections.singleton("myAgent"), mockKibosh.tempDir.getPath(), "/foo", 123);
client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
@@ -314,7 +314,7 @@ public class AgentTest {
Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
new KiboshFilesUnreadableFaultSpec("/foo", 123))), mockKibosh.read());
FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(0, 900000,
- Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/bar", 456);
+ Collections.singleton("myAgent"), mockKibosh.tempDir.getPath(), "/bar", 456);
client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index c092c92..3eb781c 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.common;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
@@ -142,71 +141,65 @@ public class ExpectedTasks {
}
public ExpectedTasks waitFor(final CoordinatorClient client) throws InterruptedException {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- TasksResponse tasks = null;
- try {
- tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0, Optional.empty()));
- } catch (Exception e) {
- log.info("Unable to get coordinator tasks", e);
- throw new RuntimeException(e);
- }
- StringBuilder errors = new StringBuilder();
- for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
- String id = entry.getKey();
- ExpectedTask task = entry.getValue();
- String differences = task.compare(tasks.tasks().get(id));
- if (differences != null) {
- errors.append(differences);
- }
- }
- String errorString = errors.toString();
- if (!errorString.isEmpty()) {
- log.info("EXPECTED TASKS: {}", JsonUtil.toJsonString(expected));
- log.info("ACTUAL TASKS : {}", JsonUtil.toJsonString(tasks.tasks()));
- log.info(errorString);
- return false;
+ TestUtils.waitForCondition(() -> {
+ TasksResponse tasks = null;
+ try {
+ tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0, Optional.empty()));
+ } catch (Exception e) {
+ log.info("Unable to get coordinator tasks", e);
+ throw new RuntimeException(e);
+ }
+ StringBuilder errors = new StringBuilder();
+ for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
+ String id = entry.getKey();
+ ExpectedTask task = entry.getValue();
+ String differences = task.compare(tasks.tasks().get(id));
+ if (differences != null) {
+ errors.append(differences);
}
- return true;
}
+ String errorString = errors.toString();
+ if (!errorString.isEmpty()) {
+ log.info("EXPECTED TASKS: {}", JsonUtil.toJsonString(expected));
+ log.info("ACTUAL TASKS : {}", JsonUtil.toJsonString(tasks.tasks()));
+ log.info(errorString);
+ return false;
+ }
+ return true;
}, "Timed out waiting for expected tasks " + JsonUtil.toJsonString(expected));
return this;
}
public ExpectedTasks waitFor(final AgentClient client) throws InterruptedException {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- AgentStatusResponse status = null;
- try {
- status = client.status();
- } catch (Exception e) {
- log.info("Unable to get agent status", e);
- throw new RuntimeException(e);
- }
- StringBuilder errors = new StringBuilder();
- HashMap<String, WorkerState> taskIdToWorkerState = new HashMap<>();
- for (WorkerState state : status.workers().values()) {
- taskIdToWorkerState.put(state.taskId(), state);
- }
- for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
- String id = entry.getKey();
- ExpectedTask worker = entry.getValue();
- String differences = worker.compare(taskIdToWorkerState.get(id));
- if (differences != null) {
- errors.append(differences);
- }
- }
- String errorString = errors.toString();
- if (!errorString.isEmpty()) {
- log.info("EXPECTED WORKERS: {}", JsonUtil.toJsonString(expected));
- log.info("ACTUAL WORKERS : {}", JsonUtil.toJsonString(status.workers()));
- log.info(errorString);
- return false;
+ TestUtils.waitForCondition(() -> {
+ AgentStatusResponse status = null;
+ try {
+ status = client.status();
+ } catch (Exception e) {
+ log.info("Unable to get agent status", e);
+ throw new RuntimeException(e);
+ }
+ StringBuilder errors = new StringBuilder();
+ HashMap<String, WorkerState> taskIdToWorkerState = new HashMap<>();
+ for (WorkerState state : status.workers().values()) {
+ taskIdToWorkerState.put(state.taskId(), state);
+ }
+ for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
+ String id = entry.getKey();
+ ExpectedTask worker = entry.getValue();
+ String differences = worker.compare(taskIdToWorkerState.get(id));
+ if (differences != null) {
+ errors.append(differences);
}
- return true;
}
+ String errorString = errors.toString();
+ if (!errorString.isEmpty()) {
+ log.info("EXPECTED WORKERS: {}", JsonUtil.toJsonString(expected));
+ log.info("ACTUAL WORKERS : {}", JsonUtil.toJsonString(status.workers()));
+ log.info(errorString);
+ return false;
+ }
+ return true;
}, "Timed out waiting for expected workers " + JsonUtil.toJsonString(expected));
return this;
}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index 46315c2..9edffaa 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -172,27 +172,24 @@ public class MiniTrogdorCluster implements AutoCloseable {
ThreadUtils.createThreadFactory("MiniTrogdorClusterStartupThread%d", false));
final AtomicReference<Exception> failure = new AtomicReference<Exception>(null);
for (final Map.Entry<String, NodeData> entry : nodes.entrySet()) {
- executor.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- String nodeName = entry.getKey();
- try {
- NodeData node = entry.getValue();
- node.platform = new BasicPlatform(nodeName, topology, scheduler, commandRunner);
- if (node.agentRestResource != null) {
- node.agent = new Agent(node.platform, scheduler, node.agentRestServer,
- node.agentRestResource);
- }
- if (node.coordinatorRestResource != null) {
- node.coordinator = new Coordinator(node.platform, scheduler,
- node.coordinatorRestServer, node.coordinatorRestResource, 0);
- }
- } catch (Exception e) {
- log.error("Unable to initialize {}", nodeName, e);
- failure.compareAndSet(null, e);
+ executor.submit((Callable<Void>) () -> {
+ String nodeName = entry.getKey();
+ try {
+ NodeData node = entry.getValue();
+ node.platform = new BasicPlatform(nodeName, topology, scheduler, commandRunner);
+ if (node.agentRestResource != null) {
+ node.agent = new Agent(node.platform, scheduler, node.agentRestServer,
+ node.agentRestResource);
}
- return null;
+ if (node.coordinatorRestResource != null) {
+ node.coordinator = new Coordinator(node.platform, scheduler,
+ node.coordinatorRestServer, node.coordinatorRestResource, 0);
+ }
+ } catch (Exception e) {
+ log.error("Unable to initialize {}", nodeName, e);
+ failure.compareAndSet(null, e);
}
+ return null;
});
}
executor.shutdown();
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 0207104..db1afac 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.CapturingCommandRunner;
@@ -318,12 +317,8 @@ public class CoordinatorTest {
public ExpectedLines waitFor(final String nodeName,
final CapturingCommandRunner runner) throws InterruptedException {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return linesMatch(nodeName, runner.lines(nodeName));
- }
- }, "failed to find the expected lines " + this.toString());
+ TestUtils.waitForCondition(() -> linesMatch(nodeName, runner.lines(nodeName)),
+ "failed to find the expected lines " + this.toString());
return this;
}
@@ -473,7 +468,7 @@ public class CoordinatorTest {
assertEquals(0, coordinatorClient.tasks(
new TasksRequest(null, 10, 0, 10, 0, Optional.empty())).tasks().size());
TasksResponse resp1 = coordinatorClient.tasks(
- new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0, Optional.empty()));
+ new TasksRequest(Arrays.asList("foo", "baz"), 0, 0, 0, 0, Optional.empty()));
assertTrue(resp1.tasks().containsKey("foo"));
assertFalse(resp1.tasks().containsKey("bar"));
assertEquals(1, resp1.tasks().size());
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
index ade055d..404817a 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
-import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -53,12 +52,9 @@ public class SampleTaskWorker implements TaskWorker {
if (exitMs == null) {
exitMs = Long.MAX_VALUE;
}
- this.future = platform.scheduler().schedule(executor, new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- haltFuture.complete(spec.error());
- return null;
- }
+ this.future = platform.scheduler().schedule(executor, () -> {
+ haltFuture.complete(spec.error());
+ return null;
}, exitMs);
}