Posted to commits@spark.apache.org by td...@apache.org on 2015/10/27 09:29:23 UTC
spark git commit: [SPARK-11270][STREAMING] Add improved equality
testing for TopicAndPartition from the Kafka Streaming API
Repository: spark
Updated Branches:
refs/heads/master feb8d6a44 -> 8f888eea1
[SPARK-11270][STREAMING] Add improved equality testing for TopicAndPartition from the Kafka Streaming API
jerryshao tdas
I know this is kind of minor, and I know you all are busy, but this brings this class in line with the `OffsetRange` class, and makes tests a little more concise.
Instead of doing something like:
```
assert topic_and_partition_instance._topic == "foo"
assert topic_and_partition_instance._partition == 0
```
You can do something like:
```
assert topic_and_partition_instance == TopicAndPartition("foo", 0)
```
Before:
```
>>> from pyspark.streaming.kafka import TopicAndPartition
>>> TopicAndPartition("foo", 0) == TopicAndPartition("foo", 0)
False
```
After:
```
>>> from pyspark.streaming.kafka import TopicAndPartition
>>> TopicAndPartition("foo", 0) == TopicAndPartition("foo", 0)
True
```
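The "Before" result comes from Python's default behaviour: a class that defines no `__eq__` compares instances by identity, so two separately constructed objects are never equal. The following is a minimal, self-contained sketch of that difference. Both class names are hypothetical stand-ins and not the actual `pyspark.streaming.kafka.TopicAndPartition`; only the `__eq__`/`__ne__` bodies mirror this commit.

```python
class PlainTopicAndPartition(object):
    """Hypothetical stand-in with no __eq__: == falls back to identity."""

    def __init__(self, topic, partition):
        self._topic = topic
        self._partition = partition


class ComparableTopicAndPartition(PlainTopicAndPartition):
    """Hypothetical stand-in using the value-based comparison from this commit."""

    def __eq__(self, other):
        # Equal only when the other object is the same kind of object
        # and both fields match.
        if isinstance(other, self.__class__):
            return (self._topic == other._topic
                    and self._partition == other._partition)
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so it is defined explicitly.
        return not self.__eq__(other)


# Two distinct instances with identical fields.
before = PlainTopicAndPartition("foo", 0) == PlainTopicAndPartition("foo", 0)
after = ComparableTopicAndPartition("foo", 0) == ComparableTopicAndPartition("foo", 0)
# A different partition must compare as not equal.
differs = ComparableTopicAndPartition("foo", 0) != ComparableTopicAndPartition("foo", 1)

print(before, after, differs)
```

One thing to keep in mind with this pattern: under Python 3, a class that defines `__eq__` without also defining `__hash__` becomes unhashable, so instances of the sketch above could not be used as dict keys or set members there.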
I couldn't find any tests - am I missing something?
Author: Nick Evans <me...@nicolasevans.org>
Closes #9236 from manygrams/topic_and_partition_equality.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f888eea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f888eea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f888eea
Branch: refs/heads/master
Commit: 8f888eea1aef5a28916ec406a99fc19648681ecf
Parents: feb8d6a
Author: Nick Evans <me...@nicolasevans.org>
Authored: Tue Oct 27 01:29:06 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Oct 27 01:29:06 2015 -0700
----------------------------------------------------------------------
python/pyspark/streaming/kafka.py | 10 ++++++++++
python/pyspark/streaming/tests.py | 10 ++++++++++
2 files changed, 20 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8f888eea/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index b35bbaf..06e1591 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -254,6 +254,16 @@ class TopicAndPartition(object):
     def _jTopicAndPartition(self, helper):
         return helper.createTopicAndPartition(self._topic, self._partition)
+    def __eq__(self, other):
+        if isinstance(other, self.__class__):
+            return (self._topic == other._topic
+                    and self._partition == other._partition)
+        else:
+            return False
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
 class Broker(object):
     """
http://git-wip-us.apache.org/repos/asf/spark/blob/8f888eea/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 2c908da..f7fa481 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -898,6 +898,16 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
+    def test_topic_and_partition_equality(self):
+        topic_and_partition_a = TopicAndPartition("foo", 0)
+        topic_and_partition_b = TopicAndPartition("foo", 0)
+        topic_and_partition_c = TopicAndPartition("bar", 0)
+        topic_and_partition_d = TopicAndPartition("foo", 1)
+
+        self.assertEqual(topic_and_partition_a, topic_and_partition_b)
+        self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
+        self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
+
 class FlumeStreamTests(PySparkStreamingTestCase):
     timeout = 20  # seconds