You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/25 20:26:21 UTC
[kafka] branch trunk updated: MINOR: fixes lgtm.com warnings (#4582)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5df535e MINOR: fixes lgtm.com warnings (#4582)
5df535e is described below
commit 5df535e8a349771942050f1e3fd58851f413fa3a
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Sun Feb 25 12:26:18 2018 -0800
MINOR: fixes lgtm.com warnings (#4582)
fixes lgmt.com warnings
cleanup PrintForeachAction and Printed
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Sebastian Bauersfeld <se...@gmx.de>, Damian Guy <da...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../org/apache/kafka/common/cache/LRUCache.java | 2 +-
.../main/scala/kafka/tools/StreamsResetter.java | 30 +++++++++-----------
.../org/apache/kafka/streams/kstream/Printed.java | 19 ++++++-------
.../streams/kstream/internals/KTableImpl.java | 19 +++++++------
.../kstream/internals/PrintForeachAction.java | 32 +++++++++++-----------
.../streams/kstream/internals/PrintedInternal.java | 2 +-
.../processor/internals/InternalTopicManager.java | 2 +-
.../internals/StreamPartitionAssignor.java | 4 +--
.../streams/processor/internals/StreamThread.java | 12 ++++----
.../apache/kafka/streams/kstream/PrintedTest.java | 10 +++++--
.../kstream/internals/KStreamPrintTest.java | 16 ++---------
11 files changed, 70 insertions(+), 78 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
index bdc67ac..672cb65 100644
--- a/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
@@ -29,7 +29,7 @@ public class LRUCache<K, V> implements Cache<K, V> {
cache = new LinkedHashMap<K, V>(16, .75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
- return size() > maxSize;
+ return this.size() > maxSize; // require this. prefix to make lgtm.com happy
}
};
}
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 31c69ee..f88fce7 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -386,7 +386,7 @@ public class StreamsResetter {
shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption));
} else if (options.has(toDatetimeOption)) {
final String ts = options.valueOf(toDatetimeOption);
- final Long timestamp = getDateTime(ts);
+ final long timestamp = getDateTime(ts);
resetToDatetime(client, inputTopicPartitions, timestamp);
} else if (options.has(byDurationOption)) {
final String duration = options.valueOf(byDurationOption);
@@ -401,8 +401,7 @@ public class StreamsResetter {
}
for (final TopicPartition p : inputTopicPartitions) {
- final Long position = client.position(p);
- System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + position);
+ System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + client.position(p));
}
}
}
@@ -416,8 +415,7 @@ public class StreamsResetter {
checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
for (final TopicPartition topicPartition : inputTopicPartitions) {
- final Long offset = validatedTopicPartitionsAndOffset.get(topicPartition);
- client.seek(topicPartition, offset);
+ client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition));
}
}
@@ -429,7 +427,7 @@ public class StreamsResetter {
private void resetByDuration(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Duration duration) throws DatatypeConfigurationException {
final Date now = new Date();
duration.negate().addTo(now);
- final Long timestamp = now.getTime();
+ final long timestamp = now.getTime();
final Map<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<>(inputTopicPartitions.size());
for (final TopicPartition topicPartition : inputTopicPartitions) {
@@ -439,8 +437,7 @@ public class StreamsResetter {
final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
for (final TopicPartition topicPartition : inputTopicPartitions) {
- final Long offset = topicPartitionsAndOffset.get(topicPartition).offset();
- client.seek(topicPartition, offset);
+ client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
}
}
@@ -453,20 +450,19 @@ public class StreamsResetter {
final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
for (final TopicPartition topicPartition : inputTopicPartitions) {
- final Long offset = topicPartitionsAndOffset.get(topicPartition).offset();
- client.seek(topicPartition, offset);
+ client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
}
}
// visible for testing
- public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Long shiftBy) {
+ public void shiftOffsetsBy(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, long shiftBy) {
final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
final Map<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<>(inputTopicPartitions.size());
for (final TopicPartition topicPartition : inputTopicPartitions) {
- final Long position = client.position(topicPartition);
- final Long offset = position + shiftBy;
+ final long position = client.position(topicPartition);
+ final long offset = position + shiftBy;
topicPartitionsAndOffset.put(topicPartition, offset);
}
@@ -497,7 +493,7 @@ public class StreamsResetter {
}
// visible for testing
- public Long getDateTime(String timestamp) throws ParseException {
+ public long getDateTime(String timestamp) throws ParseException {
final String[] timestampParts = timestamp.split("T");
if (timestampParts.length < 2) {
throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
@@ -549,10 +545,10 @@ public class StreamsResetter {
final Map<TopicPartition, Long> endOffsets) {
final Map<TopicPartition, Long> validatedTopicPartitionsOffsets = new HashMap<>();
for (final Map.Entry<TopicPartition, Long> topicPartitionAndOffset : inputTopicPartitionsAndOffset.entrySet()) {
- final Long endOffset = endOffsets.get(topicPartitionAndOffset.getKey());
- final Long offset = topicPartitionAndOffset.getValue();
+ final long endOffset = endOffsets.get(topicPartitionAndOffset.getKey());
+ final long offset = topicPartitionAndOffset.getValue();
if (offset < endOffset) {
- final Long beginningOffset = beginningOffsets.get(topicPartitionAndOffset.getKey());
+ final long beginningOffset = beginningOffsets.get(topicPartitionAndOffset.getKey());
if (offset > beginningOffset) {
validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), offset);
} else {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
index 8d2c22a..5a1d07f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
@@ -19,9 +19,8 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.errors.TopologyException;
import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
import java.util.Objects;
/**
@@ -32,7 +31,7 @@ import java.util.Objects;
* @see KStream#print(Printed)
*/
public class Printed<K, V> {
- protected final PrintWriter printWriter;
+ protected final OutputStream outputStream;
protected String label;
protected KeyValueMapper<? super K, ? super V, String> mapper = new KeyValueMapper<K, V, String>() {
@Override
@@ -41,8 +40,8 @@ public class Printed<K, V> {
}
};
- private Printed(final PrintWriter printWriter) {
- this.printWriter = printWriter;
+ private Printed(final OutputStream outputStream) {
+ this.outputStream = outputStream;
}
/**
@@ -50,7 +49,7 @@ public class Printed<K, V> {
* @param printed instance of {@link Printed} to copy
*/
protected Printed(final Printed<K, V> printed) {
- this.printWriter = printed.printWriter;
+ this.outputStream = printed.outputStream;
this.label = printed.label;
this.mapper = printed.mapper;
}
@@ -69,8 +68,8 @@ public class Printed<K, V> {
throw new TopologyException("filePath can't be an empty string");
}
try {
- return new Printed<>(new PrintWriter(filePath, StandardCharsets.UTF_8.name()));
- } catch (final FileNotFoundException | UnsupportedEncodingException e) {
+ return new Printed<>(new FileOutputStream(filePath));
+ } catch (final FileNotFoundException e) {
throw new TopologyException("Unable to write stream to file at [" + filePath + "] " + e.getMessage());
}
}
@@ -83,7 +82,7 @@ public class Printed<K, V> {
* @return a new Printed instance
*/
public static <K, V> Printed<K, V> toSysOut() {
- return new Printed<>((PrintWriter) null);
+ return new Printed<>(System.out);
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index a746d31..11b8c51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -39,9 +39,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import java.io.FileNotFoundException;
-import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
+import java.io.FileOutputStream;
import java.util.Objects;
import java.util.Set;
@@ -346,7 +344,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final String label) {
Objects.requireNonNull(label, "label can't be null");
final String name = builder.newProcessorName(PRINTING_NAME);
- builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, defaultKeyValueMapper, label)), this.name);
+ builder.internalTopologyBuilder.addProcessor(
+ name,
+ new KStreamPrint<>(new PrintForeachAction<>(System.out, defaultKeyValueMapper, label)),
+ this.name);
}
@SuppressWarnings("deprecation")
@@ -384,11 +385,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
if (filePath.trim().isEmpty()) {
throw new TopologyException("filePath can't be an empty string");
}
- String name = builder.newProcessorName(PRINTING_NAME);
+ final String name = builder.newProcessorName(PRINTING_NAME);
try {
- PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name());
- builder.internalTopologyBuilder.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, defaultKeyValueMapper, label)), this.name);
- } catch (final FileNotFoundException | UnsupportedEncodingException e) {
+ builder.internalTopologyBuilder.addProcessor(
+ name,
+ new KStreamPrint<>(new PrintForeachAction<>(new FileOutputStream(filePath), defaultKeyValueMapper, label)),
+ this.name);
+ } catch (final FileNotFoundException e) {
throw new TopologyException(String.format("Unable to write stream to file at [%s] %s", filePath, e.getMessage()));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
index dcdd44f..174319f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintForeachAction.java
@@ -19,26 +19,30 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
public class PrintForeachAction<K, V> implements ForeachAction<K, V> {
private final String label;
private final PrintWriter printWriter;
+ private final boolean closable;
private final KeyValueMapper<? super K, ? super V, String> mapper;
+
/**
- * Print customized output with given writer. The PrintWriter can be null in order to
- * distinguish between {@code System.out} and the others. If the PrintWriter is {@code PrintWriter(System.out)},
- * then it would close {@code System.out} output stream.
- * <p>
- * Afterall, not to pass in {@code PrintWriter(System.out)} but {@code null} instead.
+ * Print customized output with given writer. The {@link OutputStream} can be {@link System#out} or the others.
*
- * @param printWriter Use {@code System.out.println} if {@code null}.
+ * @param outputStream The output stream to write to.
* @param mapper The mapper which can allow user to customize output will be printed.
* @param label The given name will be printed.
*/
- public PrintForeachAction(final PrintWriter printWriter, final KeyValueMapper<? super K, ? super V, String> mapper, final String label) {
- this.printWriter = printWriter;
+ PrintForeachAction(final OutputStream outputStream,
+ final KeyValueMapper<? super K, ? super V, String> mapper,
+ final String label) {
+ this.printWriter = new PrintWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
+ this.closable = outputStream != System.out && outputStream != System.err;
this.mapper = mapper;
this.label = label;
}
@@ -46,18 +50,14 @@ public class PrintForeachAction<K, V> implements ForeachAction<K, V> {
@Override
public void apply(final K key, final V value) {
final String data = String.format("[%s]: %s", label, mapper.apply(key, value));
- if (printWriter == null) {
- System.out.println(data);
- } else {
- printWriter.println(data);
- }
+ printWriter.println(data);
}
public void close() {
- if (printWriter == null) {
- System.out.flush();
- } else {
+ if (closable) {
printWriter.close();
+ } else {
+ printWriter.flush();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
index 7e1a02d..45e2513 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/PrintedInternal.java
@@ -31,6 +31,6 @@ public class PrintedInternal<K, V> extends Printed<K, V> {
* @return the {@code ProcessorSupplier} to be used for printing
*/
public ProcessorSupplier<K, V> build(final String processorName) {
- return new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, label != null ? label : processorName));
+ return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 05d079b..aeff946 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -221,7 +221,7 @@ public class InternalTopicManager {
final Map<String, Integer> existingTopicNamesPartitions) {
final Set<InternalTopicConfig> topicsToBeCreated = new HashSet<>();
for (final InternalTopicConfig topic : topicsPartitionsMap) {
- final Integer numberOfPartitions = topic.numberOfPartitions();
+ final int numberOfPartitions = topic.numberOfPartitions();
if (existingTopicNamesPartitions.containsKey(topic.name())) {
if (!existingTopicNamesPartitions.get(topic.name()).equals(numberOfPartitions)) {
final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 2a08308..2a26272 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -377,7 +377,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
final String topic = entry.getKey();
- final Integer numPartitions = entry.getValue().numPartitions;
+ final int numPartitions = entry.getValue().numPartitions;
for (int partition = 0; partition < numPartitions; partition++) {
allRepartitionTopicPartitions.put(new TopicPartition(topic, partition),
@@ -638,7 +638,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
for (final InternalTopicMetadata metadata : topicPartitions.values()) {
final InternalTopicConfig topic = metadata.config;
- final Integer numPartitions = metadata.numPartitions;
+ final int numPartitions = metadata.numPartitions;
if (numPartitions == NOT_AVAILABLE) {
continue;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 5e25d02..61a22be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -894,15 +894,13 @@ public class StreamThread extends Thread {
* @param records Records, can be null
*/
private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
- if (records != null && !records.isEmpty()) {
- int numAddedRecords = 0;
+ int numAddedRecords = 0;
- for (final TopicPartition partition : records.partitions()) {
- final StreamTask task = taskManager.activeTask(partition);
- numAddedRecords += task.addRecords(partition, records.records(partition));
- }
- streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
+ for (final TopicPartition partition : records.partitions()) {
+ final StreamTask task = taskManager.activeTask(partition);
+ numAddedRecords += task.addRecords(partition, records.records(partition));
}
+ streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
}
/**
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
index adec9ff..a50fce9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
@@ -41,11 +41,12 @@ public class PrintedTest {
private final PrintStream originalSysOut = System.out;
private final ByteArrayOutputStream sysOut = new ByteArrayOutputStream();
- private final Printed<String, Integer> sysOutPrinter = Printed.toSysOut();
+ private Printed<String, Integer> sysOutPrinter;
@Before
public void before() {
System.setOut(new PrintStream(sysOut));
+ sysOutPrinter = Printed.toSysOut();
}
@After
@@ -72,7 +73,10 @@ public class PrintedTest {
@Test
public void shouldCreateProcessorThatPrintsToStdOut() throws UnsupportedEncodingException {
final ProcessorSupplier<String, Integer> supplier = new PrintedInternal<>(sysOutPrinter).build("processor");
- supplier.get().process("good", 2);
+ final Processor<String, Integer> processor = supplier.get();
+
+ processor.process("good", 2);
+ processor.close();
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: good, 2\n"));
}
@@ -83,6 +87,7 @@ public class PrintedTest {
.get();
processor.process("hello", 3);
+ processor.close();
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[label]: hello, 3\n"));
}
@@ -97,6 +102,7 @@ public class PrintedTest {
})).build("processor")
.get();
processor.process("hello", 1);
+ processor.close();
assertThat(sysOut.toString(StandardCharsets.UTF_8.name()), equalTo("[processor]: hello -> 1\n"));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index e1a014d..3ba88e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -16,22 +16,16 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
@@ -39,9 +33,6 @@ import static org.junit.Assert.assertEquals;
public class KStreamPrintTest {
- private final Serde<Integer> intSerd = Serdes.Integer();
- private final Serde<String> stringSerd = Serdes.String();
- private PrintWriter printWriter;
private ByteArrayOutputStream byteOutStream;
private KeyValueMapper<Integer, String, String> mapper;
@@ -49,9 +40,8 @@ public class KStreamPrintTest {
private Processor printProcessor;
@Before
- public void setUp() {
+ public void setUp() throws Exception {
byteOutStream = new ByteArrayOutputStream();
- printWriter = new PrintWriter(new OutputStreamWriter(byteOutStream, StandardCharsets.UTF_8));
mapper = new KeyValueMapper<Integer, String, String>() {
@Override
@@ -60,7 +50,7 @@ public class KStreamPrintTest {
}
};
- kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(printWriter, mapper, "test-stream"));
+ kStreamPrint = new KStreamPrint<>(new PrintForeachAction<>(byteOutStream, mapper, "test-stream"));
printProcessor = kStreamPrint.get();
ProcessorContext processorContext = EasyMock.createNiceMock(ProcessorContext.class);
@@ -98,7 +88,7 @@ public class KStreamPrintTest {
for (KeyValue<K, V> record: inputRecords) {
printProcessor.process(record.key, record.value);
}
- printWriter.flush();
+ printProcessor.close();
assertFlushData(expectedResult, byteOutStream);
}
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.