You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC

svn commit: r1173797 [9/10] - in /incubator/kafka/trunk/clients/csharp: ./ lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/ src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/ src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System;
+    using System.Collections.Generic;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Consumers;
+    using Kafka.Client.Requests;
+
+    public static class TestHelper
+    {
+        public static long GetCurrentKafkaOffset(string topic, KafkaClientConfiguration clientConfig)
+        {
+            return GetCurrentKafkaOffset(topic, clientConfig.KafkaServer.Address, clientConfig.KafkaServer.Port);
+        }
+
+        public static long GetCurrentKafkaOffset(string topic, string address, int port)
+        {
+            OffsetRequest request = new OffsetRequest(topic, 0, DateTime.Now.AddDays(-5).Ticks, 10);
+            ConsumerConfig consumerConfig = new ConsumerConfig();
+            consumerConfig.Host = address;
+            consumerConfig.Port = port;
+            IConsumer consumer = new Consumers.Consumer(consumerConfig);
+            IList<long> list = consumer.GetOffsetsBefore(request);
+            if (list.Count > 0)
+            {
+                return list[0];
+            }
+            else
+            {
+                return 0;
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System.Collections.Generic;
+    using Kafka.Client.Cfg;
+
+    public class TestMultipleBrokersHelper
+    {
+        private BrokerPartitionInfoCollection configBrokers =
+            KafkaClientConfiguration.GetConfiguration().BrokerPartitionInfos;
+
+        private Dictionary<int, long> offsets = new Dictionary<int, long>();
+
+        private BrokerPartitionInfo changedBroker;
+
+        private string topic;
+
+        public TestMultipleBrokersHelper(string topic)
+        {
+            this.topic = topic;
+        }
+
+        public BrokerPartitionInfo BrokerThatHasChanged
+        {
+            get { return changedBroker; }
+        }
+
+        public long OffsetFromBeforeTheChange
+        {
+            get
+            {
+                if (changedBroker != null)
+                {
+                    return offsets[changedBroker.Id];
+                }
+                else
+                {
+                    return 0;
+                }
+            }
+        }
+
+        public void GetCurrentOffsets()
+        {
+            foreach (BrokerPartitionInfo broker in configBrokers)
+            {
+                offsets.Add(broker.Id, TestHelper.GetCurrentKafkaOffset(topic, broker.Address, broker.Port));
+            }
+        }
+
+        public bool CheckIfAnyBrokerHasChanged()
+        {
+            foreach (BrokerPartitionInfo broker in configBrokers)
+            {
+                if (TestHelper.GetCurrentKafkaOffset(topic, broker.Address, broker.Port) != offsets[broker.Id])
+                {
+                    changedBroker = broker;
+                    return true;
+                }
+            }
+
+            return false;
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestsSetup.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestsSetup.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestsSetup.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestsSetup.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using log4net;
+    using log4net.Config;
+    using NUnit.Framework;
+
+    [SetUpFixture]
+    public class TestsSetup
+    {
+        [SetUp]
+        public void Setup()
+        {
+            XmlConfigurator.Configure();
+            ILog logger = LogManager.GetLogger(typeof(TestsSetup));
+            logger.Info("Start logging");
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,455 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System.Collections.Generic;
+    using System.Linq;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Producers.Partitioning;
+    using Kafka.Client.Utils;
+    using Kafka.Client.ZooKeeperIntegration;
+    using Kafka.Client.ZooKeeperIntegration.Listeners;
+    using NUnit.Framework;
+    using ZooKeeperNet;
+
+    [TestFixture]
+    public class ZKBrokerPartitionInfoTests : IntegrationFixtureBase
+    {
+        private KafkaClientConfiguration clientConfig;
+        private ZKConfig zkConfig;
+
+        [TestFixtureSetUp]
+        public void SetUp()
+        {
+            clientConfig = KafkaClientConfiguration.GetConfiguration();
+            zkConfig = new ProducerConfig(clientConfig);
+        }
+
+        [Test]
+        public void ZKBrokerPartitionInfoGetsAllBrokerInfo()
+        {
+            IDictionary<int, Broker> allBrokerInfo;
+            using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+            {
+                allBrokerInfo = brokerPartitionInfo.GetAllBrokerInfo();
+            }
+
+            Assert.AreEqual(clientConfig.BrokerPartitionInfos.Count, allBrokerInfo.Count);
+            foreach (BrokerPartitionInfo cfgBrPartInfo in clientConfig.BrokerPartitionInfos)
+            {
+                Assert.IsTrue(allBrokerInfo.ContainsKey(cfgBrPartInfo.Id));
+                Assert.AreEqual(cfgBrPartInfo.Address, allBrokerInfo[cfgBrPartInfo.Id].Host);
+                Assert.AreEqual(cfgBrPartInfo.Port, allBrokerInfo[cfgBrPartInfo.Id].Port);
+            }
+        }
+
+        [Test]
+        public void ZKBrokerPartitionInfoGetsBrokerPartitionInfo()
+        {
+            SortedSet<Partition> partitions;
+            using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+            {
+                partitions = brokerPartitionInfo.GetBrokerPartitionInfo("test");
+            }
+
+            Assert.NotNull(partitions);
+            Assert.GreaterOrEqual(partitions.Count, 2);
+            var partition = partitions.ToList()[0];
+            Assert.AreEqual(0, partition.BrokerId);
+        }
+
+        [Test]
+        public void ZkBrokerPartitionInfoGetsBrokerInfo()
+        {
+            using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+            {
+                var testBroker = clientConfig.BrokerPartitionInfos[0];
+                Broker broker = brokerPartitionInfo.GetBrokerInfo(testBroker.Id);
+                Assert.NotNull(broker);
+                Assert.AreEqual(testBroker.Address, broker.Host);
+                Assert.AreEqual(testBroker.Port, broker.Port);
+            }
+        }
+
+        [Test]
+        public void WhenNewTopicIsAddedBrokerTopicsListenerCreatesNewMapping()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            IDictionary<string, SortedSet<Partition>> mappings;
+            IDictionary<int, Broker> brokers;
+            string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
+
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+                {
+                    brokers = brokerPartitionInfo.GetAllBrokerInfo();
+                    mappings =
+                        ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+                            "topicBrokerPartitions", brokerPartitionInfo);
+                }
+            }
+
+            Assert.NotNull(brokers);
+            Assert.Greater(brokers.Count, 0);
+            Assert.NotNull(mappings);
+            Assert.Greater(mappings.Count, 0);
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                WaitUntillIdle(client, 500);
+                var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
+                client.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath, brokerTopicsListener);
+                client.CreatePersistent(topicPath, true);
+                WaitUntillIdle(client, 500);
+                client.UnsubscribeAll();
+                client.DeleteRecursive(topicPath);
+            }
+
+            Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
+        }
+
+        [Test]
+        public void WhenNewBrokerIsAddedBrokerTopicsListenerUpdatesBrokersList()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            IDictionary<string, SortedSet<Partition>> mappings;
+            IDictionary<int, Broker> brokers;
+            string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+                {
+                    brokers = brokerPartitionInfo.GetAllBrokerInfo();
+                    mappings =
+                        ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+                            "topicBrokerPartitions", brokerPartitionInfo);
+                }
+            }
+
+            Assert.NotNull(brokers);
+            Assert.Greater(brokers.Count, 0);
+            Assert.NotNull(mappings);
+            Assert.Greater(mappings.Count, 0);
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                WaitUntillIdle(client, 500);
+                var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
+                client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
+                client.CreatePersistent(brokerPath, true);
+                client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+                WaitUntillIdle(client, 500);
+                client.UnsubscribeAll();
+                client.DeleteRecursive(brokerPath);
+            }
+
+            Assert.IsTrue(brokers.ContainsKey(2345));
+            Assert.AreEqual("192.168.1.39", brokers[2345].Host);
+            Assert.AreEqual(9102, brokers[2345].Port);
+            Assert.AreEqual(2345, brokers[2345].Id);
+        }
+
+        [Test]
+        public void WhenBrokerIsRemovedBrokerTopicsListenerUpdatesBrokersList()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            IDictionary<string, SortedSet<Partition>> mappings;
+            IDictionary<int, Broker> brokers;
+            string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+                {
+                    brokers = brokerPartitionInfo.GetAllBrokerInfo();
+                    mappings =
+                        ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+                            "topicBrokerPartitions", brokerPartitionInfo);
+                }
+            }
+
+            Assert.NotNull(brokers);
+            Assert.Greater(brokers.Count, 0);
+            Assert.NotNull(mappings);
+            Assert.Greater(mappings.Count, 0);
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                WaitUntillIdle(client, 500); 
+                var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
+                client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
+                client.CreatePersistent(brokerPath, true);
+                client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+                WaitUntillIdle(client, 500); 
+                Assert.IsTrue(brokers.ContainsKey(2345));
+                client.DeleteRecursive(brokerPath);
+                WaitUntillIdle(client, 500); 
+                Assert.IsFalse(brokers.ContainsKey(2345));
+            }
+        }
+
+        [Test]
+        public void WhenNewBrokerInTopicIsAddedBrokerTopicsListenerUpdatesMappings()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            IDictionary<string, SortedSet<Partition>> mappings;
+            IDictionary<int, Broker> brokers;
+            string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+            string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
+            string topicBrokerPath = topicPath + "/" + 2345;
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+                {
+                    brokers = brokerPartitionInfo.GetAllBrokerInfo();
+                    mappings =
+                        ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+                            "topicBrokerPartitions", brokerPartitionInfo);
+                }
+            }
+
+            Assert.NotNull(brokers);
+            Assert.Greater(brokers.Count, 0);
+            Assert.NotNull(mappings);
+            Assert.Greater(mappings.Count, 0);
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                WaitUntillIdle(client, 500);
+                var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
+                client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
+                client.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath, brokerTopicsListener);
+                client.CreatePersistent(brokerPath, true);
+                client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+                client.CreatePersistent(topicPath, true);
+                WaitUntillIdle(client, 500);
+                Assert.IsTrue(brokers.ContainsKey(2345));
+                Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
+                client.CreatePersistent(topicBrokerPath, true);
+                client.WriteData(topicBrokerPath, 5);
+                WaitUntillIdle(client, 500);
+                client.UnsubscribeAll();
+                client.DeleteRecursive(brokerPath);
+                client.DeleteRecursive(topicPath);
+            }
+
+            Assert.IsTrue(brokers.ContainsKey(2345));
+            Assert.IsTrue(mappings.Keys.Contains(CurrentTestTopic));
+            Assert.AreEqual(5, mappings[CurrentTestTopic].Count);
+        }
+
+        [Test]
+        public void WhenSessionIsExpiredListenerRecreatesEphemeralNodes()
+        {
+            {
+                var producerConfig = new ProducerConfig(clientConfig);
+                IDictionary<string, SortedSet<Partition>> mappings;
+                IDictionary<int, Broker> brokers;
+                IDictionary<string, SortedSet<Partition>> mappings2;
+                IDictionary<int, Broker> brokers2;
+                using (IZooKeeperClient client = new ZooKeeperClient(
+                    producerConfig.ZkConnect,
+                    producerConfig.ZkSessionTimeoutMs,
+                    ZooKeeperStringSerializer.Serializer))
+                {
+                    using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+                    {
+                        brokers = brokerPartitionInfo.GetAllBrokerInfo();
+                        mappings =
+                            ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+                                "topicBrokerPartitions", brokerPartitionInfo);
+                        Assert.NotNull(brokers);
+                        Assert.Greater(brokers.Count, 0);
+                        Assert.NotNull(mappings);
+                        Assert.Greater(mappings.Count, 0);
+                        client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
+                        WaitUntillIdle(client, 3000);
+                        brokers2 = brokerPartitionInfo.GetAllBrokerInfo();
+                        mappings2 =
+                            ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+                                "topicBrokerPartitions", brokerPartitionInfo);
+                    }
+                }
+
+                Assert.NotNull(brokers2);
+                Assert.Greater(brokers2.Count, 0);
+                Assert.NotNull(mappings2);
+                Assert.Greater(mappings2.Count, 0);
+                Assert.AreEqual(brokers.Count, brokers2.Count);
+                Assert.AreEqual(mappings.Count, mappings2.Count);
+            }
+        }
+
+        [Test]
+        public void WhenNewTopicIsAddedZKBrokerPartitionInfoUpdatesMappings()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            IDictionary<string, SortedSet<Partition>> mappings;
+            string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
+
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+                {
+                    mappings =
+                        ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+                            "topicBrokerPartitions", brokerPartitionInfo);
+                    client.CreatePersistent(topicPath, true);
+                    WaitUntillIdle(client, 500);
+                    client.UnsubscribeAll();
+                    client.DeleteRecursive(topicPath);
+                }
+            }
+
+            Assert.NotNull(mappings);
+            Assert.Greater(mappings.Count, 0);
+            Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
+        }
+
+        [Test]
+        public void WhenNewBrokerIsAddedZKBrokerPartitionInfoUpdatesBrokersList()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            IDictionary<int, Broker> brokers;
+            string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+                {
+                    brokers = brokerPartitionInfo.GetAllBrokerInfo();
+                    client.CreatePersistent(brokerPath, true);
+                    client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+                    WaitUntillIdle(client, 500);
+                    client.UnsubscribeAll();
+                    client.DeleteRecursive(brokerPath);
+                }
+            }
+
+            Assert.NotNull(brokers);
+            Assert.Greater(brokers.Count, 0);
+            Assert.IsTrue(brokers.ContainsKey(2345));
+            Assert.AreEqual("192.168.1.39", brokers[2345].Host);
+            Assert.AreEqual(9102, brokers[2345].Port);
+            Assert.AreEqual(2345, brokers[2345].Id);
+        }
+
+        [Test]
+        public void WhenBrokerIsRemovedZKBrokerPartitionInfoUpdatesBrokersList()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            IDictionary<int, Broker> brokers;
+            string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+                {
+                    brokers = brokerPartitionInfo.GetAllBrokerInfo();
+                    client.CreatePersistent(brokerPath, true);
+                    client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+                    WaitUntillIdle(client, 500);
+                    Assert.NotNull(brokers);
+                    Assert.Greater(brokers.Count, 0);
+                    Assert.IsTrue(brokers.ContainsKey(2345));
+                    client.DeleteRecursive(brokerPath);
+                    WaitUntillIdle(client, 500);
+                }
+            }
+
+            Assert.NotNull(brokers);
+            Assert.Greater(brokers.Count, 0);
+            Assert.IsFalse(brokers.ContainsKey(2345));
+        }
+
+        [Test]
+        public void WhenNewBrokerInTopicIsAddedZKBrokerPartitionInfoUpdatesMappings()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            IDictionary<string, SortedSet<Partition>> mappings;
+            IDictionary<int, Broker> brokers;
+            string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+            string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
+            string topicBrokerPath = topicPath + "/" + 2345;
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                producerConfig.ZkConnect,
+                producerConfig.ZkSessionTimeoutMs,
+                ZooKeeperStringSerializer.Serializer))
+            {
+                using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
+                {
+                    brokers = brokerPartitionInfo.GetAllBrokerInfo();
+                    mappings =
+                        ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+                            "topicBrokerPartitions", brokerPartitionInfo);
+                    client.CreatePersistent(brokerPath, true);
+                    client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+                    client.CreatePersistent(topicPath, true);
+                    WaitUntillIdle(client, 500);
+                    Assert.IsTrue(brokers.ContainsKey(2345));
+                    Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
+                    client.CreatePersistent(topicBrokerPath, true);
+                    client.WriteData(topicBrokerPath, 5);
+                    WaitUntillIdle(client, 500);
+                    client.UnsubscribeAll();
+                    client.DeleteRecursive(brokerPath);
+                    client.DeleteRecursive(topicPath);
+                }
+            }
+
+            Assert.NotNull(brokers);
+            Assert.Greater(brokers.Count, 0);
+            Assert.NotNull(mappings);
+            Assert.Greater(mappings.Count, 0);
+            Assert.IsTrue(brokers.ContainsKey(2345));
+            Assert.IsTrue(mappings.Keys.Contains(CurrentTestTopic));
+            Assert.AreEqual(5, mappings[CurrentTestTopic].Count);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Text;
+    using System.Threading;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Consumers;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Producers;
+    using Kafka.Client.Requests;
+    using Kafka.Client.Serialization;
+    using NUnit.Framework;
+
+    [TestFixture]
+    public class ZooKeeperAwareProducerTests : IntegrationFixtureBase
+    {
+        /// <summary>
+        /// Kafka Client configuration
+        /// </summary>
+        private KafkaClientConfiguration clientConfig;
+
+        /// <summary>
+        /// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds)
+        /// </summary>
+        private readonly int MaxTestWaitTimeInMiliseconds = 5000;
+
+        [TestFixtureSetUp]
+        public void SetUp()
+        {
+            clientConfig = KafkaClientConfiguration.GetConfiguration();
+        }
+
+        [Test]
+        public void ZKAwareProducerSends1Message()
+        {
+            int totalWaitTimeInMiliseconds = 0;
+            int waitSingle = 100;
+            var originalMessage = new Message(Encoding.UTF8.GetBytes("TestData"));
+
+            var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+            multipleBrokersHelper.GetCurrentOffsets();
+
+            var producerConfig = new ProducerConfig(clientConfig);
+            var mockPartitioner = new MockAlwaysZeroPartitioner();
+            using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder()))
+            {
+                var producerData = new ProducerData<string, Message>(
+                    CurrentTestTopic, "somekey", new List<Message>() { originalMessage });
+                producer.Send(producerData);
+
+                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                {
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    Thread.Sleep(waitSingle);
+                    if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+                    {
+                        Assert.Fail("None of the brokers changed their offset after sending a message");
+                    }
+                }
+
+                totalWaitTimeInMiliseconds = 0;
+
+                var consumerConfig = new ConsumerConfig(clientConfig)
+                    {
+                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+                    };
+                IConsumer consumer = new Consumers.Consumer(consumerConfig);
+                var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+                BufferedMessageSet response;
+
+                while (true)
+                {
+                    Thread.Sleep(waitSingle);
+                    response = consumer.Fetch(request);
+                    if (response != null & response.Messages.Count() > 0)
+                    {
+                        break;
+                    }
+
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+                    {
+                        break;
+                    }
+                }
+
+                Assert.NotNull(response);
+                Assert.AreEqual(1, response.Messages.Count());
+                Assert.AreEqual(originalMessage.ToString(), response.Messages.First().ToString());
+            }
+        }
+
+        [Test]
+        public void ZkAwareProducerSends3Messages()
+        {
+            int totalWaitTimeInMiliseconds = 0;
+            int waitSingle = 100;
+            var originalMessage1 = new Message(Encoding.UTF8.GetBytes("TestData1"));
+            var originalMessage2 = new Message(Encoding.UTF8.GetBytes("TestData2"));
+            var originalMessage3 = new Message(Encoding.UTF8.GetBytes("TestData3"));
+            var originalMessageList = new List<Message> { originalMessage1, originalMessage2, originalMessage3 };
+
+            var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+            multipleBrokersHelper.GetCurrentOffsets();
+
+            var producerConfig = new ProducerConfig(clientConfig);
+            var mockPartitioner = new MockAlwaysZeroPartitioner();
+            using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder()))
+            {
+                var producerData = new ProducerData<string, Message>(CurrentTestTopic, "somekey", originalMessageList);
+                producer.Send(producerData);
+
+                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                {
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    Thread.Sleep(waitSingle);
+                    if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+                    {
+                        Assert.Fail("None of the brokers changed their offset after sending a message");
+                    }
+                }
+
+                totalWaitTimeInMiliseconds = 0;
+
+                var consumerConfig = new ConsumerConfig(clientConfig)
+                    {
+                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+                    };
+                IConsumer consumer = new Consumers.Consumer(consumerConfig);
+                var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+                BufferedMessageSet response;
+
+                while (true)
+                {
+                    Thread.Sleep(waitSingle);
+                    response = consumer.Fetch(request);
+                    if (response != null && response.Messages.Count() > 2)
+                    {
+                        break;
+                    }
+
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+                    {
+                        break;
+                    }
+                }
+
+                Assert.NotNull(response);
+                Assert.AreEqual(3, response.Messages.Count());
+                Assert.AreEqual(originalMessage1.ToString(), response.Messages.First().ToString());
+                Assert.AreEqual(originalMessage2.ToString(), response.Messages.Skip(1).First().ToString());
+                Assert.AreEqual(originalMessage3.ToString(), response.Messages.Skip(2).First().ToString());
+            }
+        }
+
+        [Test]
+        public void ZkAwareProducerSends1MessageUsingNotDefaultEncoder()
+        {
+            int totalWaitTimeInMiliseconds = 0;
+            int waitSingle = 100;
+            string originalMessage = "TestData";
+
+            var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+            multipleBrokersHelper.GetCurrentOffsets();
+
+            var producerConfig = new ProducerConfig(clientConfig);
+            var mockPartitioner = new MockAlwaysZeroPartitioner();
+            using (var producer = new Producer<string, string>(producerConfig, mockPartitioner, new StringEncoder(), null))
+            {
+                var producerData = new ProducerData<string, string>(
+                    CurrentTestTopic, "somekey", new List<string> { originalMessage });
+                producer.Send(producerData);
+
+                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                {
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    Thread.Sleep(waitSingle);
+                    if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+                    {
+                        Assert.Fail("None of the brokers changed their offset after sending a message");
+                    }
+                }
+
+                totalWaitTimeInMiliseconds = 0;
+
+                var consumerConfig = new ConsumerConfig(clientConfig)
+                    {
+                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+                    };
+                IConsumer consumer = new Consumers.Consumer(consumerConfig);
+                var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+                BufferedMessageSet response;
+
+                while (true)
+                {
+                    Thread.Sleep(waitSingle);
+                    response = consumer.Fetch(request);
+                    if (response != null && response.Messages.Count() > 0)
+                    {
+                        break;
+                    }
+
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+                    {
+                        break;
+                    }
+                }
+
+                Assert.NotNull(response);
+                Assert.AreEqual(1, response.Messages.Count());
+                Assert.AreEqual(originalMessage, Encoding.UTF8.GetString(response.Messages.First().Payload));
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,365 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Reflection;
+    using System.Threading;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Exceptions;
+    using Kafka.Client.Utils;
+    using Kafka.Client.ZooKeeperIntegration;
+    using Kafka.Client.ZooKeeperIntegration.Events;
+    using Kafka.Client.ZooKeeperIntegration.Listeners;
+    using log4net;
+    using NUnit.Framework;
+    using ZooKeeperNet;
+
+    [TestFixture]
+    internal class ZooKeeperClientTests : IntegrationFixtureBase, IZooKeeperDataListener, IZooKeeperStateListener, IZooKeeperChildListener
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+        private KafkaClientConfiguration clientConfig;
+        private readonly IList<ZooKeeperEventArgs> events = new List<ZooKeeperEventArgs>();
+
+        [TestFixtureSetUp]
+        public void SetUp()
+        {
+            clientConfig = KafkaClientConfiguration.GetConfiguration();
+        }
+
+        [SetUp]
+        public void TestSetup()
+        {
+            this.events.Clear();
+        }
+
+        [Test]
+        public void ZooKeeperClientCreateWorkerThreadsOnBeingCreated()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                var eventWorker = ReflectionHelper.GetInstanceField<Thread>("eventWorker", client);
+                var zooKeeperWorker = ReflectionHelper.GetInstanceField<Thread>("zooKeeperEventWorker", client);
+                Assert.NotNull(eventWorker);
+                Assert.NotNull(zooKeeperWorker);
+            }
+        }
+
+        [Test]
+        public void ZooKeeperClientFailsWhenCreatedWithWrongConnectionInfo()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = new ZooKeeperClient("random text", producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                Assert.Throws<FormatException>(client.Connect);
+            }
+        }
+
+        [Test]
+        public void WhenStateChangedToConnectedStateListenerFires()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Subscribe(this);
+                client.Connect();
+                WaitUntillIdle(client, 500);
+            }
+
+            Assert.AreEqual(1, this.events.Count);
+            ZooKeeperEventArgs e = this.events[0];
+            Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type);
+            Assert.IsInstanceOf<ZooKeeperStateChangedEventArgs>(e);
+            Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.SyncConnected);
+        }
+
+        [Test]
+        public void WhenStateChangedToDisconnectedStateListenerFires()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Subscribe(this);
+                client.Connect();
+                WaitUntillIdle(client, 500);
+                client.Process(new WatchedEvent(KeeperState.Disconnected, EventType.None, null));
+                WaitUntillIdle(client, 500);
+            }
+
+            Assert.AreEqual(2, this.events.Count);
+            ZooKeeperEventArgs e = this.events[1];
+            Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type);
+            Assert.IsInstanceOf<ZooKeeperStateChangedEventArgs>(e);
+            Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.Disconnected);
+        }
+
+        [Test]
+        public void WhenStateChangedToExpiredStateAndSessionListenersFire()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Subscribe(this);
+                client.Connect();
+                WaitUntillIdle(client, 500);
+                client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
+                WaitUntillIdle(client, 3000);
+            }
+
+            Assert.AreEqual(4, this.events.Count);
+            ZooKeeperEventArgs e = this.events[1];
+            Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type);
+            Assert.IsInstanceOf<ZooKeeperStateChangedEventArgs>(e);
+            Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.Expired);
+            e = this.events[2];
+            Assert.AreEqual(ZooKeeperEventTypes.SessionCreated, e.Type);
+            Assert.IsInstanceOf<ZooKeeperSessionCreatedEventArgs>(e);
+            e = this.events[3];
+            Assert.AreEqual(ZooKeeperEventTypes.StateChanged, e.Type);
+            Assert.IsInstanceOf<ZooKeeperStateChangedEventArgs>(e);
+            Assert.AreEqual(((ZooKeeperStateChangedEventArgs)e).State, KeeperState.SyncConnected);
+        }
+
+        [Test]
+        public void WhenSessionExpiredClientReconnects()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            IZooKeeperConnection conn1;
+            IZooKeeperConnection conn2;
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                conn1 = ReflectionHelper.GetInstanceField<ZooKeeperConnection>("connection", client);
+                client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
+                WaitUntillIdle(client, 1000);
+                conn2 = ReflectionHelper.GetInstanceField<ZooKeeperConnection>("connection", client);
+            }
+
+            Assert.AreNotEqual(conn1, conn2);
+        }
+
+        [Test]
+        public void ZooKeeperClientChecksIfPathExists()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                Assert.IsTrue(client.Exists(ZooKeeperClient.DefaultBrokerTopicsPath, false));
+            }
+        }
+
+        [Test]
+        public void ZooKeeperClientCreatesANewPathAndDeletesIt()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                string myPath = "/" + Guid.NewGuid();
+                client.CreatePersistent(myPath, false);
+                Assert.IsTrue(client.Exists(myPath));
+                client.Delete(myPath);
+                Assert.IsFalse(client.Exists(myPath));
+            }
+        }
+
+        [Test]
+        public void WhenChildIsCreatedChilListenerOnParentFires()
+        {
+            string myPath = "/" + Guid.NewGuid();
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                WaitUntillIdle(client, 500);
+                client.Subscribe("/", this as IZooKeeperChildListener);
+                client.CreatePersistent(myPath, true);
+                WaitUntillIdle(client, 500);
+                client.UnsubscribeAll();
+                client.Delete(myPath);
+            }
+
+            Assert.AreEqual(1, this.events.Count);
+            ZooKeeperEventArgs e = this.events[0];
+            Assert.AreEqual(ZooKeeperEventTypes.ChildChanged, e.Type);
+            Assert.IsInstanceOf<ZooKeeperChildChangedEventArgs>(e);
+            Assert.AreEqual(((ZooKeeperChildChangedEventArgs)e).Path, "/");
+            Assert.Greater(((ZooKeeperChildChangedEventArgs)e).Children.Count, 0);
+            Assert.IsTrue(((ZooKeeperChildChangedEventArgs)e).Children.Contains(myPath.Replace("/", string.Empty)));
+        }
+
+        [Test]
+        public void WhenChildIsDeletedChildListenerOnParentFires()
+        {
+            string myPath = "/" + Guid.NewGuid();
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                client.CreatePersistent(myPath, true);
+                WaitUntillIdle(client, 500);
+                client.Subscribe("/", this as IZooKeeperChildListener);
+                client.Delete(myPath);
+                WaitUntillIdle(client, 500);
+            }
+
+            Assert.AreEqual(1, this.events.Count);
+            ZooKeeperEventArgs e = this.events[0];
+            Assert.AreEqual(ZooKeeperEventTypes.ChildChanged, e.Type);
+            Assert.IsInstanceOf<ZooKeeperChildChangedEventArgs>(e);
+            Assert.AreEqual(((ZooKeeperChildChangedEventArgs)e).Path, "/");
+            Assert.Greater(((ZooKeeperChildChangedEventArgs)e).Children.Count, 0);
+            Assert.IsFalse(((ZooKeeperChildChangedEventArgs)e).Children.Contains(myPath.Replace("/", string.Empty)));
+        }
+
+        [Test]
+        public void WhenZNodeIsDeletedChildAndDataDeletedListenersFire()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            string myPath = "/" + Guid.NewGuid();
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                client.CreatePersistent(myPath, true);
+                WaitUntillIdle(client, 500);
+                client.Subscribe(myPath, this as IZooKeeperChildListener);
+                client.Subscribe(myPath, this as IZooKeeperDataListener);
+                client.Delete(myPath);
+                WaitUntillIdle(client, 500);
+            }
+
+            Assert.AreEqual(2, this.events.Count);
+            ZooKeeperEventArgs e = this.events[0];
+            Assert.AreEqual(ZooKeeperEventTypes.ChildChanged, e.Type);
+            Assert.IsInstanceOf<ZooKeeperChildChangedEventArgs>(e);
+            Assert.AreEqual(((ZooKeeperChildChangedEventArgs)e).Path, myPath);
+            Assert.IsNull(((ZooKeeperChildChangedEventArgs)e).Children);
+            e = this.events[1];
+            Assert.AreEqual(ZooKeeperEventTypes.DataChanged, e.Type);
+            Assert.IsInstanceOf<ZooKeeperDataChangedEventArgs>(e);
+            Assert.AreEqual(((ZooKeeperDataChangedEventArgs)e).Path, myPath);
+            Assert.IsNull(((ZooKeeperDataChangedEventArgs)e).Data);
+        }
+
+        [Test]
+        public void ZooKeeperClientCreatesAChildAndGetsChildren()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                string child = Guid.NewGuid().ToString();
+                string myPath = "/" + child;
+                client.CreatePersistent(myPath, false);
+                IList<string> children = client.GetChildren("/", false);
+                int countChildren = client.CountChildren("/");
+                Assert.Greater(children.Count, 0);
+                Assert.AreEqual(children.Count, countChildren);
+                Assert.IsTrue(children.Contains(child));
+                client.Delete(myPath);
+            }
+        }
+
+        [Test]
+        public void WhenDataChangedDataListenerFires()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            string myPath = "/" + Guid.NewGuid();
+            string sourceData = "my test data";
+            string resultData;
+            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                client.Connect();
+                client.CreatePersistent(myPath, true);
+                WaitUntillIdle(client, 500);
+                client.Subscribe(myPath, this as IZooKeeperDataListener);
+                client.Subscribe(myPath, this as IZooKeeperChildListener);
+                client.WriteData(myPath, sourceData);
+                WaitUntillIdle(client, 500);
+                client.UnsubscribeAll();
+                resultData = client.ReadData<string>(myPath);
+                client.Delete(myPath);
+            }
+
+            Assert.IsTrue(!string.IsNullOrEmpty(resultData));
+            Assert.AreEqual(sourceData, resultData);
+            Assert.AreEqual(1, this.events.Count);
+            ZooKeeperEventArgs e = this.events[0];
+            Assert.AreEqual(ZooKeeperEventTypes.DataChanged, e.Type);
+            Assert.IsInstanceOf<ZooKeeperDataChangedEventArgs>(e);
+            Assert.AreEqual(((ZooKeeperDataChangedEventArgs)e).Path, myPath);
+            Assert.IsNotNull(((ZooKeeperDataChangedEventArgs)e).Data);
+            Assert.AreEqual(((ZooKeeperDataChangedEventArgs)e).Data, sourceData);
+        }
+
+        [Test]
+        [ExpectedException(typeof(ZooKeeperException))]
+        public void WhenClientWillNotConnectWithinGivenTimeThrows()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperClient client = 
+                new ZooKeeperClient(
+                    producerConfig.ZkConnect, 
+                    producerConfig.ZkSessionTimeoutMs, 
+                    ZooKeeperStringSerializer.Serializer,
+                    1))
+            {
+                client.Connect();
+            }
+        }
+
+        public void HandleDataChange(ZooKeeperDataChangedEventArgs args)
+        {
+            Logger.Debug(args + " reach test event handler");
+            this.events.Add(args);
+        }
+
+        public void HandleDataDelete(ZooKeeperDataChangedEventArgs args)
+        {
+            Logger.Debug(args + " reach test event handler");
+            this.events.Add(args);
+        }
+
+        public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
+        {
+            Logger.Debug(args + " reach test event handler");
+            this.events.Add(args);
+        }
+
+        public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
+        {
+            Logger.Debug(args + " reach test event handler");
+            this.events.Add(args);
+        }
+
+        public void HandleChildChange(ZooKeeperChildChangedEventArgs args)
+        {
+            Logger.Debug(args + " reach test event handler");
+            this.events.Add(args);
+        }
+
+        public void ResetState()
+        {
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System;
+    using System.Collections.Generic;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.ZooKeeperIntegration;
+    using NUnit.Framework;
+    using ZooKeeperNet;
+
+    [TestFixture]
+    public class ZooKeeperConnectionTests
+    {
+        private KafkaClientConfiguration clientConfig;
+
+        [TestFixtureSetUp]
+        public void SetUp()
+        {
+            clientConfig = KafkaClientConfiguration.GetConfiguration();
+        }
+
+        [Test]
+        public void ZooKeeperConnectionCreatesAndDeletesPath()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+            {
+                connection.Connect(null);
+                string pathName = "/" + Guid.NewGuid();
+                connection.Create(pathName, null, CreateMode.Persistent);
+                Assert.IsTrue(connection.Exists(pathName, false));
+                connection.Delete(pathName);
+                Assert.IsFalse(connection.Exists(pathName, false));
+            }
+        }
+
+        [Test]
+        public void ZooKeeperConnectionConnectsAndDisposes()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            IZooKeeperConnection connection;
+            using (connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+            {
+                Assert.IsNull(connection.ClientState);
+                connection.Connect(null);
+                Assert.NotNull(connection.Client);
+                Assert.AreEqual(ZooKeeper.States.CONNECTING, connection.ClientState);
+            }
+
+            Assert.Null(connection.Client);
+        }
+
+        [Test]
+        public void ZooKeeperConnectionCreatesAndGetsCreateTime()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+            {
+                connection.Connect(null);
+                string pathName = "/" + Guid.NewGuid();
+                connection.Create(pathName, null, CreateMode.Persistent);
+                long createTime = connection.GetCreateTime(pathName);
+                Assert.Greater(createTime, 0);
+                connection.Delete(pathName);
+            }
+        }
+
+        [Test]
+        public void ZooKeeperConnectionCreatesAndGetsChildren()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+            {
+                connection.Connect(null);
+                string child = Guid.NewGuid().ToString();
+                string pathName = "/" + child;
+                connection.Create(pathName, null, CreateMode.Persistent);
+                IList<string> children = connection.GetChildren("/", false);
+                Assert.Greater(children.Count, 0);
+                Assert.IsTrue(children.Contains(child));
+                connection.Delete(pathName);
+            }
+        }
+
+        [Test]
+        public void ZooKeeperConnectionWritesAndReadsData()
+        {
+            var producerConfig = new ProducerConfig(clientConfig);
+            using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+            {
+                connection.Connect(null);
+                string child = Guid.NewGuid().ToString();
+                string pathName = "/" + child;
+                connection.Create(pathName, null, CreateMode.Persistent);
+                var sourceData = new byte[2] { 1, 2 };
+                connection.WriteData(pathName, sourceData);
+                byte[] resultData = connection.ReadData(pathName, null, false);
+                Assert.IsNotNull(resultData);
+                Assert.AreEqual(sourceData[0], resultData[0]);
+                Assert.AreEqual(sourceData[1], resultData[1]);
+                connection.Delete(pathName);
+            }
+        }
+    }
+}

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj Wed Sep 21 19:17:19 2011
@@ -1,70 +1,108 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
-  <PropertyGroup>
-    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
-    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
-    <ProductVersion>8.0.30703</ProductVersion>
-    <SchemaVersion>2.0</SchemaVersion>
-    <ProjectGuid>{9BA1A0BF-B207-4A11-8883-5F64B113C07D}</ProjectGuid>
-    <OutputType>Library</OutputType>
-    <AppDesignerFolder>Properties</AppDesignerFolder>
-    <RootNamespace>Kafka.Client.Tests</RootNamespace>
-    <AssemblyName>Kafka.Client.Tests</AssemblyName>
-    <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
-    <FileAlignment>512</FileAlignment>
-  </PropertyGroup>
-  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
-    <DebugSymbols>true</DebugSymbols>
-    <DebugType>full</DebugType>
-    <Optimize>false</Optimize>
-    <OutputPath>bin\Debug\</OutputPath>
-    <DefineConstants>DEBUG;TRACE</DefineConstants>
-    <ErrorReport>prompt</ErrorReport>
-    <WarningLevel>4</WarningLevel>
-  </PropertyGroup>
-  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
-    <DebugType>pdbonly</DebugType>
-    <Optimize>true</Optimize>
-    <OutputPath>bin\Release\</OutputPath>
-    <DefineConstants>TRACE</DefineConstants>
-    <ErrorReport>prompt</ErrorReport>
-    <WarningLevel>4</WarningLevel>
-  </PropertyGroup>
-  <ItemGroup>
-    <Reference Include="nunit.framework, Version=2.5.9.10348, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
-      <SpecificVersion>False</SpecificVersion>
-      <HintPath>..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll</HintPath>
-    </Reference>
-    <Reference Include="System" />
-    <Reference Include="System.Core" />
-    <Reference Include="System.Xml.Linq" />
-    <Reference Include="System.Data.DataSetExtensions" />
-    <Reference Include="Microsoft.CSharp" />
-    <Reference Include="System.Data" />
-    <Reference Include="System.Xml" />
-  </ItemGroup>
-  <ItemGroup>
-    <Compile Include="MessageTests.cs" />
-    <Compile Include="Properties\AssemblyInfo.cs" />
-    <Compile Include="Request\FetchRequestTests.cs" />
-    <Compile Include="Request\MultiFetchRequestTests.cs" />
-    <Compile Include="Request\MultiProducerRequestTests.cs" />
-    <Compile Include="Request\OffsetRequestTests.cs" />
-    <Compile Include="Request\ProducerRequestTests.cs" />
-    <Compile Include="Util\BitWorksTests.cs" />
-  </ItemGroup>
-  <ItemGroup>
-    <ProjectReference Include="..\..\Kafka.Client\Kafka.Client.csproj">
-      <Project>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</Project>
-      <Name>Kafka.Client</Name>
-    </ProjectReference>
-  </ItemGroup>
-  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
-  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
-       Other similar extension points exist, see Microsoft.Common.targets.
-  <Target Name="BeforeBuild">
-  </Target>
-  <Target Name="AfterBuild">
-  </Target>
-  -->
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProductVersion>8.0.30703</ProductVersion>
+    <SchemaVersion>2.0</SchemaVersion>
+    <ProjectGuid>{9BA1A0BF-B207-4A11-8883-5F64B113C07D}</ProjectGuid>
+    <OutputType>Library</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Kafka.Client.Tests</RootNamespace>
+    <AssemblyName>Kafka.Client.Tests</AssemblyName>
+    <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <CodeContractsAssemblyMode>0</CodeContractsAssemblyMode>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>bin\Debug\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <StyleCopTreatErrorsAsWarnings>true</StyleCopTreatErrorsAsWarnings>
+    <CodeContractsEnableRuntimeChecking>False</CodeContractsEnableRuntimeChecking>
+    <CodeContractsRuntimeOnlyPublicSurface>False</CodeContractsRuntimeOnlyPublicSurface>
+    <CodeContractsRuntimeThrowOnFailure>True</CodeContractsRuntimeThrowOnFailure>
+    <CodeContractsRuntimeCallSiteRequires>False</CodeContractsRuntimeCallSiteRequires>
+    <CodeContractsRuntimeSkipQuantifiers>False</CodeContractsRuntimeSkipQuantifiers>
+    <CodeContractsRunCodeAnalysis>False</CodeContractsRunCodeAnalysis>
+    <CodeContractsNonNullObligations>False</CodeContractsNonNullObligations>
+    <CodeContractsBoundsObligations>False</CodeContractsBoundsObligations>
+    <CodeContractsArithmeticObligations>False</CodeContractsArithmeticObligations>
+    <CodeContractsEnumObligations>False</CodeContractsEnumObligations>
+    <CodeContractsRedundantAssumptions>False</CodeContractsRedundantAssumptions>
+    <CodeContractsRunInBackground>True</CodeContractsRunInBackground>
+    <CodeContractsShowSquigglies>False</CodeContractsShowSquigglies>
+    <CodeContractsUseBaseLine>False</CodeContractsUseBaseLine>
+    <CodeContractsEmitXMLDocs>False</CodeContractsEmitXMLDocs>
+    <CodeContractsCustomRewriterAssembly />
+    <CodeContractsCustomRewriterClass />
+    <CodeContractsLibPaths />
+    <CodeContractsExtraRewriteOptions />
+    <CodeContractsExtraAnalysisOptions />
+    <CodeContractsBaseLineFile />
+    <CodeContractsCacheAnalysisResults>False</CodeContractsCacheAnalysisResults>
+    <CodeContractsRuntimeCheckingLevel>Full</CodeContractsRuntimeCheckingLevel>
+    <CodeContractsReferenceAssembly>%28none%29</CodeContractsReferenceAssembly>
+    <CodeContractsAnalysisWarningLevel>0</CodeContractsAnalysisWarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>bin\Release\</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Integration|AnyCPU'">
+    <DebugSymbols>true</DebugSymbols>
+    <OutputPath>bin\Integration\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <DebugType>full</DebugType>
+    <PlatformTarget>AnyCPU</PlatformTarget>
+    <ErrorReport>prompt</ErrorReport>
+    <CodeAnalysisIgnoreBuiltInRuleSets>true</CodeAnalysisIgnoreBuiltInRuleSets>
+    <CodeAnalysisIgnoreBuiltInRules>true</CodeAnalysisIgnoreBuiltInRules>
+    <CodeAnalysisFailOnMissingRules>true</CodeAnalysisFailOnMissingRules>
+    <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="log4net">
+      <HintPath>..\..\..\..\lib\log4Net\log4net.dll</HintPath>
+    </Reference>
+    <Reference Include="nunit.framework, Version=2.5.9.10348, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll</HintPath>
+    </Reference>
+    <Reference Include="System" />
+    <Reference Include="System.Core" />
+    <Reference Include="Microsoft.CSharp" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="MessageSetTests.cs" />
+    <Compile Include="MessageTests.cs" />
+    <Compile Include="Producers\PartitioningTests.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="Request\FetchRequestTests.cs" />
+    <Compile Include="Request\MultiFetchRequestTests.cs" />
+    <Compile Include="Request\MultiProducerRequestTests.cs" />
+    <Compile Include="Request\OffsetRequestTests.cs" />
+    <Compile Include="Request\ProducerRequestTests.cs" />
+    <Compile Include="Util\BitWorksTests.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="..\..\Kafka.Client\Kafka.Client.csproj">
+      <Project>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</Project>
+      <Name>Kafka.Client</Name>
+    </ProjectReference>
+  </ItemGroup>
+  <ItemGroup>
+    <Folder Include="ZooKeeper\" />
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <Import Project="..\..\..\..\lib\StyleCop\Microsoft.StyleCop.Targets" />
 </Project>
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Tests
+{
+    using System;
+    using System.Collections.Generic;
+    using System.IO;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Utils;
+    using NUnit.Framework;
+
+    [TestFixture]
+    public class MessageSetTests
+    {
+        private const int MessageLengthPartLength = 4;
+        private const int MagicNumberPartLength = 1;
+        private const int ChecksumPartLength = 4;
+
+        private const int MessageLengthPartOffset = 0;
+        private const int MagicNumberPartOffset = 4;
+        private const int ChecksumPartOffset = 5;
+        private const int DataPartOffset = 9;
+
+        [Test]
+        public void BufferedMessageSetWriteToValidSequence()
+        {
+            byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5 };
+            Message msg1 = new Message(messageBytes);
+            Message msg2 = new Message(messageBytes);
+            MessageSet messageSet = new BufferedMessageSet(new List<Message>() { msg1, msg2 });
+            MemoryStream ms = new MemoryStream();
+            messageSet.WriteTo(ms);
+
+            ////first message
+
+            byte[] messageLength = new byte[MessageLengthPartLength];
+            Array.Copy(ms.ToArray(), MessageLengthPartOffset, messageLength, 0, MessageLengthPartLength);
+            if (BitConverter.IsLittleEndian)
+            {
+                Array.Reverse(messageLength);
+            }
+
+            Assert.AreEqual(MagicNumberPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
+
+            Assert.AreEqual(0, ms.ToArray()[MagicNumberPartOffset]);    // default magic number should be 0
+
+            byte[] checksumPart = new byte[ChecksumPartLength];
+            Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
+            Assert.AreEqual(Crc32Hasher.Compute(messageBytes), checksumPart);
+
+            byte[] dataPart = new byte[messageBytes.Length];
+            Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
+            Assert.AreEqual(messageBytes, dataPart);
+
+            ////second message
+            int secondMessageOffset = MessageLengthPartLength + MagicNumberPartLength + ChecksumPartLength +
+                                      messageBytes.Length;
+
+            messageLength = new byte[MessageLengthPartLength];
+            Array.Copy(ms.ToArray(), secondMessageOffset + MessageLengthPartOffset, messageLength, 0, MessageLengthPartLength);
+            if (BitConverter.IsLittleEndian)
+            {
+                Array.Reverse(messageLength);
+            }
+
+            Assert.AreEqual(MagicNumberPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
+
+            Assert.AreEqual(0, ms.ToArray()[secondMessageOffset + MagicNumberPartOffset]);    // default magic number should be 0
+
+            checksumPart = new byte[ChecksumPartLength];
+            Array.Copy(ms.ToArray(), secondMessageOffset + ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
+            Assert.AreEqual(Crc32Hasher.Compute(messageBytes), checksumPart);
+
+            dataPart = new byte[messageBytes.Length];
+            Array.Copy(ms.ToArray(), secondMessageOffset + DataPartOffset, dataPart, 0, messageBytes.Length);
+            Assert.AreEqual(messageBytes, dataPart);
+        }
+
+        [Test]
+        public void SetSizeValid()
+        {
+            byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5 };
+            Message msg1 = new Message(messageBytes);
+            Message msg2 = new Message(messageBytes);
+            MessageSet messageSet = new BufferedMessageSet(new List<Message>() { msg1, msg2 });
+            Assert.AreEqual(
+                2 * (MessageLengthPartLength + MagicNumberPartLength + ChecksumPartLength + messageBytes.Length),
+                messageSet.SetSize);
+        }
+    }
+}

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs Wed Sep 21 19:17:19 2011
@@ -1,68 +1,130 @@
-using System;
-using System.Linq;
-using System.Text;
-using Kafka.Client.Util;
-using NUnit.Framework;
-
-namespace Kafka.Client.Tests
-{
-    /// <summary>
-    /// Tests for the <see cref="Message"/> class.
-    /// </summary>
-    [TestFixture]
-    public class MessageTests
-    {
-        /// <summary>
-        /// Demonstrates a properly parsed message.
-        /// </summary>
-        [Test]
-        public void ParseFromValid()
-        {
-            Crc32 crc32 = new Crc32();
-
-            string payload = "kafka";
-            byte magic = 0;
-            byte[] payloadData = Encoding.UTF8.GetBytes(payload);
-            byte[] payloadSize = BitConverter.GetBytes(payloadData.Length);
-            byte[] checksum = crc32.ComputeHash(payloadData);
-            byte[] messageData = new byte[payloadData.Length + 1 + payloadSize.Length + checksum.Length];
-
-            Buffer.BlockCopy(payloadSize, 0, messageData, 0, payloadSize.Length);
-            messageData[4] = magic;
-            Buffer.BlockCopy(checksum, 0, messageData, payloadSize.Length + 1, checksum.Length);
-            Buffer.BlockCopy(payloadData, 0, messageData, payloadSize.Length + 1 + checksum.Length, payloadData.Length);
-
-            Message message = Message.ParseFrom(messageData);
-
-            Assert.IsNotNull(message);
-            Assert.AreEqual(magic, message.Magic);
-            Assert.IsTrue(payloadData.SequenceEqual(message.Payload));
-            Assert.IsTrue(checksum.SequenceEqual(message.Checksum));
-        }
-
-        /// <summary>
-        /// Ensure that the bytes returned from the message are in valid kafka sequence.
-        /// </summary>
-        [Test]
-        public void GetBytesValidSequence()
-        {
-            Message message = new Message(new byte[10], (byte)245);
-
-            byte[] bytes = message.GetBytes();
-
-            Assert.IsNotNull(bytes);
-
-            // len(payload) + 1 + 4
-            Assert.AreEqual(15, bytes.Length);
-
-            // first 4 bytes = the magic number
-            Assert.AreEqual((byte)245, bytes[0]);
-
-            // next 4 bytes = the checksum
-            Assert.IsTrue(message.Checksum.SequenceEqual(bytes.Skip(1).Take(4).ToArray<byte>()));
-
-            // remaining bytes = the payload
-            Assert.AreEqual(10, bytes.Skip(5).ToArray<byte>().Length);
-        }
-    }
-}
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Tests
+{
+    using System;
+    using System.IO;
+    using System.Linq;
+    using System.Text;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Utils;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests for the <see cref="Message"/> class.
+    /// </summary>
+    [TestFixture]
+    public class MessageTests
+    {
+        private readonly int ChecksumPartLength = 4;
+
+        private readonly int MagicNumberPartOffset = 0;
+        private readonly int ChecksumPartOffset = 1;
+        private readonly int DataPartOffset = 5;
+
+        /// <summary>
+        /// Demonstrates a properly parsed message.
+        /// </summary>
+        [Test]
+        public void ParseFromValid()
+        {
+            Crc32Hasher crc32 = new Crc32Hasher();
+
+            string payload = "kafka";
+            byte magic = 0;
+            byte[] payloadData = Encoding.UTF8.GetBytes(payload);
+            byte[] payloadSize = BitConverter.GetBytes(payloadData.Length);
+            byte[] checksum = crc32.ComputeHash(payloadData);
+            byte[] messageData = new byte[payloadData.Length + 1 + payloadSize.Length + checksum.Length];
+
+            Buffer.BlockCopy(payloadSize, 0, messageData, 0, payloadSize.Length);
+            messageData[4] = magic;
+            Buffer.BlockCopy(checksum, 0, messageData, payloadSize.Length + 1, checksum.Length);
+            Buffer.BlockCopy(payloadData, 0, messageData, payloadSize.Length + 1 + checksum.Length, payloadData.Length);
+
+            Message message = Message.ParseFrom(messageData);
+
+            Assert.IsNotNull(message);
+            Assert.AreEqual(magic, message.Magic);
+            Assert.IsTrue(payloadData.SequenceEqual(message.Payload));
+            Assert.IsTrue(checksum.SequenceEqual(message.Checksum));
+        }
+
+        /// <summary>
+        /// Ensure that the bytes returned from the message are in valid kafka sequence.
+        /// </summary>
+        [Test]
+        public void GetBytesValidSequence()
+        {
+            Message message = new Message(new byte[10], (byte)245);
+
+            MemoryStream ms = new MemoryStream();
+            message.WriteTo(ms);
+
+            // len(payload) + 1 + 4
+            Assert.AreEqual(15, ms.Length);
+
+            // first 4 bytes = the magic number
+            Assert.AreEqual((byte)245, ms.ToArray()[0]);
+
+            // next 4 bytes = the checksum
+            Assert.IsTrue(message.Checksum.SequenceEqual(ms.ToArray().Skip(1).Take(4).ToArray<byte>()));
+
+            // remaining bytes = the payload
+            Assert.AreEqual(10, ms.ToArray().Skip(5).ToArray<byte>().Length);
+        }
+
+        [Test]
+        public void WriteToValidSequenceForDefaultConstructor()
+        {
+            byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+            Message message = new Message(messageBytes);
+            MemoryStream ms = new MemoryStream();
+            message.WriteTo(ms);
+
+            Assert.AreEqual(0, ms.ToArray()[MagicNumberPartOffset]);    // default magic number should be 0
+
+            byte[] checksumPart = new byte[ChecksumPartLength];
+            Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
+            Assert.AreEqual(Crc32Hasher.Compute(messageBytes), checksumPart);
+
+            byte[] dataPart = new byte[messageBytes.Length];
+            Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
+            Assert.AreEqual(messageBytes, dataPart);
+        }
+
+        [Test]
+        public void WriteToValidSequenceForCustomConstructor()
+        {
+            byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+            byte[] customChecksum = new byte[] { 3, 4, 5, 6 };
+            Message message = new Message(messageBytes, (byte)33, customChecksum);
+            MemoryStream ms = new MemoryStream();
+            message.WriteTo(ms);
+
+            Assert.AreEqual((byte)33, ms.ToArray()[MagicNumberPartOffset]);
+
+            byte[] checksumPart = new byte[ChecksumPartLength];
+            Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
+            Assert.AreEqual(customChecksum, checksumPart);
+
+            byte[] dataPart = new byte[messageBytes.Length];
+            Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
+            Assert.AreEqual(messageBytes, dataPart);
+        }
+    }
+}