You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/05/08 23:07:36 UTC
[kafka] branch trunk updated: MINOR: add equals()/hashCode() for
Produced/Consumed (#4979)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b328fc7 MINOR: add equals()/hashCode() for Produced/Consumed (#4979)
b328fc7 is described below
commit b328fc729b79fee6a9d210cf25890068d92943ee
Author: dan norwood <da...@gmail.com>
AuthorDate: Tue May 8 16:07:33 2018 -0700
MINOR: add equals()/hashCode() for Produced/Consumed (#4979)
Reviewer: Matthias J. Sax <ma...@confluent.io>
---
.../java/org/apache/kafka/streams/Consumed.java | 22 ++++++++++++++++++++++
.../org/apache/kafka/streams/kstream/Produced.java | 21 +++++++++++++++++++++
2 files changed, 43 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/Consumed.java b/streams/src/main/java/org/apache/kafka/streams/Consumed.java
index 78d6b24..37d30a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Consumed.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import java.util.Objects;
+
/**
* The {@code Consumed} class is used to define the optional parameters when using {@link StreamsBuilder} to
* build instances of {@link KStream}, {@link KTable}, and {@link GlobalKTable}.
@@ -174,4 +176,24 @@ public class Consumed<K, V> {
this.resetPolicy = resetPolicy;
return this;
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final Consumed<?, ?> consumed = (Consumed<?, ?>) o;
+ return Objects.equals(keySerde, consumed.keySerde) &&
+ Objects.equals(valueSerde, consumed.valueSerde) &&
+ Objects.equals(timestampExtractor, consumed.timestampExtractor) &&
+ resetPolicy == consumed.resetPolicy;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keySerde, valueSerde, timestampExtractor, resetPolicy);
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
index b2513ea..8d2742a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -22,6 +22,8 @@ import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import java.util.Objects;
+
/**
* This class is used to provide the optional parameters when producing to new topics
* using {@link KStream#through(String, Produced)} or {@link KStream#to(String, Produced)}.
@@ -154,4 +156,23 @@ public class Produced<K, V> {
this.keySerde = keySerde;
return this;
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final Produced<?, ?> produced = (Produced<?, ?>) o;
+ return Objects.equals(keySerde, produced.keySerde) &&
+ Objects.equals(valueSerde, produced.valueSerde) &&
+ Objects.equals(partitioner, produced.partitioner);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keySerde, valueSerde, partitioner);
+ }
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.