You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/20 01:29:47 UTC
[kafka] branch trunk updated: MINOR: Java8 cleanup (#6599)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c596204 MINOR: Java8 cleanup (#6599)
c596204 is described below
commit c596204c0dc880ee51772f06cc786057f78f34f3
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Apr 19 18:29:27 2019 -0700
MINOR: Java8 cleanup (#6599)
Reviewers: Bill Bejeck <bi...@confluent.io>, Bruno Cadonna <br...@confluent.io>
---
.../kafka/streams/tests/StreamsUpgradeTest.java | 53 ++++++++++------------
1 file changed, 23 insertions(+), 30 deletions(-)
diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 9fef2b1..0b4939c 100644
--- a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,17 +16,17 @@
*/
package org.apache.kafka.streams.tests;
-import java.util.Properties;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import java.util.Properties;
+
public class StreamsUpgradeTest {
@@ -59,40 +59,33 @@ public class StreamsUpgradeTest {
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- streams.close();
- System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
- System.out.flush();
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }));
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
- return new ProcessorSupplier<K, V>() {
- public Processor<K, V> get() {
- return new AbstractProcessor<K, V>() {
- private int numRecordsProcessed = 0;
-
- @Override
- public void init(final ProcessorContext context) {
- System.out.println("[2.0] initializing processor: topic=data taskId=" + context.taskId());
- numRecordsProcessed = 0;
- }
+ return () -> new AbstractProcessor<K, V>() {
+ private int numRecordsProcessed = 0;
- @Override
- public void process(final K key, final V value) {
- numRecordsProcessed++;
- if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " + numRecordsProcessed + " records from topic=data");
- }
- }
+ @Override
+ public void init(final ProcessorContext context) {
+ System.out.println("[2.0] initializing processor: topic=data taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ }
- @Override
- public void close() {}
- };
+ @Override
+ public void process(final K key, final V value) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ }
}
+
+ @Override
+ public void close() {}
};
}
}