You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/18 19:52:16 UTC

svn commit: r1185772 [4/4] - in /incubator/kafka/trunk/clients/csharp/src/Kafka: Kafka.Client/ Kafka.Client/Cfg/ Kafka.Client/Consumers/ Kafka.Client/Exceptions/ Kafka.Client/Messages/ Kafka.Client/Producers/ Kafka.Client/Producers/Async/ Kafka.Client/...

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs Tue Oct 18 17:52:13 2011
@@ -19,7 +19,6 @@ namespace Kafka.Client.IntegrationTests
 {
     using System.Collections.Generic;
     using System.Linq;
-    using Kafka.Client.Cfg;
     using Kafka.Client.Cluster;
     using Kafka.Client.Producers.Partitioning;
     using Kafka.Client.Utils;
@@ -29,60 +28,53 @@ namespace Kafka.Client.IntegrationTests
     using ZooKeeperNet;
 
     [TestFixture]
-    public class ZKBrokerPartitionInfoTests : IntegrationFixtureBase
+    public class ZkBrokerPartitionInfoTests : IntegrationFixtureBase
     {
-        private KafkaClientConfiguration clientConfig;
-        private ZKConfig zkConfig;
-
-        [TestFixtureSetUp]
-        public void SetUp()
-        {
-            clientConfig = KafkaClientConfiguration.GetConfiguration();
-            zkConfig = new ProducerConfig(clientConfig);
-        }
-
         [Test]
-        public void ZKBrokerPartitionInfoGetsAllBrokerInfo()
+        public void ZkBrokerPartitionInfoGetsAllBrokerInfo()
         {
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+            var prodConfigNotZk = this.ConfigBasedSyncProdConfig;
+
             IDictionary<int, Broker> allBrokerInfo;
-            using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+            using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(prodConfig, null))
             {
                 allBrokerInfo = brokerPartitionInfo.GetAllBrokerInfo();
             }
 
-            Assert.AreEqual(clientConfig.BrokerPartitionInfos.Count, allBrokerInfo.Count);
-            foreach (BrokerPartitionInfo cfgBrPartInfo in clientConfig.BrokerPartitionInfos)
-            {
-                Assert.IsTrue(allBrokerInfo.ContainsKey(cfgBrPartInfo.Id));
-                Assert.AreEqual(cfgBrPartInfo.Address, allBrokerInfo[cfgBrPartInfo.Id].Host);
-                Assert.AreEqual(cfgBrPartInfo.Port, allBrokerInfo[cfgBrPartInfo.Id].Port);
-            }
+            Assert.AreEqual(prodConfigNotZk.Brokers.Count, allBrokerInfo.Count);
+            allBrokerInfo.Values.All(x => prodConfigNotZk.Brokers.Any(
+                y => x.Id == y.BrokerId
+                && x.Host == y.Host
+                && x.Port == y.Port));
         }
 
         [Test]
-        public void ZKBrokerPartitionInfoGetsBrokerPartitionInfo()
+        public void ZkBrokerPartitionInfoGetsBrokerPartitionInfo()
         {
+            var prodconfig = this.ZooKeeperBasedSyncProdConfig;
             SortedSet<Partition> partitions;
-            using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+            using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(prodconfig, null))
             {
                 partitions = brokerPartitionInfo.GetBrokerPartitionInfo("test");
             }
 
             Assert.NotNull(partitions);
             Assert.GreaterOrEqual(partitions.Count, 2);
-            var partition = partitions.ToList()[0];
-            Assert.AreEqual(0, partition.BrokerId);
         }
 
         [Test]
         public void ZkBrokerPartitionInfoGetsBrokerInfo()
         {
-            using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+            var prodConfigNotZk = this.ConfigBasedSyncProdConfig;
+
+            using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(prodConfig, null))
             {
-                var testBroker = clientConfig.BrokerPartitionInfos[0];
-                Broker broker = brokerPartitionInfo.GetBrokerInfo(testBroker.Id);
+                var testBroker = prodConfigNotZk.Brokers[0];
+                Broker broker = brokerPartitionInfo.GetBrokerInfo(testBroker.BrokerId);
                 Assert.NotNull(broker);
-                Assert.AreEqual(testBroker.Address, broker.Host);
+                Assert.AreEqual(testBroker.Host, broker.Host);
                 Assert.AreEqual(testBroker.Port, broker.Port);
             }
         }
@@ -90,14 +82,15 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenNewTopicIsAddedBrokerTopicsListenerCreatesNewMapping()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             IDictionary<string, SortedSet<Partition>> mappings;
             IDictionary<int, Broker> brokers;
             string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
 
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -114,8 +107,8 @@ namespace Kafka.Client.IntegrationTests
             Assert.NotNull(mappings);
             Assert.Greater(mappings.Count, 0);
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
@@ -125,6 +118,7 @@ namespace Kafka.Client.IntegrationTests
                 client.CreatePersistent(topicPath, true);
                 WaitUntillIdle(client, 500);
                 client.UnsubscribeAll();
+                WaitUntillIdle(client, 500);
                 client.DeleteRecursive(topicPath);
             }
 
@@ -134,13 +128,14 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenNewBrokerIsAddedBrokerTopicsListenerUpdatesBrokersList()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             IDictionary<string, SortedSet<Partition>> mappings;
             IDictionary<int, Broker> brokers;
             string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -157,18 +152,20 @@ namespace Kafka.Client.IntegrationTests
             Assert.NotNull(mappings);
             Assert.Greater(mappings.Count, 0);
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 WaitUntillIdle(client, 500);
                 var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
                 client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
+                WaitUntillIdle(client, 500);
                 client.CreatePersistent(brokerPath, true);
                 client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
                 WaitUntillIdle(client, 500);
                 client.UnsubscribeAll();
+                WaitUntillIdle(client, 500);
                 client.DeleteRecursive(brokerPath);
             }
 
@@ -181,13 +178,14 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenBrokerIsRemovedBrokerTopicsListenerUpdatesBrokersList()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             IDictionary<string, SortedSet<Partition>> mappings;
             IDictionary<int, Broker> brokers;
             string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -204,8 +202,8 @@ namespace Kafka.Client.IntegrationTests
             Assert.NotNull(mappings);
             Assert.Greater(mappings.Count, 0);
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
@@ -225,15 +223,16 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenNewBrokerInTopicIsAddedBrokerTopicsListenerUpdatesMappings()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             IDictionary<string, SortedSet<Partition>> mappings;
             IDictionary<int, Broker> brokers;
             string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
             string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
             string topicBrokerPath = topicPath + "/" + 2345;
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -250,8 +249,8 @@ namespace Kafka.Client.IntegrationTests
             Assert.NotNull(mappings);
             Assert.Greater(mappings.Count, 0);
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
@@ -269,6 +268,7 @@ namespace Kafka.Client.IntegrationTests
                 client.WriteData(topicBrokerPath, 5);
                 WaitUntillIdle(client, 500);
                 client.UnsubscribeAll();
+                WaitUntillIdle(client, 500);
                 client.DeleteRecursive(brokerPath);
                 client.DeleteRecursive(topicPath);
             }
@@ -281,55 +281,56 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenSessionIsExpiredListenerRecreatesEphemeralNodes()
         {
-            {
-                var producerConfig = new ProducerConfig(clientConfig);
-                IDictionary<string, SortedSet<Partition>> mappings;
-                IDictionary<int, Broker> brokers;
-                IDictionary<string, SortedSet<Partition>> mappings2;
-                IDictionary<int, Broker> brokers2;
-                using (IZooKeeperClient client = new ZooKeeperClient(
-                    producerConfig.ZkConnect,
-                    producerConfig.ZkSessionTimeoutMs,
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            IDictionary<string, SortedSet<Partition>> mappings;
+            IDictionary<int, Broker> brokers;
+            IDictionary<string, SortedSet<Partition>> mappings2;
+            IDictionary<int, Broker> brokers2;
+            using (
+                IZooKeeperClient client = new ZooKeeperClient(
+                    prodConfig.ZooKeeper.ZkConnect,
+                    prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                     ZooKeeperStringSerializer.Serializer))
+            {
+                using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
                 {
-                    using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
-                    {
-                        brokers = brokerPartitionInfo.GetAllBrokerInfo();
-                        mappings =
-                            ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
-                                "topicBrokerPartitions", brokerPartitionInfo);
-                        Assert.NotNull(brokers);
-                        Assert.Greater(brokers.Count, 0);
-                        Assert.NotNull(mappings);
-                        Assert.Greater(mappings.Count, 0);
-                        client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
-                        WaitUntillIdle(client, 3000);
-                        brokers2 = brokerPartitionInfo.GetAllBrokerInfo();
-                        mappings2 =
-                            ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
-                                "topicBrokerPartitions", brokerPartitionInfo);
-                    }
+                    brokers = brokerPartitionInfo.GetAllBrokerInfo();
+                    mappings =
+                        ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+                            "topicBrokerPartitions", brokerPartitionInfo);
+                    Assert.NotNull(brokers);
+                    Assert.Greater(brokers.Count, 0);
+                    Assert.NotNull(mappings);
+                    Assert.Greater(mappings.Count, 0);
+                    client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
+                    WaitUntillIdle(client, 3000);
+                    brokers2 = brokerPartitionInfo.GetAllBrokerInfo();
+                    mappings2 =
+                        ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+                            "topicBrokerPartitions", brokerPartitionInfo);
                 }
-
-                Assert.NotNull(brokers2);
-                Assert.Greater(brokers2.Count, 0);
-                Assert.NotNull(mappings2);
-                Assert.Greater(mappings2.Count, 0);
-                Assert.AreEqual(brokers.Count, brokers2.Count);
-                Assert.AreEqual(mappings.Count, mappings2.Count);
             }
+
+            Assert.NotNull(brokers2);
+            Assert.Greater(brokers2.Count, 0);
+            Assert.NotNull(mappings2);
+            Assert.Greater(mappings2.Count, 0);
+            Assert.AreEqual(brokers.Count, brokers2.Count);
+            Assert.AreEqual(mappings.Count, mappings2.Count);
         }
 
         [Test]
-        public void WhenNewTopicIsAddedZKBrokerPartitionInfoUpdatesMappings()
+        public void WhenNewTopicIsAddedZkBrokerPartitionInfoUpdatesMappings()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             IDictionary<string, SortedSet<Partition>> mappings;
             string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
 
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -340,6 +341,7 @@ namespace Kafka.Client.IntegrationTests
                     client.CreatePersistent(topicPath, true);
                     WaitUntillIdle(client, 500);
                     client.UnsubscribeAll();
+                    WaitUntillIdle(client, 500);
                     client.DeleteRecursive(topicPath);
                 }
             }
@@ -350,14 +352,15 @@ namespace Kafka.Client.IntegrationTests
         }
 
         [Test]
-        public void WhenNewBrokerIsAddedZKBrokerPartitionInfoUpdatesBrokersList()
+        public void WhenNewBrokerIsAddedZkBrokerPartitionInfoUpdatesBrokersList()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             IDictionary<int, Broker> brokers;
             string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -367,6 +370,7 @@ namespace Kafka.Client.IntegrationTests
                     client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
                     WaitUntillIdle(client, 500);
                     client.UnsubscribeAll();
+                    WaitUntillIdle(client, 500);
                     client.DeleteRecursive(brokerPath);
                 }
             }
@@ -380,18 +384,20 @@ namespace Kafka.Client.IntegrationTests
         }
 
         [Test]
-        public void WhenBrokerIsRemovedZKBrokerPartitionInfoUpdatesBrokersList()
+        public void WhenBrokerIsRemovedZkBrokerPartitionInfoUpdatesBrokersList()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             IDictionary<int, Broker> brokers;
             string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
                 {
+                    WaitUntillIdle(client, 500);
                     brokers = brokerPartitionInfo.GetAllBrokerInfo();
                     client.CreatePersistent(brokerPath, true);
                     client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
@@ -410,17 +416,18 @@ namespace Kafka.Client.IntegrationTests
         }
 
         [Test]
-        public void WhenNewBrokerInTopicIsAddedZKBrokerPartitionInfoUpdatesMappings()
+        public void WhenNewBrokerInTopicIsAddedZkBrokerPartitionInfoUpdatesMappings()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             IDictionary<string, SortedSet<Partition>> mappings;
             IDictionary<int, Broker> brokers;
             string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
             string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
             string topicBrokerPath = topicPath + "/" + 2345;
             using (IZooKeeperClient client = new ZooKeeperClient(
-                producerConfig.ZkConnect,
-                producerConfig.ZkSessionTimeoutMs,
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs,
                 ZooKeeperStringSerializer.Serializer))
             {
                 using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -439,6 +446,7 @@ namespace Kafka.Client.IntegrationTests
                     client.WriteData(topicBrokerPath, 5);
                     WaitUntillIdle(client, 500);
                     client.UnsubscribeAll();
+                    WaitUntillIdle(client, 500);
                     client.DeleteRecursive(brokerPath);
                     client.DeleteRecursive(topicPath);
                 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs Tue Oct 18 17:52:13 2011
@@ -33,44 +33,34 @@ namespace Kafka.Client.IntegrationTests
     public class ZooKeeperAwareProducerTests : IntegrationFixtureBase
     {
         /// <summary>
-        /// Kafka Client configuration
-        /// </summary>
-        private KafkaClientConfiguration clientConfig;
-
-        /// <summary>
         /// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds)
         /// </summary>
-        private readonly int MaxTestWaitTimeInMiliseconds = 5000;
-
-        [TestFixtureSetUp]
-        public void SetUp()
-        {
-            clientConfig = KafkaClientConfiguration.GetConfiguration();
-        }
+        private readonly int maxTestWaitTimeInMiliseconds = 5000;
 
         [Test]
-        public void ZKAwareProducerSends1Message()
+        public void ZkAwareProducerSends1Message()
         {
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             int totalWaitTimeInMiliseconds = 0;
             int waitSingle = 100;
             var originalMessage = new Message(Encoding.UTF8.GetBytes("TestData"));
 
             var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
-            multipleBrokersHelper.GetCurrentOffsets();
+            multipleBrokersHelper.GetCurrentOffsets(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
 
-            var producerConfig = new ProducerConfig(clientConfig);
             var mockPartitioner = new MockAlwaysZeroPartitioner();
-            using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder()))
+            using (var producer = new Producer<string, Message>(prodConfig, mockPartitioner, new DefaultEncoder()))
             {
                 var producerData = new ProducerData<string, Message>(
-                    CurrentTestTopic, "somekey", new List<Message>() { originalMessage });
+                    CurrentTestTopic, "somekey", new List<Message> { originalMessage });
                 producer.Send(producerData);
 
-                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
                 {
                     totalWaitTimeInMiliseconds += waitSingle;
                     Thread.Sleep(waitSingle);
-                    if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+                    if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
                     {
                         Assert.Fail("None of the brokers changed their offset after sending a message");
                     }
@@ -78,13 +68,11 @@ namespace Kafka.Client.IntegrationTests
 
                 totalWaitTimeInMiliseconds = 0;
 
-                var consumerConfig = new ConsumerConfig(clientConfig)
-                    {
-                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
-                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
-                    };
-                IConsumer consumer = new Consumers.Consumer(consumerConfig);
-                var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+                var consumerConfig = new ConsumerConfiguration(
+                    multipleBrokersHelper.BrokerThatHasChanged.Host,
+                    multipleBrokersHelper.BrokerThatHasChanged.Port);
+                IConsumer consumer = new Consumer(consumerConfig);
+                var request = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange);
 
                 BufferedMessageSet response;
 
@@ -98,7 +86,7 @@ namespace Kafka.Client.IntegrationTests
                     }
 
                     totalWaitTimeInMiliseconds += waitSingle;
-                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+                    if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
                     {
                         break;
                     }
@@ -113,6 +101,7 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZkAwareProducerSends3Messages()
         {
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
             int totalWaitTimeInMiliseconds = 0;
             int waitSingle = 100;
             var originalMessage1 = new Message(Encoding.UTF8.GetBytes("TestData1"));
@@ -121,20 +110,19 @@ namespace Kafka.Client.IntegrationTests
             var originalMessageList = new List<Message> { originalMessage1, originalMessage2, originalMessage3 };
 
             var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
-            multipleBrokersHelper.GetCurrentOffsets();
+            multipleBrokersHelper.GetCurrentOffsets(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
 
-            var producerConfig = new ProducerConfig(clientConfig);
             var mockPartitioner = new MockAlwaysZeroPartitioner();
-            using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder()))
+            using (var producer = new Producer<string, Message>(prodConfig, mockPartitioner, new DefaultEncoder()))
             {
                 var producerData = new ProducerData<string, Message>(CurrentTestTopic, "somekey", originalMessageList);
                 producer.Send(producerData);
 
-                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
                 {
                     totalWaitTimeInMiliseconds += waitSingle;
                     Thread.Sleep(waitSingle);
-                    if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+                    if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
                     {
                         Assert.Fail("None of the brokers changed their offset after sending a message");
                     }
@@ -142,12 +130,10 @@ namespace Kafka.Client.IntegrationTests
 
                 totalWaitTimeInMiliseconds = 0;
 
-                var consumerConfig = new ConsumerConfig(clientConfig)
-                    {
-                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
-                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
-                    };
-                IConsumer consumer = new Consumers.Consumer(consumerConfig);
+                var consumerConfig = new ConsumerConfiguration(
+                    multipleBrokersHelper.BrokerThatHasChanged.Host,
+                    multipleBrokersHelper.BrokerThatHasChanged.Port);
+                IConsumer consumer = new Consumer(consumerConfig);
                 var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
                 BufferedMessageSet response;
 
@@ -161,7 +147,7 @@ namespace Kafka.Client.IntegrationTests
                     }
 
                     totalWaitTimeInMiliseconds += waitSingle;
-                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+                    if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
                     {
                         break;
                     }
@@ -178,26 +164,27 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZkAwareProducerSends1MessageUsingNotDefaultEncoder()
         {
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             int totalWaitTimeInMiliseconds = 0;
             int waitSingle = 100;
             string originalMessage = "TestData";
 
             var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
-            multipleBrokersHelper.GetCurrentOffsets();
+            multipleBrokersHelper.GetCurrentOffsets(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
 
-            var producerConfig = new ProducerConfig(clientConfig);
             var mockPartitioner = new MockAlwaysZeroPartitioner();
-            using (var producer = new Producer<string, string>(producerConfig, mockPartitioner, new StringEncoder(), null))
+            using (var producer = new Producer<string, string>(prodConfig, mockPartitioner, new StringEncoder(), null))
             {
                 var producerData = new ProducerData<string, string>(
                     CurrentTestTopic, "somekey", new List<string> { originalMessage });
                 producer.Send(producerData);
 
-                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
                 {
                     totalWaitTimeInMiliseconds += waitSingle;
                     Thread.Sleep(waitSingle);
-                    if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+                    if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
                     {
                         Assert.Fail("None of the brokers changed their offset after sending a message");
                     }
@@ -205,12 +192,10 @@ namespace Kafka.Client.IntegrationTests
 
                 totalWaitTimeInMiliseconds = 0;
 
-                var consumerConfig = new ConsumerConfig(clientConfig)
-                    {
-                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
-                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
-                    };
-                IConsumer consumer = new Consumers.Consumer(consumerConfig);
+                var consumerConfig = new ConsumerConfiguration(
+                    multipleBrokersHelper.BrokerThatHasChanged.Host,
+                    multipleBrokersHelper.BrokerThatHasChanged.Port);
+                IConsumer consumer = new Consumer(consumerConfig);
                 var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
                 BufferedMessageSet response;
 
@@ -224,7 +209,7 @@ namespace Kafka.Client.IntegrationTests
                     }
 
                     totalWaitTimeInMiliseconds += waitSingle;
-                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+                    if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
                     {
                         break;
                     }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs Tue Oct 18 17:52:13 2011
@@ -35,16 +35,8 @@ namespace Kafka.Client.IntegrationTests
     internal class ZooKeeperClientTests : IntegrationFixtureBase, IZooKeeperDataListener, IZooKeeperStateListener, IZooKeeperChildListener
     {
         private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
-        private KafkaClientConfiguration clientConfig;
         private readonly IList<ZooKeeperEventArgs> events = new List<ZooKeeperEventArgs>();
 
-        [TestFixtureSetUp]
-        public void SetUp()
-        {
-            clientConfig = KafkaClientConfiguration.GetConfiguration();
-        }
-
         [SetUp]
         public void TestSetup()
         {
@@ -54,8 +46,12 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZooKeeperClientCreateWorkerThreadsOnBeingCreated()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect, 
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 var eventWorker = ReflectionHelper.GetInstanceField<Thread>("eventWorker", client);
@@ -68,8 +64,12 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZooKeeperClientFailsWhenCreatedWithWrongConnectionInfo()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperClient client = new ZooKeeperClient("random text", producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                "random text", 
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 Assert.Throws<FormatException>(client.Connect);
             }
@@ -78,8 +78,12 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenStateChangedToConnectedStateListenerFires()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Subscribe(this);
                 client.Connect();
@@ -96,8 +100,12 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenStateChangedToDisconnectedStateListenerFires()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Subscribe(this);
                 client.Connect();
@@ -116,8 +124,12 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenStateChangedToExpiredStateAndSessionListenersFire()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Subscribe(this);
                 client.Connect();
@@ -143,10 +155,14 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenSessionExpiredClientReconnects()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             IZooKeeperConnection conn1;
             IZooKeeperConnection conn2;
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect, 
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 conn1 = ReflectionHelper.GetInstanceField<ZooKeeperConnection>("connection", client);
@@ -161,8 +177,12 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZooKeeperClientChecksIfPathExists()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 Assert.IsTrue(client.Exists(ZooKeeperClient.DefaultBrokerTopicsPath, false));
@@ -172,8 +192,12 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZooKeeperClientCreatesANewPathAndDeletesIt()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 string myPath = "/" + Guid.NewGuid();
@@ -187,9 +211,13 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenChildIsCreatedChilListenerOnParentFires()
         {
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             string myPath = "/" + Guid.NewGuid();
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 WaitUntillIdle(client, 500);
@@ -212,9 +240,13 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenChildIsDeletedChildListenerOnParentFires()
         {
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             string myPath = "/" + Guid.NewGuid();
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 client.CreatePersistent(myPath, true);
@@ -236,9 +268,13 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenZNodeIsDeletedChildAndDataDeletedListenersFire()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             string myPath = "/" + Guid.NewGuid();
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 client.CreatePersistent(myPath, true);
@@ -265,8 +301,12 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZooKeeperClientCreatesAChildAndGetsChildren()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect, 
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 string child = Guid.NewGuid().ToString();
@@ -284,11 +324,15 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void WhenDataChangedDataListenerFires()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             string myPath = "/" + Guid.NewGuid();
             string sourceData = "my test data";
             string resultData;
-            using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            using (IZooKeeperClient client = new ZooKeeperClient(
+                prodConfig.ZooKeeper.ZkConnect,
+                prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
+                ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 client.CreatePersistent(myPath, true);
@@ -317,11 +361,12 @@ namespace Kafka.Client.IntegrationTests
         [ExpectedException(typeof(ZooKeeperException))]
         public void WhenClientWillNotConnectWithinGivenTimeThrows()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             using (IZooKeeperClient client = 
                 new ZooKeeperClient(
-                    producerConfig.ZkConnect, 
-                    producerConfig.ZkSessionTimeoutMs, 
+                    prodConfig.ZooKeeper.ZkConnect,
+                    prodConfig.ZooKeeper.ZkSessionTimeoutMs, 
                     ZooKeeperStringSerializer.Serializer,
                     1))
             {

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs Tue Oct 18 17:52:13 2011
@@ -25,21 +25,14 @@ namespace Kafka.Client.IntegrationTests
     using ZooKeeperNet;
 
     [TestFixture]
-    public class ZooKeeperConnectionTests
+    public class ZooKeeperConnectionTests : IntegrationFixtureBase
     {
-        private KafkaClientConfiguration clientConfig;
-
-        [TestFixtureSetUp]
-        public void SetUp()
-        {
-            clientConfig = KafkaClientConfiguration.GetConfiguration();
-        }
-
         [Test]
         public void ZooKeeperConnectionCreatesAndDeletesPath()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperConnection connection = new ZooKeeperConnection(prodConfig.ZooKeeper.ZkConnect))
             {
                 connection.Connect(null);
                 string pathName = "/" + Guid.NewGuid();
@@ -53,9 +46,10 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZooKeeperConnectionConnectsAndDisposes()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
             IZooKeeperConnection connection;
-            using (connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+            using (connection = new ZooKeeperConnection(prodConfig.ZooKeeper.ZkConnect))
             {
                 Assert.IsNull(connection.ClientState);
                 connection.Connect(null);
@@ -69,8 +63,9 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZooKeeperConnectionCreatesAndGetsCreateTime()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperConnection connection = new ZooKeeperConnection(prodConfig.ZooKeeper.ZkConnect))
             {
                 connection.Connect(null);
                 string pathName = "/" + Guid.NewGuid();
@@ -84,8 +79,9 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZooKeeperConnectionCreatesAndGetsChildren()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperConnection connection = new ZooKeeperConnection(prodConfig.ZooKeeper.ZkConnect))
             {
                 connection.Connect(null);
                 string child = Guid.NewGuid().ToString();
@@ -101,14 +97,15 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ZooKeeperConnectionWritesAndReadsData()
         {
-            var producerConfig = new ProducerConfig(clientConfig);
-            using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+            var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+            using (IZooKeeperConnection connection = new ZooKeeperConnection(prodConfig.ZooKeeper.ZkConnect))
             {
                 connection.Connect(null);
                 string child = Guid.NewGuid().ToString();
                 string pathName = "/" + child;
                 connection.Create(pathName, null, CreateMode.Persistent);
-                var sourceData = new byte[2] { 1, 2 };
+                var sourceData = new byte[] { 1, 2 };
                 connection.WriteData(pathName, sourceData);
                 byte[] resultData = connection.ReadData(pathName, null, false);
                 Assert.IsNotNull(resultData);

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj Tue Oct 18 17:52:13 2011
@@ -79,10 +79,12 @@
       <HintPath>..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll</HintPath>
     </Reference>
     <Reference Include="System" />
+    <Reference Include="System.configuration" />
     <Reference Include="System.Core" />
     <Reference Include="Microsoft.CSharp" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="CompressionTests.cs" />
     <Compile Include="MessageSetTests.cs" />
     <Compile Include="MessageTests.cs" />
     <Compile Include="Producers\PartitioningTests.cs" />
@@ -103,6 +105,14 @@
   <ItemGroup>
     <Folder Include="ZooKeeper\" />
   </ItemGroup>
+  <ItemGroup>
+    <None Include="..\..\..\..\Settings.StyleCop">
+      <Link>Settings.StyleCop</Link>
+    </None>
+    <None Include="App.config">
+      <SubType>Designer</SubType>
+    </None>
+  </ItemGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <Import Project="..\..\..\..\lib\StyleCop\Microsoft.StyleCop.Targets" />
-</Project>
\ No newline at end of file
+</Project>

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs Tue Oct 18 17:52:13 2011
@@ -29,12 +29,14 @@ namespace Kafka.Client.Tests
     {
         private const int MessageLengthPartLength = 4;
         private const int MagicNumberPartLength = 1;
+        private const int AttributesPartLength = 1;
         private const int ChecksumPartLength = 4;
-
+        
         private const int MessageLengthPartOffset = 0;
         private const int MagicNumberPartOffset = 4;
-        private const int ChecksumPartOffset = 5;
-        private const int DataPartOffset = 9;
+        private const int AttributesPartOffset = 5;
+        private const int ChecksumPartOffset = 6;
+        private const int DataPartOffset = 10;
 
         [Test]
         public void BufferedMessageSetWriteToValidSequence()
@@ -55,9 +57,9 @@ namespace Kafka.Client.Tests
                 Array.Reverse(messageLength);
             }
 
-            Assert.AreEqual(MagicNumberPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
+            Assert.AreEqual(MagicNumberPartLength + AttributesPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
 
-            Assert.AreEqual(0, ms.ToArray()[MagicNumberPartOffset]);    // default magic number should be 0
+            Assert.AreEqual(1, ms.ToArray()[MagicNumberPartOffset]);    // default magic number should be 1
 
             byte[] checksumPart = new byte[ChecksumPartLength];
             Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
@@ -68,7 +70,7 @@ namespace Kafka.Client.Tests
             Assert.AreEqual(messageBytes, dataPart);
 
             ////second message
-            int secondMessageOffset = MessageLengthPartLength + MagicNumberPartLength + ChecksumPartLength +
+            int secondMessageOffset = MessageLengthPartLength + MagicNumberPartLength + AttributesPartLength + ChecksumPartLength +
                                       messageBytes.Length;
 
             messageLength = new byte[MessageLengthPartLength];
@@ -78,9 +80,9 @@ namespace Kafka.Client.Tests
                 Array.Reverse(messageLength);
             }
 
-            Assert.AreEqual(MagicNumberPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
+            Assert.AreEqual(MagicNumberPartLength + AttributesPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
 
-            Assert.AreEqual(0, ms.ToArray()[secondMessageOffset + MagicNumberPartOffset]);    // default magic number should be 0
+            Assert.AreEqual(1, ms.ToArray()[secondMessageOffset + MagicNumberPartOffset]);    // default magic number should be 1
 
             checksumPart = new byte[ChecksumPartLength];
             Array.Copy(ms.ToArray(), secondMessageOffset + ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
@@ -99,7 +101,7 @@ namespace Kafka.Client.Tests
             Message msg2 = new Message(messageBytes);
             MessageSet messageSet = new BufferedMessageSet(new List<Message>() { msg1, msg2 });
             Assert.AreEqual(
-                2 * (MessageLengthPartLength + MagicNumberPartLength + ChecksumPartLength + messageBytes.Length),
+                2 * (MessageLengthPartLength + MagicNumberPartLength + AttributesPartLength + ChecksumPartLength + messageBytes.Length),
                 messageSet.SetSize);
         }
     }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs Tue Oct 18 17:52:13 2011
@@ -34,8 +34,8 @@ namespace Kafka.Client.Tests
         private readonly int ChecksumPartLength = 4;
 
         private readonly int MagicNumberPartOffset = 0;
-        private readonly int ChecksumPartOffset = 1;
-        private readonly int DataPartOffset = 5;
+        private readonly int ChecksumPartOffset = 2;
+        private readonly int DataPartOffset = 6;
 
         /// <summary>
         /// Demonstrates a properly parsed message.
@@ -46,16 +46,18 @@ namespace Kafka.Client.Tests
             Crc32Hasher crc32 = new Crc32Hasher();
 
             string payload = "kafka";
-            byte magic = 0;
+            byte magic = 1;
+            byte attributes = 0;
             byte[] payloadData = Encoding.UTF8.GetBytes(payload);
             byte[] payloadSize = BitConverter.GetBytes(payloadData.Length);
             byte[] checksum = crc32.ComputeHash(payloadData);
-            byte[] messageData = new byte[payloadData.Length + 1 + payloadSize.Length + checksum.Length];
+            byte[] messageData = new byte[payloadData.Length + 2 + payloadSize.Length + checksum.Length];
 
             Buffer.BlockCopy(payloadSize, 0, messageData, 0, payloadSize.Length);
             messageData[4] = magic;
-            Buffer.BlockCopy(checksum, 0, messageData, payloadSize.Length + 1, checksum.Length);
-            Buffer.BlockCopy(payloadData, 0, messageData, payloadSize.Length + 1 + checksum.Length, payloadData.Length);
+            messageData[5] = attributes;
+            Buffer.BlockCopy(checksum, 0, messageData, payloadSize.Length + 2, checksum.Length);
+            Buffer.BlockCopy(payloadData, 0, messageData, payloadSize.Length + 2 + checksum.Length, payloadData.Length);
 
             Message message = Message.ParseFrom(messageData);
 
@@ -71,22 +73,25 @@ namespace Kafka.Client.Tests
         [Test]
         public void GetBytesValidSequence()
         {
-            Message message = new Message(new byte[10], (byte)245);
+            Message message = new Message(new byte[10], CompressionCodecs.NoCompressionCodec);
 
             MemoryStream ms = new MemoryStream();
             message.WriteTo(ms);
 
             // len(payload) + 1 + 4
-            Assert.AreEqual(15, ms.Length);
+            Assert.AreEqual(16, ms.Length);
 
             // first 4 bytes = the magic number
-            Assert.AreEqual((byte)245, ms.ToArray()[0]);
+            Assert.AreEqual((byte)1, ms.ToArray()[0]);
+
+            // attributes
+            Assert.AreEqual((byte)0, ms.ToArray()[1]);
 
             // next 4 bytes = the checksum
-            Assert.IsTrue(message.Checksum.SequenceEqual(ms.ToArray().Skip(1).Take(4).ToArray<byte>()));
+            Assert.IsTrue(message.Checksum.SequenceEqual(ms.ToArray().Skip(2).Take(4).ToArray<byte>()));
 
             // remaining bytes = the payload
-            Assert.AreEqual(10, ms.ToArray().Skip(5).ToArray<byte>().Length);
+            Assert.AreEqual(10, ms.ToArray().Skip(6).ToArray<byte>().Length);
         }
 
         [Test]
@@ -97,12 +102,14 @@ namespace Kafka.Client.Tests
             MemoryStream ms = new MemoryStream();
             message.WriteTo(ms);
 
-            Assert.AreEqual(0, ms.ToArray()[MagicNumberPartOffset]);    // default magic number should be 0
+            Assert.AreEqual(1, ms.ToArray()[MagicNumberPartOffset]);    // default magic number should be 1
 
             byte[] checksumPart = new byte[ChecksumPartLength];
             Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
             Assert.AreEqual(Crc32Hasher.Compute(messageBytes), checksumPart);
 
+            message.ToString();
+
             byte[] dataPart = new byte[messageBytes.Length];
             Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
             Assert.AreEqual(messageBytes, dataPart);
@@ -113,11 +120,11 @@ namespace Kafka.Client.Tests
         {
             byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
             byte[] customChecksum = new byte[] { 3, 4, 5, 6 };
-            Message message = new Message(messageBytes, (byte)33, customChecksum);
+            Message message = new Message(messageBytes, customChecksum);
             MemoryStream ms = new MemoryStream();
             message.WriteTo(ms);
 
-            Assert.AreEqual((byte)33, ms.ToArray()[MagicNumberPartOffset]);
+            Assert.AreEqual((byte)1, ms.ToArray()[MagicNumberPartOffset]);
 
             byte[] checksumPart = new byte[ChecksumPartLength];
             Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/PartitioningTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/PartitioningTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/PartitioningTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/PartitioningTests.cs Tue Oct 18 17:52:13 2011
@@ -21,17 +21,20 @@ namespace Kafka.Client.Tests.Producers
     using Kafka.Client.Cluster;
     using Kafka.Client.Producers.Partitioning;
     using NUnit.Framework;
+    using System.Collections.Generic;
 
     [TestFixture]
     public class PartitioningTests
     {
-        private ProducerConfig config;
+        private ProducerConfiguration config;
 
         [TestFixtureSetUp]
         public void SetUp()
         {
-            config = new ProducerConfig();
-            config.BrokerPartitionInfo = "1:192.168.0.1:1234,2:192.168.0.2:3456";
+            var brokers = new List<BrokerConfiguration>();
+            brokers.Add(new BrokerConfiguration { BrokerId = 1, Host = "192.168.0.1", Port = 1234 });
+            brokers.Add(new BrokerConfiguration { BrokerId = 2, Host = "192.168.0.2", Port = 3456 });
+            config = new ProducerConfiguration(brokers);
         }
 
         [Test]

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs Tue Oct 18 17:52:13 2011
@@ -54,10 +54,10 @@ namespace Kafka.Client.Tests.Request
             request.WriteTo(ms);
             byte[] bytes = ms.ToArray();
             Assert.IsNotNull(bytes);
-            Assert.AreEqual(152, bytes.Length);
+            Assert.AreEqual(156, bytes.Length);
 
             // first 4 bytes = the length of the request
-            Assert.AreEqual(148, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray<byte>()), 0));
+            Assert.AreEqual(152, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray<byte>()), 0));
 
             // next 2 bytes = the RequestType which in this case should be Produce
             Assert.AreEqual((short)RequestTypes.MultiProduce, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray<byte>()), 0));

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs Tue Oct 18 17:52:13 2011
@@ -50,10 +50,10 @@ namespace Kafka.Client.Tests.Request
 
             byte[] bytes = ms.ToArray();
             Assert.IsNotNull(bytes);
-            Assert.AreEqual(40, bytes.Length);
+            Assert.AreEqual(41, bytes.Length);
 
             // next 4 bytes = the length of the request
-            Assert.AreEqual(36, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray<byte>()), 0));
+            Assert.AreEqual(37, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray<byte>()), 0));
 
             // next 2 bytes = the RequestType which in this case should be Produce
             Assert.AreEqual((short)RequestTypes.Produce, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray<byte>()), 0));
@@ -68,10 +68,10 @@ namespace Kafka.Client.Tests.Request
             Assert.AreEqual(0, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(13).Take(4).ToArray<byte>()), 0));
 
             // next 4 bytes = the length of the individual messages in the pack
-            Assert.AreEqual(19, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(17).Take(4).ToArray<byte>()), 0));
+            Assert.AreEqual(20, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(17).Take(4).ToArray<byte>()), 0));
 
             // fianl bytes = the individual messages in the pack
-            Assert.AreEqual(19, bytes.Skip(21).ToArray<byte>().Length);
+            Assert.AreEqual(20, bytes.Skip(21).ToArray<byte>().Length);
         }
     }
 }