You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/20 01:29:47 UTC

[kafka] branch trunk updated: MINOR: Java8 cleanup (#6599)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c596204  MINOR: Java8 cleanup (#6599)
c596204 is described below

commit c596204c0dc880ee51772f06cc786057f78f34f3
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Apr 19 18:29:27 2019 -0700

    MINOR: Java8 cleanup (#6599)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Bruno Cadonna <br...@confluent.io>
---
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 53 ++++++++++------------
 1 file changed, 23 insertions(+), 30 deletions(-)

diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 9fef2b1..0b4939c 100644
--- a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,17 +16,17 @@
  */
 package org.apache.kafka.streams.tests;
 
-import java.util.Properties;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 
+import java.util.Properties;
+
 public class StreamsUpgradeTest {
 
 
@@ -59,40 +59,33 @@ public class StreamsUpgradeTest {
         final KafkaStreams streams = new KafkaStreams(builder.build(), config);
         streams.start();
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                streams.close();
-                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
-                System.out.flush();
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+            System.out.flush();
+        }));
     }
 
     private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
-        return new ProcessorSupplier<K, V>() {
-            public Processor<K, V> get() {
-                return new AbstractProcessor<K, V>() {
-                    private int numRecordsProcessed = 0;
-
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        System.out.println("[2.0] initializing processor: topic=data taskId=" + context.taskId());
-                        numRecordsProcessed = 0;
-                    }
+        return () -> new AbstractProcessor<K, V>() {
+            private int numRecordsProcessed = 0;
 
-                    @Override
-                    public void process(final K key, final V value) {
-                        numRecordsProcessed++;
-                        if (numRecordsProcessed % 100 == 0) {
-                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
-                        }
-                    }
+            @Override
+            public void init(final ProcessorContext context) {
+                System.out.println("[2.0] initializing processor: topic=data taskId=" + context.taskId());
+                numRecordsProcessed = 0;
+            }
 
-                    @Override
-                    public void close() {}
-                };
+            @Override
+            public void process(final K key, final V value) {
+                numRecordsProcessed++;
+                if (numRecordsProcessed % 100 == 0) {
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                }
             }
+
+            @Override
+            public void close() {}
         };
     }
 }