You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC
svn commit: r1173797 [9/10] - in /incubator/kafka/trunk/clients/csharp: ./
lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/
src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/
src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System;
+ using System.Collections.Generic;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Requests;
+
+ public static class TestHelper
+ {
+ public static long GetCurrentKafkaOffset(string topic, KafkaClientConfiguration clientConfig)
+ {
+ return GetCurrentKafkaOffset(topic, clientConfig.KafkaServer.Address, clientConfig.KafkaServer.Port);
+ }
+
+ public static long GetCurrentKafkaOffset(string topic, string address, int port)
+ {
+ OffsetRequest request = new OffsetRequest(topic, 0, DateTime.Now.AddDays(-5).Ticks, 10);
+ ConsumerConfig consumerConfig = new ConsumerConfig();
+ consumerConfig.Host = address;
+ consumerConfig.Port = port;
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ IList<long> list = consumer.GetOffsetsBefore(request);
+ if (list.Count > 0)
+ {
+ return list[0];
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System.Collections.Generic;
+ using Kafka.Client.Cfg;
+
+ public class TestMultipleBrokersHelper
+ {
+ private BrokerPartitionInfoCollection configBrokers =
+ KafkaClientConfiguration.GetConfiguration().BrokerPartitionInfos;
+
+ private Dictionary<int, long> offsets = new Dictionary<int, long>();
+
+ private BrokerPartitionInfo changedBroker;
+
+ private string topic;
+
+ public TestMultipleBrokersHelper(string topic)
+ {
+ this.topic = topic;
+ }
+
+ public BrokerPartitionInfo BrokerThatHasChanged
+ {
+ get { return changedBroker; }
+ }
+
+ public long OffsetFromBeforeTheChange
+ {
+ get
+ {
+ if (changedBroker != null)
+ {
+ return offsets[changedBroker.Id];
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ }
+
+ public void GetCurrentOffsets()
+ {
+ foreach (BrokerPartitionInfo broker in configBrokers)
+ {
+ offsets.Add(broker.Id, TestHelper.GetCurrentKafkaOffset(topic, broker.Address, broker.Port));
+ }
+ }
+
+ public bool CheckIfAnyBrokerHasChanged()
+ {
+ foreach (BrokerPartitionInfo broker in configBrokers)
+ {
+ if (TestHelper.GetCurrentKafkaOffset(topic, broker.Address, broker.Port) != offsets[broker.Id])
+ {
+ changedBroker = broker;
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestsSetup.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestsSetup.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestsSetup.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestsSetup.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using log4net;
+ using log4net.Config;
+ using NUnit.Framework;
+
+ [SetUpFixture]
+ public class TestsSetup
+ {
+ [SetUp]
+ public void Setup()
+ {
+ XmlConfigurator.Configure();
+ ILog logger = LogManager.GetLogger(typeof(TestsSetup));
+ logger.Info("Start logging");
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,455 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System.Collections.Generic;
+ using System.Linq;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Producers.Partitioning;
+ using Kafka.Client.Utils;
+ using Kafka.Client.ZooKeeperIntegration;
+ using Kafka.Client.ZooKeeperIntegration.Listeners;
+ using NUnit.Framework;
+ using ZooKeeperNet;
+
+ [TestFixture]
+ public class ZKBrokerPartitionInfoTests : IntegrationFixtureBase
+ {
+ private KafkaClientConfiguration clientConfig;
+ private ZKConfig zkConfig;
+
+ [TestFixtureSetUp]
+ public void SetUp()
+ {
+ clientConfig = KafkaClientConfiguration.GetConfiguration();
+ zkConfig = new ProducerConfig(clientConfig);
+ }
+
+ [Test]
+ public void ZKBrokerPartitionInfoGetsAllBrokerInfo()
+ {
+ IDictionary<int, Broker> allBrokerInfo;
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+ {
+ allBrokerInfo = brokerPartitionInfo.GetAllBrokerInfo();
+ }
+
+ Assert.AreEqual(clientConfig.BrokerPartitionInfos.Count, allBrokerInfo.Count);
+ foreach (BrokerPartitionInfo cfgBrPartInfo in clientConfig.BrokerPartitionInfos)
+ {
+ Assert.IsTrue(allBrokerInfo.ContainsKey(cfgBrPartInfo.Id));
+ Assert.AreEqual(cfgBrPartInfo.Address, allBrokerInfo[cfgBrPartInfo.Id].Host);
+ Assert.AreEqual(cfgBrPartInfo.Port, allBrokerInfo[cfgBrPartInfo.Id].Port);
+ }
+ }
+
+ [Test]
+ public void ZKBrokerPartitionInfoGetsBrokerPartitionInfo()
+ {
+ SortedSet<Partition> partitions;
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+ {
+ partitions = brokerPartitionInfo.GetBrokerPartitionInfo("test");
+ }
+
+ Assert.NotNull(partitions);
+ Assert.GreaterOrEqual(partitions.Count, 2);
+ var partition = partitions.ToList()[0];
+ Assert.AreEqual(0, partition.BrokerId);
+ }
+
+ [Test]
+ public void ZkBrokerPartitionInfoGetsBrokerInfo()
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+ {
+ var testBroker = clientConfig.BrokerPartitionInfos[0];
+ Broker broker = brokerPartitionInfo.GetBrokerInfo(testBroker.Id);
+ Assert.NotNull(broker);
+ Assert.AreEqual(testBroker.Address, broker.Host);
+ Assert.AreEqual(testBroker.Port, broker.Port);
+ }
+ }
+
+ [Test]
+ public void WhenNewTopicIsAddedBrokerTopicsListenerCreatesNewMapping()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IDictionary<string, SortedSet<Partition>> mappings;
+ IDictionary<int, Broker> brokers;
+ string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
+
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+ {
+ brokers = brokerPartitionInfo.GetAllBrokerInfo();
+ mappings =
+ ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+ "topicBrokerPartitions", brokerPartitionInfo);
+ }
+ }
+
+ Assert.NotNull(brokers);
+ Assert.Greater(brokers.Count, 0);
+ Assert.NotNull(mappings);
+ Assert.Greater(mappings.Count, 0);
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ WaitUntillIdle(client, 500);
+ var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
+ client.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath, brokerTopicsListener);
+ client.CreatePersistent(topicPath, true);
+ WaitUntillIdle(client, 500);
+ client.UnsubscribeAll();
+ client.DeleteRecursive(topicPath);
+ }
+
+ Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
+ }
+
+ [Test]
+ public void WhenNewBrokerIsAddedBrokerTopicsListenerUpdatesBrokersList()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IDictionary<string, SortedSet<Partition>> mappings;
+ IDictionary<int, Broker> brokers;
+ string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+ {
+ brokers = brokerPartitionInfo.GetAllBrokerInfo();
+ mappings =
+ ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+ "topicBrokerPartitions", brokerPartitionInfo);
+ }
+ }
+
+ Assert.NotNull(brokers);
+ Assert.Greater(brokers.Count, 0);
+ Assert.NotNull(mappings);
+ Assert.Greater(mappings.Count, 0);
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ WaitUntillIdle(client, 500);
+ var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
+ client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
+ client.CreatePersistent(brokerPath, true);
+ client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+ WaitUntillIdle(client, 500);
+ client.UnsubscribeAll();
+ client.DeleteRecursive(brokerPath);
+ }
+
+ Assert.IsTrue(brokers.ContainsKey(2345));
+ Assert.AreEqual("192.168.1.39", brokers[2345].Host);
+ Assert.AreEqual(9102, brokers[2345].Port);
+ Assert.AreEqual(2345, brokers[2345].Id);
+ }
+
+ [Test]
+ public void WhenBrokerIsRemovedBrokerTopicsListenerUpdatesBrokersList()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IDictionary<string, SortedSet<Partition>> mappings;
+ IDictionary<int, Broker> brokers;
+ string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+ {
+ brokers = brokerPartitionInfo.GetAllBrokerInfo();
+ mappings =
+ ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+ "topicBrokerPartitions", brokerPartitionInfo);
+ }
+ }
+
+ Assert.NotNull(brokers);
+ Assert.Greater(brokers.Count, 0);
+ Assert.NotNull(mappings);
+ Assert.Greater(mappings.Count, 0);
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ WaitUntillIdle(client, 500);
+ var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
+ client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
+ client.CreatePersistent(brokerPath, true);
+ client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+ WaitUntillIdle(client, 500);
+ Assert.IsTrue(brokers.ContainsKey(2345));
+ client.DeleteRecursive(brokerPath);
+ WaitUntillIdle(client, 500);
+ Assert.IsFalse(brokers.ContainsKey(2345));
+ }
+ }
+
+ [Test]
+ public void WhenNewBrokerInTopicIsAddedBrokerTopicsListenerUpdatesMappings()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IDictionary<string, SortedSet<Partition>> mappings;
+ IDictionary<int, Broker> brokers;
+ string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+ string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
+ string topicBrokerPath = topicPath + "/" + 2345;
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+ {
+ brokers = brokerPartitionInfo.GetAllBrokerInfo();
+ mappings =
+ ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+ "topicBrokerPartitions", brokerPartitionInfo);
+ }
+ }
+
+ Assert.NotNull(brokers);
+ Assert.Greater(brokers.Count, 0);
+ Assert.NotNull(mappings);
+ Assert.Greater(mappings.Count, 0);
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ WaitUntillIdle(client, 500);
+ var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
+ client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
+ client.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath, brokerTopicsListener);
+ client.CreatePersistent(brokerPath, true);
+ client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+ client.CreatePersistent(topicPath, true);
+ WaitUntillIdle(client, 500);
+ Assert.IsTrue(brokers.ContainsKey(2345));
+ Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
+ client.CreatePersistent(topicBrokerPath, true);
+ client.WriteData(topicBrokerPath, 5);
+ WaitUntillIdle(client, 500);
+ client.UnsubscribeAll();
+ client.DeleteRecursive(brokerPath);
+ client.DeleteRecursive(topicPath);
+ }
+
+ Assert.IsTrue(brokers.ContainsKey(2345));
+ Assert.IsTrue(mappings.Keys.Contains(CurrentTestTopic));
+ Assert.AreEqual(5, mappings[CurrentTestTopic].Count);
+ }
+
+ [Test]
+ public void WhenSessionIsExpiredListenerRecreatesEphemeralNodes()
+ {
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IDictionary<string, SortedSet<Partition>> mappings;
+ IDictionary<int, Broker> brokers;
+ IDictionary<string, SortedSet<Partition>> mappings2;
+ IDictionary<int, Broker> brokers2;
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+ {
+ brokers = brokerPartitionInfo.GetAllBrokerInfo();
+ mappings =
+ ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+ "topicBrokerPartitions", brokerPartitionInfo);
+ Assert.NotNull(brokers);
+ Assert.Greater(brokers.Count, 0);
+ Assert.NotNull(mappings);
+ Assert.Greater(mappings.Count, 0);
+ client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
+ WaitUntillIdle(client, 3000);
+ brokers2 = brokerPartitionInfo.GetAllBrokerInfo();
+ mappings2 =
+ ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+ "topicBrokerPartitions", brokerPartitionInfo);
+ }
+ }
+
+ Assert.NotNull(brokers2);
+ Assert.Greater(brokers2.Count, 0);
+ Assert.NotNull(mappings2);
+ Assert.Greater(mappings2.Count, 0);
+ Assert.AreEqual(brokers.Count, brokers2.Count);
+ Assert.AreEqual(mappings.Count, mappings2.Count);
+ }
+ }
+
+ [Test]
+ public void WhenNewTopicIsAddedZKBrokerPartitionInfoUpdatesMappings()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IDictionary<string, SortedSet<Partition>> mappings;
+ string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
+
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+ {
+ mappings =
+ ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+ "topicBrokerPartitions", brokerPartitionInfo);
+ client.CreatePersistent(topicPath, true);
+ WaitUntillIdle(client, 500);
+ client.UnsubscribeAll();
+ client.DeleteRecursive(topicPath);
+ }
+ }
+
+ Assert.NotNull(mappings);
+ Assert.Greater(mappings.Count, 0);
+ Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
+ }
+
+ [Test]
+ public void WhenNewBrokerIsAddedZKBrokerPartitionInfoUpdatesBrokersList()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IDictionary<int, Broker> brokers;
+ string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+ {
+ brokers = brokerPartitionInfo.GetAllBrokerInfo();
+ client.CreatePersistent(brokerPath, true);
+ client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+ WaitUntillIdle(client, 500);
+ client.UnsubscribeAll();
+ client.DeleteRecursive(brokerPath);
+ }
+ }
+
+ Assert.NotNull(brokers);
+ Assert.Greater(brokers.Count, 0);
+ Assert.IsTrue(brokers.ContainsKey(2345));
+ Assert.AreEqual("192.168.1.39", brokers[2345].Host);
+ Assert.AreEqual(9102, brokers[2345].Port);
+ Assert.AreEqual(2345, brokers[2345].Id);
+ }
+
+ [Test]
+ public void WhenBrokerIsRemovedZKBrokerPartitionInfoUpdatesBrokersList()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IDictionary<int, Broker> brokers;
+ string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+ {
+ brokers = brokerPartitionInfo.GetAllBrokerInfo();
+ client.CreatePersistent(brokerPath, true);
+ client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+ WaitUntillIdle(client, 500);
+ Assert.NotNull(brokers);
+ Assert.Greater(brokers.Count, 0);
+ Assert.IsTrue(brokers.ContainsKey(2345));
+ client.DeleteRecursive(brokerPath);
+ WaitUntillIdle(client, 500);
+ }
+ }
+
+ Assert.NotNull(brokers);
+ Assert.Greater(brokers.Count, 0);
+ Assert.IsFalse(brokers.ContainsKey(2345));
+ }
+
+ [Test]
+ public void WhenNewBrokerInTopicIsAddedZKBrokerPartitionInfoUpdatesMappings()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IDictionary<string, SortedSet<Partition>> mappings;
+ IDictionary<int, Broker> brokers;
+ string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+ string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
+ string topicBrokerPath = topicPath + "/" + 2345;
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+ {
+ brokers = brokerPartitionInfo.GetAllBrokerInfo();
+ mappings =
+ ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+ "topicBrokerPartitions", brokerPartitionInfo);
+ client.CreatePersistent(brokerPath, true);
+ client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+ client.CreatePersistent(topicPath, true);
+ WaitUntillIdle(client, 500);
+ Assert.IsTrue(brokers.ContainsKey(2345));
+ Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
+ client.CreatePersistent(topicBrokerPath, true);
+ client.WriteData(topicBrokerPath, 5);
+ WaitUntillIdle(client, 500);
+ client.UnsubscribeAll();
+ client.DeleteRecursive(brokerPath);
+ client.DeleteRecursive(topicPath);
+ }
+ }
+
+ Assert.NotNull(brokers);
+ Assert.Greater(brokers.Count, 0);
+ Assert.NotNull(mappings);
+ Assert.Greater(mappings.Count, 0);
+ Assert.IsTrue(brokers.ContainsKey(2345));
+ Assert.IsTrue(mappings.Keys.Contains(CurrentTestTopic));
+ Assert.AreEqual(5, mappings[CurrentTestTopic].Count);
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Text;
+ using System.Threading;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Producers;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Serialization;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class ZooKeeperAwareProducerTests : IntegrationFixtureBase
+ {
+ /// <summary>
+ /// Kafka Client configuration
+ /// </summary>
+ private KafkaClientConfiguration clientConfig;
+
+ /// <summary>
+ /// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds)
+ /// </summary>
+ private readonly int MaxTestWaitTimeInMiliseconds = 5000;
+
+ [TestFixtureSetUp]
+ public void SetUp()
+ {
+ clientConfig = KafkaClientConfiguration.GetConfiguration();
+ }
+
+ [Test]
+ public void ZKAwareProducerSends1Message()
+ {
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ var originalMessage = new Message(Encoding.UTF8.GetBytes("TestData"));
+
+ var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+ multipleBrokersHelper.GetCurrentOffsets();
+
+ var producerConfig = new ProducerConfig(clientConfig);
+ var mockPartitioner = new MockAlwaysZeroPartitioner();
+ using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder()))
+ {
+ var producerData = new ProducerData<string, Message>(
+ CurrentTestTopic, "somekey", new List<Message>() { originalMessage });
+ producer.Send(producerData);
+
+ while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ Thread.Sleep(waitSingle);
+ if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+ {
+ Assert.Fail("None of the brokers changed their offset after sending a message");
+ }
+ }
+
+ totalWaitTimeInMiliseconds = 0;
+
+ var consumerConfig = new ConsumerConfig(clientConfig)
+ {
+ Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+ Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+ };
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+ BufferedMessageSet response;
+
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null & response.Messages.Count() > 0)
+ {
+ break;
+ }
+
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(1, response.Messages.Count());
+ Assert.AreEqual(originalMessage.ToString(), response.Messages.First().ToString());
+ }
+ }
+
+ [Test]
+ public void ZkAwareProducerSends3Messages()
+ {
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ var originalMessage1 = new Message(Encoding.UTF8.GetBytes("TestData1"));
+ var originalMessage2 = new Message(Encoding.UTF8.GetBytes("TestData2"));
+ var originalMessage3 = new Message(Encoding.UTF8.GetBytes("TestData3"));
+ var originalMessageList = new List<Message> { originalMessage1, originalMessage2, originalMessage3 };
+
+ var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+ multipleBrokersHelper.GetCurrentOffsets();
+
+ var producerConfig = new ProducerConfig(clientConfig);
+ var mockPartitioner = new MockAlwaysZeroPartitioner();
+ using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder()))
+ {
+ var producerData = new ProducerData<string, Message>(CurrentTestTopic, "somekey", originalMessageList);
+ producer.Send(producerData);
+
+ while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ Thread.Sleep(waitSingle);
+ if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+ {
+ Assert.Fail("None of the brokers changed their offset after sending a message");
+ }
+ }
+
+ totalWaitTimeInMiliseconds = 0;
+
+ var consumerConfig = new ConsumerConfig(clientConfig)
+ {
+ Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+ Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+ };
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+ BufferedMessageSet response;
+
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 2)
+ {
+ break;
+ }
+
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(3, response.Messages.Count());
+ Assert.AreEqual(originalMessage1.ToString(), response.Messages.First().ToString());
+ Assert.AreEqual(originalMessage2.ToString(), response.Messages.Skip(1).First().ToString());
+ Assert.AreEqual(originalMessage3.ToString(), response.Messages.Skip(2).First().ToString());
+ }
+ }
+
+ [Test]
+ public void ZkAwareProducerSends1MessageUsingNotDefaultEncoder()
+ {
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ string originalMessage = "TestData";
+
+ var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+ multipleBrokersHelper.GetCurrentOffsets();
+
+ var producerConfig = new ProducerConfig(clientConfig);
+ var mockPartitioner = new MockAlwaysZeroPartitioner();
+ using (var producer = new Producer<string, string>(producerConfig, mockPartitioner, new StringEncoder(), null))
+ {
+ var producerData = new ProducerData<string, string>(
+ CurrentTestTopic, "somekey", new List<string> { originalMessage });
+ producer.Send(producerData);
+
+ while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ Thread.Sleep(waitSingle);
+ if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+ {
+ Assert.Fail("None of the brokers changed their offset after sending a message");
+ }
+ }
+
+ totalWaitTimeInMiliseconds = 0;
+
+ var consumerConfig = new ConsumerConfig(clientConfig)
+ {
+ Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+ Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+ };
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+ BufferedMessageSet response;
+
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 0)
+ {
+ break;
+ }
+
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(1, response.Messages.Count());
+ Assert.AreEqual(originalMessage, Encoding.UTF8.GetString(response.Messages.First().Payload));
+ }
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,365 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using System.Threading;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Exceptions;
+ using Kafka.Client.Utils;
+ using Kafka.Client.ZooKeeperIntegration;
+ using Kafka.Client.ZooKeeperIntegration.Events;
+ using Kafka.Client.ZooKeeperIntegration.Listeners;
+ using log4net;
+ using NUnit.Framework;
+ using ZooKeeperNet;
+
+ [TestFixture]
+ internal class ZooKeeperClientTests : IntegrationFixtureBase, IZooKeeperDataListener, IZooKeeperStateListener, IZooKeeperChildListener
+ {
+ private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+ private KafkaClientConfiguration clientConfig;
+ private readonly IList<ZooKeeperEventArgs> events = new List<ZooKeeperEventArgs>();
+
+ [TestFixtureSetUp]
+ public void SetUp()
+ {
+ clientConfig = KafkaClientConfiguration.GetConfiguration();
+ }
+
+ [SetUp]
+ public void TestSetup()
+ {
+ this.events.Clear();
+ }
+
+ [Test]
+ public void ZooKeeperClientCreateWorkerThreadsOnBeingCreated()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ var eventWorker = ReflectionHelper.GetInstanceField<Thread>("eventWorker", client);
+ var zooKeeperWorker = ReflectionHelper.GetInstanceField<Thread>("zooKeeperEventWorker", client);
+ Assert.NotNull(eventWorker);
+ Assert.NotNull(zooKeeperWorker);
+ }
+ }
+
+ [Test]
+ public void ZooKeeperClientFailsWhenCreatedWithWrongConnectionInfo()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client = new ZooKeeperClient("random text", producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ Assert.Throws<FormatException>(client.Connect);
+ }
+ }
+
+ [Test]
+ public void WhenStateChangedToConnectedStateListenerFires()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Subscribe(this);
+ client.Connect();
+ WaitUntillIdle(client, 500);
+ }
+
+ Assert.AreEqual(1, this.events.Count);
+ ZooKeeperEventArgs e = this.events[0];
+ Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type);
+ Assert.IsInstanceOf<ZooKeeperStateChangedEventArgs>(e);
+ Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.SyncConnected);
+ }
+
+ [Test]
+ public void WhenStateChangedToDisconnectedStateListenerFires()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Subscribe(this);
+ client.Connect();
+ WaitUntillIdle(client, 500);
+ client.Process(new WatchedEvent(KeeperState.Disconnected, EventType.None, null));
+ WaitUntillIdle(client, 500);
+ }
+
+ Assert.AreEqual(2, this.events.Count);
+ ZooKeeperEventArgs e = this.events[1];
+ Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type);
+ Assert.IsInstanceOf<ZooKeeperStateChangedEventArgs>(e);
+ Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.Disconnected);
+ }
+
+ [Test]
+ public void WhenStateChangedToExpiredStateAndSessionListenersFire()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Subscribe(this);
+ client.Connect();
+ WaitUntillIdle(client, 500);
+ client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
+ WaitUntillIdle(client, 3000);
+ }
+
+ Assert.AreEqual(4, this.events.Count);
+ ZooKeeperEventArgs e = this.events[1];
+ Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type);
+ Assert.IsInstanceOf<ZooKeeperStateChangedEventArgs>(e);
+ Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.Expired);
+ e = this.events[2];
+ Assert.AreEqual(ZooKeeperEventTypes.SessionCreated, e.Type);
+ Assert.IsInstanceOf<ZooKeeperSessionCreatedEventArgs>(e);
+ e = this.events[3];
+ Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type);
+ Assert.IsInstanceOf<ZooKeeperStateChangedEventArgs>(e);
+ Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.SyncConnected);
+ }
+
+ [Test]
+ public void WhenSessionExpiredClientReconnects()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IZooKeeperConnection conn1;
+ IZooKeeperConnection conn2;
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ conn1 = ReflectionHelper.GetInstanceField<ZooKeeperConnection>("connection", client);
+ client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
+ WaitUntillIdle(client, 1000);
+ conn2 = ReflectionHelper.GetInstanceField<ZooKeeperConnection>("connection", client);
+ }
+
+ Assert.AreNotEqual(conn1, conn2);
+ }
+
+ [Test]
+ public void ZooKeeperClientChecksIfPathExists()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ Assert.IsTrue(client.Exists(ZooKeeperClient.DefaultBrokerTopicsPath, false));
+ }
+ }
+
+ [Test]
+ public void ZooKeeperClientCreatesANewPathAndDeletesIt()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ string myPath = "/" + Guid.NewGuid();
+ client.CreatePersistent(myPath, false);
+ Assert.IsTrue(client.Exists(myPath));
+ client.Delete(myPath);
+ Assert.IsFalse(client.Exists(myPath));
+ }
+ }
+
+ [Test]
+ public void WhenChildIsCreatedChilListenerOnParentFires()
+ {
+ string myPath = "/" + Guid.NewGuid();
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ WaitUntillIdle(client, 500);
+ client.Subscribe("/", this as IZooKeeperChildListener);
+ client.CreatePersistent(myPath, true);
+ WaitUntillIdle(client, 500);
+ client.UnsubscribeAll();
+ client.Delete(myPath);
+ }
+
+ Assert.AreEqual(1, this.events.Count);
+ ZooKeeperEventArgs e = this.events[0];
+ Assert.AreEqual(ZooKeeperEventTypes.ChildChanged, e.Type);
+ Assert.IsInstanceOf<ZooKeeperChildChangedEventArgs>(e);
+ Assert.AreEqual(((ZooKeeperChildChangedEventArgs)e).Path, "/");
+ Assert.Greater(((ZooKeeperChildChangedEventArgs)e).Children.Count, 0);
+ Assert.IsTrue(((ZooKeeperChildChangedEventArgs)e).Children.Contains(myPath.Replace("/", string.Empty)));
+ }
+
+ [Test]
+ public void WhenChildIsDeletedChildListenerOnParentFires()
+ {
+ string myPath = "/" + Guid.NewGuid();
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ client.CreatePersistent(myPath, true);
+ WaitUntillIdle(client, 500);
+ client.Subscribe("/", this as IZooKeeperChildListener);
+ client.Delete(myPath);
+ WaitUntillIdle(client, 500);
+ }
+
+ Assert.AreEqual(1, this.events.Count);
+ ZooKeeperEventArgs e = this.events[0];
+ Assert.AreEqual(ZooKeeperEventTypes.ChildChanged, e.Type);
+ Assert.IsInstanceOf<ZooKeeperChildChangedEventArgs>(e);
+ Assert.AreEqual(((ZooKeeperChildChangedEventArgs)e).Path, "/");
+ Assert.Greater(((ZooKeeperChildChangedEventArgs)e).Children.Count, 0);
+ Assert.IsFalse(((ZooKeeperChildChangedEventArgs)e).Children.Contains(myPath.Replace("/", string.Empty)));
+ }
+
+ [Test]
+ public void WhenZNodeIsDeletedChildAndDataDeletedListenersFire()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ string myPath = "/" + Guid.NewGuid();
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ client.CreatePersistent(myPath, true);
+ WaitUntillIdle(client, 500);
+ client.Subscribe(myPath, this as IZooKeeperChildListener);
+ client.Subscribe(myPath, this as IZooKeeperDataListener);
+ client.Delete(myPath);
+ WaitUntillIdle(client, 500);
+ }
+
+ Assert.AreEqual(2, this.events.Count);
+ ZooKeeperEventArgs e = this.events[0];
+ Assert.AreEqual(ZooKeeperEventTypes.ChildChanged, e.Type);
+ Assert.IsInstanceOf<ZooKeeperChildChangedEventArgs>(e);
+ Assert.AreEqual(((ZooKeeperChildChangedEventArgs)e).Path, myPath);
+ Assert.IsNull(((ZooKeeperChildChangedEventArgs)e).Children);
+ e = this.events[1];
+ Assert.AreEqual(ZooKeeperEventTypes.DataChanged, e.Type);
+ Assert.IsInstanceOf<ZooKeeperDataChangedEventArgs>(e);
+ Assert.AreEqual(((ZooKeeperDataChangedEventArgs)e).Path, myPath);
+ Assert.IsNull(((ZooKeeperDataChangedEventArgs)e).Data);
+ }
+
+ [Test]
+ public void ZooKeeperClientCreatesAChildAndGetsChildren()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ string child = Guid.NewGuid().ToString();
+ string myPath = "/" + child;
+ client.CreatePersistent(myPath, false);
+ IList<string> children = client.GetChildren("/", false);
+ int countChildren = client.CountChildren("/");
+ Assert.Greater(children.Count, 0);
+ Assert.AreEqual(children.Count, countChildren);
+ Assert.IsTrue(children.Contains(child));
+ client.Delete(myPath);
+ }
+ }
+
+ [Test]
+ public void WhenDataChangedDataListenerFires()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ string myPath = "/" + Guid.NewGuid();
+ string sourceData = "my test data";
+ string resultData;
+ using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ client.CreatePersistent(myPath, true);
+ WaitUntillIdle(client, 500);
+ client.Subscribe(myPath, this as IZooKeeperDataListener);
+ client.Subscribe(myPath, this as IZooKeeperChildListener);
+ client.WriteData(myPath, sourceData);
+ WaitUntillIdle(client, 500);
+ client.UnsubscribeAll();
+ resultData = client.ReadData<string>(myPath);
+ client.Delete(myPath);
+ }
+
+ Assert.IsTrue(!string.IsNullOrEmpty(resultData));
+ Assert.AreEqual(sourceData, resultData);
+ Assert.AreEqual(1, this.events.Count);
+ ZooKeeperEventArgs e = this.events[0];
+ Assert.AreEqual(ZooKeeperEventTypes.DataChanged, e.Type);
+ Assert.IsInstanceOf<ZooKeeperDataChangedEventArgs>(e);
+ Assert.AreEqual(((ZooKeeperDataChangedEventArgs)e).Path, myPath);
+ Assert.IsNotNull(((ZooKeeperDataChangedEventArgs)e).Data);
+ Assert.AreEqual(((ZooKeeperDataChangedEventArgs)e).Data, sourceData);
+ }
+
+ [Test]
+ [ExpectedException(typeof(ZooKeeperException))]
+ public void WhenClientWillNotConnectWithinGivenTimeThrows()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperClient client =
+ new ZooKeeperClient(
+ producerConfig.ZkConnect,
+ producerConfig.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer,
+ 1))
+ {
+ client.Connect();
+ }
+ }
+
+ public void HandleDataChange(ZooKeeperDataChangedEventArgs args)
+ {
+ Logger.Debug(args + " reach test event handler");
+ this.events.Add(args);
+ }
+
+ public void HandleDataDelete(ZooKeeperDataChangedEventArgs args)
+ {
+ Logger.Debug(args + " reach test event handler");
+ this.events.Add(args);
+ }
+
+ public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
+ {
+ Logger.Debug(args + " reach test event handler");
+ this.events.Add(args);
+ }
+
+ public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
+ {
+ Logger.Debug(args + " reach test event handler");
+ this.events.Add(args);
+ }
+
+ public void HandleChildChange(ZooKeeperChildChangedEventArgs args)
+ {
+ Logger.Debug(args + " reach test event handler");
+ this.events.Add(args);
+ }
+
+ public void ResetState()
+ {
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System;
+ using System.Collections.Generic;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.ZooKeeperIntegration;
+ using NUnit.Framework;
+ using ZooKeeperNet;
+
+ [TestFixture]
+ public class ZooKeeperConnectionTests
+ {
+ private KafkaClientConfiguration clientConfig;
+
+ [TestFixtureSetUp]
+ public void SetUp()
+ {
+ clientConfig = KafkaClientConfiguration.GetConfiguration();
+ }
+
+ [Test]
+ public void ZooKeeperConnectionCreatesAndDeletesPath()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+ {
+ connection.Connect(null);
+ string pathName = "/" + Guid.NewGuid();
+ connection.Create(pathName, null, CreateMode.Persistent);
+ Assert.IsTrue(connection.Exists(pathName, false));
+ connection.Delete(pathName);
+ Assert.IsFalse(connection.Exists(pathName, false));
+ }
+ }
+
+ [Test]
+ public void ZooKeeperConnectionConnectsAndDisposes()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ IZooKeeperConnection connection;
+ using (connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+ {
+ Assert.IsNull(connection.ClientState);
+ connection.Connect(null);
+ Assert.NotNull(connection.Client);
+ Assert.AreEqual(ZooKeeper.States.CONNECTING, connection.ClientState);
+ }
+
+ Assert.Null(connection.Client);
+ }
+
+ [Test]
+ public void ZooKeeperConnectionCreatesAndGetsCreateTime()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+ {
+ connection.Connect(null);
+ string pathName = "/" + Guid.NewGuid();
+ connection.Create(pathName, null, CreateMode.Persistent);
+ long createTime = connection.GetCreateTime(pathName);
+ Assert.Greater(createTime, 0);
+ connection.Delete(pathName);
+ }
+ }
+
+ [Test]
+ public void ZooKeeperConnectionCreatesAndGetsChildren()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+ {
+ connection.Connect(null);
+ string child = Guid.NewGuid().ToString();
+ string pathName = "/" + child;
+ connection.Create(pathName, null, CreateMode.Persistent);
+ IList<string> children = connection.GetChildren("/", false);
+ Assert.Greater(children.Count, 0);
+ Assert.IsTrue(children.Contains(child));
+ connection.Delete(pathName);
+ }
+ }
+
+ [Test]
+ public void ZooKeeperConnectionWritesAndReadsData()
+ {
+ var producerConfig = new ProducerConfig(clientConfig);
+ using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+ {
+ connection.Connect(null);
+ string child = Guid.NewGuid().ToString();
+ string pathName = "/" + child;
+ connection.Create(pathName, null, CreateMode.Persistent);
+ var sourceData = new byte[2] { 1, 2 };
+ connection.WriteData(pathName, sourceData);
+ byte[] resultData = connection.ReadData(pathName, null, false);
+ Assert.IsNotNull(resultData);
+ Assert.AreEqual(sourceData[0], resultData[0]);
+ Assert.AreEqual(sourceData[1], resultData[1]);
+ connection.Delete(pathName);
+ }
+ }
+ }
+}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj Wed Sep 21 19:17:19 2011
@@ -1,70 +1,108 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
- <PropertyGroup>
- <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
- <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProductVersion>8.0.30703</ProductVersion>
- <SchemaVersion>2.0</SchemaVersion>
- <ProjectGuid>{9BA1A0BF-B207-4A11-8883-5F64B113C07D}</ProjectGuid>
- <OutputType>Library</OutputType>
- <AppDesignerFolder>Properties</AppDesignerFolder>
- <RootNamespace>Kafka.Client.Tests</RootNamespace>
- <AssemblyName>Kafka.Client.Tests</AssemblyName>
- <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
- <FileAlignment>512</FileAlignment>
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
- <DebugSymbols>true</DebugSymbols>
- <DebugType>full</DebugType>
- <Optimize>false</Optimize>
- <OutputPath>bin\Debug\</OutputPath>
- <DefineConstants>DEBUG;TRACE</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
- <DebugType>pdbonly</DebugType>
- <Optimize>true</Optimize>
- <OutputPath>bin\Release\</OutputPath>
- <DefineConstants>TRACE</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- </PropertyGroup>
- <ItemGroup>
- <Reference Include="nunit.framework, Version=2.5.9.10348, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll</HintPath>
- </Reference>
- <Reference Include="System" />
- <Reference Include="System.Core" />
- <Reference Include="System.Xml.Linq" />
- <Reference Include="System.Data.DataSetExtensions" />
- <Reference Include="Microsoft.CSharp" />
- <Reference Include="System.Data" />
- <Reference Include="System.Xml" />
- </ItemGroup>
- <ItemGroup>
- <Compile Include="MessageTests.cs" />
- <Compile Include="Properties\AssemblyInfo.cs" />
- <Compile Include="Request\FetchRequestTests.cs" />
- <Compile Include="Request\MultiFetchRequestTests.cs" />
- <Compile Include="Request\MultiProducerRequestTests.cs" />
- <Compile Include="Request\OffsetRequestTests.cs" />
- <Compile Include="Request\ProducerRequestTests.cs" />
- <Compile Include="Util\BitWorksTests.cs" />
- </ItemGroup>
- <ItemGroup>
- <ProjectReference Include="..\..\Kafka.Client\Kafka.Client.csproj">
- <Project>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</Project>
- <Name>Kafka.Client</Name>
- </ProjectReference>
- </ItemGroup>
- <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
- <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
- Other similar extension points exist, see Microsoft.Common.targets.
- <Target Name="BeforeBuild">
- </Target>
- <Target Name="AfterBuild">
- </Target>
- -->
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.30703</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{9BA1A0BF-B207-4A11-8883-5F64B113C07D}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Kafka.Client.Tests</RootNamespace>
+ <AssemblyName>Kafka.Client.Tests</AssemblyName>
+ <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <CodeContractsAssemblyMode>0</CodeContractsAssemblyMode>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <StyleCopTreatErrorsAsWarnings>true</StyleCopTreatErrorsAsWarnings>
+ <CodeContractsEnableRuntimeChecking>False</CodeContractsEnableRuntimeChecking>
+ <CodeContractsRuntimeOnlyPublicSurface>False</CodeContractsRuntimeOnlyPublicSurface>
+ <CodeContractsRuntimeThrowOnFailure>True</CodeContractsRuntimeThrowOnFailure>
+ <CodeContractsRuntimeCallSiteRequires>False</CodeContractsRuntimeCallSiteRequires>
+ <CodeContractsRuntimeSkipQuantifiers>False</CodeContractsRuntimeSkipQuantifiers>
+ <CodeContractsRunCodeAnalysis>False</CodeContractsRunCodeAnalysis>
+ <CodeContractsNonNullObligations>False</CodeContractsNonNullObligations>
+ <CodeContractsBoundsObligations>False</CodeContractsBoundsObligations>
+ <CodeContractsArithmeticObligations>False</CodeContractsArithmeticObligations>
+ <CodeContractsEnumObligations>False</CodeContractsEnumObligations>
+ <CodeContractsRedundantAssumptions>False</CodeContractsRedundantAssumptions>
+ <CodeContractsRunInBackground>True</CodeContractsRunInBackground>
+ <CodeContractsShowSquigglies>False</CodeContractsShowSquigglies>
+ <CodeContractsUseBaseLine>False</CodeContractsUseBaseLine>
+ <CodeContractsEmitXMLDocs>False</CodeContractsEmitXMLDocs>
+ <CodeContractsCustomRewriterAssembly />
+ <CodeContractsCustomRewriterClass />
+ <CodeContractsLibPaths />
+ <CodeContractsExtraRewriteOptions />
+ <CodeContractsExtraAnalysisOptions />
+ <CodeContractsBaseLineFile />
+ <CodeContractsCacheAnalysisResults>False</CodeContractsCacheAnalysisResults>
+ <CodeContractsRuntimeCheckingLevel>Full</CodeContractsRuntimeCheckingLevel>
+ <CodeContractsReferenceAssembly>%28none%29</CodeContractsReferenceAssembly>
+ <CodeContractsAnalysisWarningLevel>0</CodeContractsAnalysisWarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Integration|AnyCPU'">
+ <DebugSymbols>true</DebugSymbols>
+ <OutputPath>bin\Integration\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <DebugType>full</DebugType>
+ <PlatformTarget>AnyCPU</PlatformTarget>
+ <ErrorReport>prompt</ErrorReport>
+ <CodeAnalysisIgnoreBuiltInRuleSets>true</CodeAnalysisIgnoreBuiltInRuleSets>
+ <CodeAnalysisIgnoreBuiltInRules>true</CodeAnalysisIgnoreBuiltInRules>
+ <CodeAnalysisFailOnMissingRules>true</CodeAnalysisFailOnMissingRules>
+ <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="log4net">
+ <HintPath>..\..\..\..\lib\log4Net\log4net.dll</HintPath>
+ </Reference>
+ <Reference Include="nunit.framework, Version=2.5.9.10348, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="Microsoft.CSharp" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="MessageSetTests.cs" />
+ <Compile Include="MessageTests.cs" />
+ <Compile Include="Producers\PartitioningTests.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Request\FetchRequestTests.cs" />
+ <Compile Include="Request\MultiFetchRequestTests.cs" />
+ <Compile Include="Request\MultiProducerRequestTests.cs" />
+ <Compile Include="Request\OffsetRequestTests.cs" />
+ <Compile Include="Request\ProducerRequestTests.cs" />
+ <Compile Include="Util\BitWorksTests.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\Kafka.Client\Kafka.Client.csproj">
+ <Project>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</Project>
+ <Name>Kafka.Client</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <Folder Include="ZooKeeper\" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="..\..\..\..\lib\StyleCop\Microsoft.StyleCop.Targets" />
</Project>
\ No newline at end of file
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Tests
+{
+ using System;
+ using System.Collections.Generic;
+ using System.IO;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Utils;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class MessageSetTests
+ {
+ private const int MessageLengthPartLength = 4;
+ private const int MagicNumberPartLength = 1;
+ private const int ChecksumPartLength = 4;
+
+ private const int MessageLengthPartOffset = 0;
+ private const int MagicNumberPartOffset = 4;
+ private const int ChecksumPartOffset = 5;
+ private const int DataPartOffset = 9;
+
+ [Test]
+ public void BufferedMessageSetWriteToValidSequence()
+ {
+ byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5 };
+ Message msg1 = new Message(messageBytes);
+ Message msg2 = new Message(messageBytes);
+ MessageSet messageSet = new BufferedMessageSet(new List<Message>() { msg1, msg2 });
+ MemoryStream ms = new MemoryStream();
+ messageSet.WriteTo(ms);
+
+ ////first message
+
+ byte[] messageLength = new byte[MessageLengthPartLength];
+ Array.Copy(ms.ToArray(), MessageLengthPartOffset, messageLength, 0, MessageLengthPartLength);
+ if (BitConverter.IsLittleEndian)
+ {
+ Array.Reverse(messageLength);
+ }
+
+ Assert.AreEqual(MagicNumberPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
+
+ Assert.AreEqual(0, ms.ToArray()[MagicNumberPartOffset]); // default magic number should be 0
+
+ byte[] checksumPart = new byte[ChecksumPartLength];
+ Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
+ Assert.AreEqual(Crc32Hasher.Compute(messageBytes), checksumPart);
+
+ byte[] dataPart = new byte[messageBytes.Length];
+ Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
+ Assert.AreEqual(messageBytes, dataPart);
+
+ ////second message
+ int secondMessageOffset = MessageLengthPartLength + MagicNumberPartLength + ChecksumPartLength +
+ messageBytes.Length;
+
+ messageLength = new byte[MessageLengthPartLength];
+ Array.Copy(ms.ToArray(), secondMessageOffset + MessageLengthPartOffset, messageLength, 0, MessageLengthPartLength);
+ if (BitConverter.IsLittleEndian)
+ {
+ Array.Reverse(messageLength);
+ }
+
+ Assert.AreEqual(MagicNumberPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
+
+ Assert.AreEqual(0, ms.ToArray()[secondMessageOffset + MagicNumberPartOffset]); // default magic number should be 0
+
+ checksumPart = new byte[ChecksumPartLength];
+ Array.Copy(ms.ToArray(), secondMessageOffset + ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
+ Assert.AreEqual(Crc32Hasher.Compute(messageBytes), checksumPart);
+
+ dataPart = new byte[messageBytes.Length];
+ Array.Copy(ms.ToArray(), secondMessageOffset + DataPartOffset, dataPart, 0, messageBytes.Length);
+ Assert.AreEqual(messageBytes, dataPart);
+ }
+
+ [Test]
+ public void SetSizeValid()
+ {
+ byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5 };
+ Message msg1 = new Message(messageBytes);
+ Message msg2 = new Message(messageBytes);
+ MessageSet messageSet = new BufferedMessageSet(new List<Message>() { msg1, msg2 });
+ Assert.AreEqual(
+ 2 * (MessageLengthPartLength + MagicNumberPartLength + ChecksumPartLength + messageBytes.Length),
+ messageSet.SetSize);
+ }
+ }
+}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs Wed Sep 21 19:17:19 2011
@@ -1,68 +1,130 @@
-using System;
-using System.Linq;
-using System.Text;
-using Kafka.Client.Util;
-using NUnit.Framework;
-
-namespace Kafka.Client.Tests
-{
- /// <summary>
- /// Tests for the <see cref="Message"/> class.
- /// </summary>
- [TestFixture]
- public class MessageTests
- {
- /// <summary>
- /// Demonstrates a properly parsed message.
- /// </summary>
- [Test]
- public void ParseFromValid()
- {
- Crc32 crc32 = new Crc32();
-
- string payload = "kafka";
- byte magic = 0;
- byte[] payloadData = Encoding.UTF8.GetBytes(payload);
- byte[] payloadSize = BitConverter.GetBytes(payloadData.Length);
- byte[] checksum = crc32.ComputeHash(payloadData);
- byte[] messageData = new byte[payloadData.Length + 1 + payloadSize.Length + checksum.Length];
-
- Buffer.BlockCopy(payloadSize, 0, messageData, 0, payloadSize.Length);
- messageData[4] = magic;
- Buffer.BlockCopy(checksum, 0, messageData, payloadSize.Length + 1, checksum.Length);
- Buffer.BlockCopy(payloadData, 0, messageData, payloadSize.Length + 1 + checksum.Length, payloadData.Length);
-
- Message message = Message.ParseFrom(messageData);
-
- Assert.IsNotNull(message);
- Assert.AreEqual(magic, message.Magic);
- Assert.IsTrue(payloadData.SequenceEqual(message.Payload));
- Assert.IsTrue(checksum.SequenceEqual(message.Checksum));
- }
-
- /// <summary>
- /// Ensure that the bytes returned from the message are in valid kafka sequence.
- /// </summary>
- [Test]
- public void GetBytesValidSequence()
- {
- Message message = new Message(new byte[10], (byte)245);
-
- byte[] bytes = message.GetBytes();
-
- Assert.IsNotNull(bytes);
-
- // len(payload) + 1 + 4
- Assert.AreEqual(15, bytes.Length);
-
- // first 4 bytes = the magic number
- Assert.AreEqual((byte)245, bytes[0]);
-
- // next 4 bytes = the checksum
- Assert.IsTrue(message.Checksum.SequenceEqual(bytes.Skip(1).Take(4).ToArray<byte>()));
-
- // remaining bytes = the payload
- Assert.AreEqual(10, bytes.Skip(5).ToArray<byte>().Length);
- }
- }
-}
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Tests
+{
+ using System;
+ using System.IO;
+ using System.Linq;
+ using System.Text;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Utils;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests for the <see cref="Message"/> class.
+ /// </summary>
+ [TestFixture]
+ public class MessageTests
+ {
+ private readonly int ChecksumPartLength = 4;
+
+ private readonly int MagicNumberPartOffset = 0;
+ private readonly int ChecksumPartOffset = 1;
+ private readonly int DataPartOffset = 5;
+
+ /// <summary>
+ /// Demonstrates a properly parsed message.
+ /// </summary>
+ [Test]
+ public void ParseFromValid()
+ {
+ Crc32Hasher crc32 = new Crc32Hasher();
+
+ string payload = "kafka";
+ byte magic = 0;
+ byte[] payloadData = Encoding.UTF8.GetBytes(payload);
+ byte[] payloadSize = BitConverter.GetBytes(payloadData.Length);
+ byte[] checksum = crc32.ComputeHash(payloadData);
+ byte[] messageData = new byte[payloadData.Length + 1 + payloadSize.Length + checksum.Length];
+
+ Buffer.BlockCopy(payloadSize, 0, messageData, 0, payloadSize.Length);
+ messageData[4] = magic;
+ Buffer.BlockCopy(checksum, 0, messageData, payloadSize.Length + 1, checksum.Length);
+ Buffer.BlockCopy(payloadData, 0, messageData, payloadSize.Length + 1 + checksum.Length, payloadData.Length);
+
+ Message message = Message.ParseFrom(messageData);
+
+ Assert.IsNotNull(message);
+ Assert.AreEqual(magic, message.Magic);
+ Assert.IsTrue(payloadData.SequenceEqual(message.Payload));
+ Assert.IsTrue(checksum.SequenceEqual(message.Checksum));
+ }
+
+ /// <summary>
+ /// Ensure that the bytes returned from the message are in valid kafka sequence.
+ /// </summary>
+ [Test]
+ public void GetBytesValidSequence()
+ {
+ Message message = new Message(new byte[10], (byte)245);
+
+ MemoryStream ms = new MemoryStream();
+ message.WriteTo(ms);
+
+ // len(payload) + 1 + 4
+ Assert.AreEqual(15, ms.Length);
+
+ // first 4 bytes = the magic number
+ Assert.AreEqual((byte)245, ms.ToArray()[0]);
+
+ // next 4 bytes = the checksum
+ Assert.IsTrue(message.Checksum.SequenceEqual(ms.ToArray().Skip(1).Take(4).ToArray<byte>()));
+
+ // remaining bytes = the payload
+ Assert.AreEqual(10, ms.ToArray().Skip(5).ToArray<byte>().Length);
+ }
+
+ [Test]
+ public void WriteToValidSequenceForDefaultConstructor()
+ {
+ byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+ Message message = new Message(messageBytes);
+ MemoryStream ms = new MemoryStream();
+ message.WriteTo(ms);
+
+ Assert.AreEqual(0, ms.ToArray()[MagicNumberPartOffset]); // default magic number should be 0
+
+ byte[] checksumPart = new byte[ChecksumPartLength];
+ Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
+ Assert.AreEqual(Crc32Hasher.Compute(messageBytes), checksumPart);
+
+ byte[] dataPart = new byte[messageBytes.Length];
+ Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
+ Assert.AreEqual(messageBytes, dataPart);
+ }
+
+ [Test]
+ public void WriteToValidSequenceForCustomConstructor()
+ {
+ byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+ byte[] customChecksum = new byte[] { 3, 4, 5, 6 };
+ Message message = new Message(messageBytes, (byte)33, customChecksum);
+ MemoryStream ms = new MemoryStream();
+ message.WriteTo(ms);
+
+ Assert.AreEqual((byte)33, ms.ToArray()[MagicNumberPartOffset]);
+
+ byte[] checksumPart = new byte[ChecksumPartLength];
+ Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
+ Assert.AreEqual(customChecksum, checksumPart);
+
+ byte[] dataPart = new byte[messageBytes.Length];
+ Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
+ Assert.AreEqual(messageBytes, dataPart);
+ }
+ }
+}