You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/18 19:52:16 UTC

svn commit: r1185772 [3/4] - in /incubator/kafka/trunk/clients/csharp/src/Kafka: Kafka.Client/ Kafka.Client/Cfg/ Kafka.Client/Consumers/ Kafka.Client/Exceptions/ Kafka.Client/Messages/ Kafka.Client/Producers/ Kafka.Client/Producers/Async/ Kafka.Client/...

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs Tue Oct 18 17:52:13 2011
@@ -19,8 +19,6 @@ namespace Kafka.Client.IntegrationTests
 {
     using System.Collections.Generic;
     using System.Linq;
-    using System.Threading;
-    using Kafka.Client.Cfg;
     using Kafka.Client.Cluster;
     using Kafka.Client.Consumers;
     using Kafka.Client.Utils;
@@ -30,24 +28,13 @@ namespace Kafka.Client.IntegrationTests
     [TestFixture]
     public class ConsumerRebalancingTests : IntegrationFixtureBase
     {
-        /// <summary>
-        /// Kafka Client configuration
-        /// </summary>
-        private static KafkaClientConfiguration clientConfig;
-
-        [TestFixtureSetUp]
-        public void SetUp()
-        {
-            clientConfig = KafkaClientConfiguration.GetConfiguration();
-        }
-
         [Test]
         public void ConsumerPorformsRebalancingOnStart()
         {
-            var config = new ConsumerConfig(clientConfig) { AutoCommit = false, GroupId = "group1" };
+            var config = this.ZooKeeperBasedConsumerConfig;
             using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
             {
-                ZooKeeperClient client = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", consumerConnector);
+                var client = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", consumerConnector);
                 Assert.IsNotNull(client);
                 client.DeleteRecursive("/consumers/group1");
                 var topicCount = new Dictionary<string, int> { { "test", 1 } };
@@ -71,7 +58,7 @@ namespace Kafka.Client.IntegrationTests
                 Assert.That(children, Is.Not.Null.And.Not.Empty);
                 Assert.That(children.Count, Is.EqualTo(2));
                 string partId = children[0];
-                string data = client.ReadData<string>("/consumers/group1/owners/test/" + partId);
+                var data = client.ReadData<string>("/consumers/group1/owners/test/" + partId);
                 Assert.That(data, Is.Not.Null.And.Not.Empty);
                 Assert.That(data, Contains.Substring(consumerId));
                 data = client.ReadData<string>("/consumers/group1/ids/" + consumerId);
@@ -79,7 +66,7 @@ namespace Kafka.Client.IntegrationTests
                 Assert.That(data, Is.EqualTo("{ \"test\": 1 }"));
             }
 
-            using (var client = new ZooKeeperClient(config.ZkConnect, config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            using (var client = new ZooKeeperClient(config.ZooKeeper.ZkConnect, config.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
             {
                 client.Connect();
                 //// Should be created as ephemeral
@@ -94,13 +81,7 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ConsumerPorformsRebalancingWhenNewBrokerIsAddedToTopic()
         {
-            var config = new ConsumerConfig(clientConfig)
-                             {
-                                 AutoCommit = false,
-                                 GroupId = "group1",
-                                 ZkSessionTimeoutMs = 60000,
-                                 ZkConnectionTimeoutMs = 60000
-                             };
+            var config = this.ZooKeeperBasedConsumerConfig;
             string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
             string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/test/" + 2345;
             using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
@@ -120,7 +101,7 @@ namespace Kafka.Client.IntegrationTests
                 children = client.GetChildren("/consumers/group1/owners/test", false);
                 Assert.That(children.Count, Is.EqualTo(3));
                 Assert.That(children, Contains.Item("2345-0"));
-                string data = client.ReadData<string>("/consumers/group1/owners/test/2345-0");
+                var data = client.ReadData<string>("/consumers/group1/owners/test/2345-0");
                 Assert.That(data, Is.Not.Null);
                 Assert.That(data, Contains.Substring(consumerId));
                 var topicRegistry =
@@ -138,7 +119,7 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ConsumerPorformsRebalancingWhenBrokerIsRemovedFromTopic()
         {
-            var config = new ConsumerConfig(clientConfig) { AutoCommit = false, GroupId = "group1", ZkSessionTimeoutMs = 60000, ZkConnectionTimeoutMs = 60000 };
+            var config = this.ZooKeeperBasedConsumerConfig;
             string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
             string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/test/" + 2345;
             using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
@@ -170,14 +151,7 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ConsumerPerformsRebalancingWhenNewConsumerIsAddedAndTheyDividePartitions()
         {
-            var config = new ConsumerConfig(clientConfig)
-            {
-                AutoCommit = false,
-                GroupId = "group1",
-                ZkSessionTimeoutMs = 60000,
-                ZkConnectionTimeoutMs = 60000
-            };
-
+            var config = this.ZooKeeperBasedConsumerConfig;
             IList<string> ids;
             IList<string> owners;
             using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
@@ -216,14 +190,8 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ConsumerPerformsRebalancingWhenConsumerIsRemovedAndTakesItsPartitions()
         {
-            var config = new ConsumerConfig(clientConfig)
-            {
-                AutoCommit = false,
-                GroupId = "group1",
-                ZkSessionTimeoutMs = 60000,
-                ZkConnectionTimeoutMs = 60000
-            };
-
+            var config = this.ZooKeeperBasedConsumerConfig;
+            string basePath = "/consumers/" + config.GroupId;
             IList<string> ids;
             IList<string> owners;
             using (var consumerConnector = new ZookeeperConsumerConnector(config, true))

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs Tue Oct 18 17:52:13 2011
@@ -23,7 +23,6 @@ namespace Kafka.Client.IntegrationTests
     using System.Reflection;
     using System.Text;
     using System.Threading;
-    using Kafka.Client.Cfg;
     using Kafka.Client.Consumers;
     using Kafka.Client.Exceptions;
     using Kafka.Client.Messages;
@@ -34,21 +33,10 @@ namespace Kafka.Client.IntegrationTests
     [TestFixture]
     public class ConsumerTests : IntegrationFixtureBase
     {
-        /// <summary>
-        /// Kafka Client configuration
-        /// </summary>
-        private static KafkaClientConfiguration clientConfig;
-
-        [TestFixtureSetUp]
-        public void SetUp()
-        {
-            clientConfig = KafkaClientConfiguration.GetConfiguration();
-        }
-
         [Test]
         public void ConsumerConnectorIsCreatedConnectsDisconnectsAndShutsDown()
         {
-            var config = new ConsumerConfig(clientConfig);
+            var config = this.ZooKeeperBasedConsumerConfig;
             using (new ZookeeperConsumerConnector(config, true))
             {
             }
@@ -57,6 +45,9 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void SimpleSyncProducerSends2MessagesAndConsumerConnectorGetsThemBack()
         {
+            var prodConfig = this.SyncProducerConfig1;
+            var consumerConfig = this.ZooKeeperBasedConsumerConfig;
+
             // first producing
             string payload1 = "kafka 1.";
             byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
@@ -66,15 +57,15 @@ namespace Kafka.Client.IntegrationTests
             byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
             var msg2 = new Message(payloadData2);
 
-            var producerConfig = new SyncProducerConfig(clientConfig);
-            var producer = new SyncProducer(producerConfig);
             var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1, msg2 });
-            producer.Send(producerRequest);
+            using (var producer = new SyncProducer(prodConfig))
+            {
+                producer.Send(producerRequest);
+            }
 
             // now consuming
-            var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
             var resultMessages = new List<Message>();
-            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
             {
                 var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                 var messages = consumerConnector.CreateMessageStreams(topicCount);
@@ -103,53 +94,57 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void OneMessageIsSentAndReceivedThenExceptionsWhenNoMessageThenAnotherMessageIsSentAndReceived()
         {
+            var prodConfig = this.SyncProducerConfig1;
+            var consumerConfig = this.ZooKeeperBasedConsumerConfig;
+
             // first producing
             string payload1 = "kafka 1.";
             byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
             var msg1 = new Message(payloadData1);
-
-            var producerConfig = new SyncProducerConfig(clientConfig);
-            var producer = new SyncProducer(producerConfig);
-            var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
-            producer.Send(producerRequest);
-
-            // now consuming
-            var config = new ConsumerConfig(clientConfig) { AutoCommit = false, Timeout = 5000 };
-            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+            using (var producer = new SyncProducer(prodConfig))
             {
-                var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
-                var messages = consumerConnector.CreateMessageStreams(topicCount);
-                var sets = messages[CurrentTestTopic];
-                KafkaMessageStream myStream = sets[0];
-                var enumerator = myStream.GetEnumerator();
+                var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
+                producer.Send(producerRequest);
 
-                Assert.IsTrue(enumerator.MoveNext());
-                Assert.AreEqual(msg1.ToString(), enumerator.Current.ToString());
+                // now consuming
+                using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
+                {
+                    var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
+                    var messages = consumerConnector.CreateMessageStreams(topicCount);
+                    var sets = messages[CurrentTestTopic];
+                    KafkaMessageStream myStream = sets[0];
+                    var enumerator = myStream.GetEnumerator();
 
-                Assert.Throws<ConsumerTimeoutException>(() => enumerator.MoveNext());
+                    Assert.IsTrue(enumerator.MoveNext());
+                    Assert.AreEqual(msg1.ToString(), enumerator.Current.ToString());
 
-                Assert.Throws<Exception>(() => enumerator.MoveNext()); // iterator is in failed state
+                    Assert.Throws<ConsumerTimeoutException>(() => enumerator.MoveNext());
 
-                enumerator.Reset();
+                    Assert.Throws<IllegalStateException>(() => enumerator.MoveNext()); // iterator is in failed state
 
-                // producing again
-                string payload2 = "kafka 2.";
-                byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
-                var msg2 = new Message(payloadData2);
+                    enumerator.Reset();
 
-                var producerRequest2 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg2 });
-                producer.Send(producerRequest2);
+                    // producing again
+                    string payload2 = "kafka 2.";
+                    byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
+                    var msg2 = new Message(payloadData2);
 
-                Thread.Sleep(3000);
+                    var producerRequest2 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg2 });
+                    producer.Send(producerRequest2);
+                    Thread.Sleep(3000);
 
-                Assert.IsTrue(enumerator.MoveNext());
-                Assert.AreEqual(msg2.ToString(), enumerator.Current.ToString());
+                    Assert.IsTrue(enumerator.MoveNext());
+                    Assert.AreEqual(msg2.ToString(), enumerator.Current.ToString());
+                }
             }
         }
 
         [Test]
         public void ConsumerConnectorConsumesTwoDifferentTopics()
         {
+            var prodConfig = this.SyncProducerConfig1;
+            var consumerConfig = this.ZooKeeperBasedConsumerConfig;
+
             string topic1 = CurrentTestTopic + "1";
             string topic2 = CurrentTestTopic + "2";
 
@@ -162,18 +157,18 @@ namespace Kafka.Client.IntegrationTests
             byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
             var msg2 = new Message(payloadData2);
 
-            var producerConfig = new SyncProducerConfig(clientConfig);
-            var producer = new SyncProducer(producerConfig);
-            var producerRequest1 = new ProducerRequest(topic1, 0, new List<Message> { msg1 });
-            producer.Send(producerRequest1);
-            var producerRequest2 = new ProducerRequest(topic2, 0, new List<Message> { msg2 });
-            producer.Send(producerRequest2);
+            using (var producer = new SyncProducer(prodConfig))
+            {
+                var producerRequest1 = new ProducerRequest(topic1, 0, new List<Message> { msg1 });
+                producer.Send(producerRequest1);
+                var producerRequest2 = new ProducerRequest(topic2, 0, new List<Message> { msg2 });
+                producer.Send(producerRequest2);
+            }
 
             // now consuming
-            var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
             var resultMessages1 = new List<Message>();
             var resultMessages2 = new List<Message>();
-            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
             {
                 var topicCount = new Dictionary<string, int> { { topic1, 1 }, { topic2, 1 } };
                 var messages = consumerConnector.CreateMessageStreams(topicCount);
@@ -224,9 +219,10 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ConsumerConnectorReceivesAShutdownSignal()
         {
+            var consumerConfig = this.ZooKeeperBasedConsumerConfig;
+
             // now consuming
-            var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
-            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
             {
                 var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                 var messages = consumerConnector.CreateMessageStreams(topicCount);
@@ -260,6 +256,9 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ProducersSendMessagesToDifferentPartitionsAndConsumerConnectorGetsThemBack()
         {
+            var prodConfig = this.SyncProducerConfig1;
+            var consumerConfig = this.ZooKeeperBasedConsumerConfig;
+
             // first producing
             string payload1 = "kafka 1.";
             byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
@@ -269,23 +268,21 @@ namespace Kafka.Client.IntegrationTests
             byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
             var msg2 = new Message(payloadData2);
 
-            var producerConfig = new SyncProducerConfig(clientConfig);
-            var producer = new SyncProducer(producerConfig);
-            var producerRequest1 = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { msg1 });
-            producer.Send(producerRequest1);
-            var producerRequest2 = new ProducerRequest(CurrentTestTopic, 1, new List<Message>() { msg2 });
-            producer.Send(producerRequest2);
+            using (var producer = new SyncProducer(prodConfig))
+            {
+                var producerRequest1 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
+                producer.Send(producerRequest1);
+                var producerRequest2 = new ProducerRequest(CurrentTestTopic, 1, new List<Message> { msg2 });
+                producer.Send(producerRequest2);
+            }
 
             // now consuming
-            var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
             var resultMessages = new List<Message>();
-            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
             {
                 var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
                 var messages = consumerConnector.CreateMessageStreams(topicCount);
-
                 var sets = messages[CurrentTestTopic];
-                
                 try
                 {
                     foreach (var set in sets)

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs Tue Oct 18 17:52:13 2011
@@ -19,6 +19,7 @@ namespace Kafka.Client.IntegrationTests
 {
     using System;
     using System.Threading;
+    using Kafka.Client.Cfg;
     using Kafka.Client.ZooKeeperIntegration;
     using NUnit.Framework;
 
@@ -26,21 +27,121 @@ namespace Kafka.Client.IntegrationTests
     {
         protected string CurrentTestTopic { get; set; }
 
+        protected ProducerConfiguration ConfigBasedSyncProdConfig
+        {
+            get
+            {
+                return ProducerConfiguration.Configure(ProducerConfiguration.DefaultSectionName);
+            }
+        }
+
+        protected SyncProducerConfiguration SyncProducerConfig1
+        {
+            get
+            {
+                var prodConfig = this.ConfigBasedSyncProdConfig;
+                return new SyncProducerConfiguration(
+                    prodConfig,
+                    prodConfig.Brokers[0].BrokerId,
+                    prodConfig.Brokers[0].Host,
+                    prodConfig.Brokers[0].Port);
+            }
+        }
+
+        protected SyncProducerConfiguration SyncProducerConfig2
+        {
+            get
+            {
+                var prodConfig = this.ConfigBasedSyncProdConfig;
+                return new SyncProducerConfiguration(
+                    prodConfig,
+                    prodConfig.Brokers[1].BrokerId,
+                    prodConfig.Brokers[1].Host,
+                    prodConfig.Brokers[1].Port);
+            }
+        }
+
+        protected SyncProducerConfiguration SyncProducerConfig3
+        {
+            get
+            {
+                var prodConfig = this.ConfigBasedSyncProdConfig;
+                return new SyncProducerConfiguration(
+                    prodConfig,
+                    prodConfig.Brokers[2].BrokerId,
+                    prodConfig.Brokers[2].Host,
+                    prodConfig.Brokers[2].Port);
+            }
+        }
+
+        protected ProducerConfiguration ZooKeeperBasedSyncProdConfig
+        {
+            get
+            {
+                return ProducerConfiguration.Configure(ProducerConfiguration.DefaultSectionName + 2);
+            }
+        }
+
+        protected AsyncProducerConfiguration AsyncProducerConfig1
+        {
+            get
+            {
+                var asyncUberConfig = ProducerConfiguration.Configure(ProducerConfiguration.DefaultSectionName + 3);
+                return new AsyncProducerConfiguration(
+                    asyncUberConfig,
+                    asyncUberConfig.Brokers[0].BrokerId,
+                    asyncUberConfig.Brokers[0].Host,
+                    asyncUberConfig.Brokers[0].Port);
+            }
+        }
+
+        protected ConsumerConfiguration ConsumerConfig1
+        {
+            get
+            {
+                return ConsumerConfiguration.Configure(ConsumerConfiguration.DefaultSection + 1);
+            }
+        }
+
+        protected ConsumerConfiguration ConsumerConfig2
+        {
+            get
+            {
+                return ConsumerConfiguration.Configure(ConsumerConfiguration.DefaultSection + 2);
+            }
+        }
+
+        protected ConsumerConfiguration ConsumerConfig3
+        {
+            get
+            {
+                return ConsumerConfiguration.Configure(ConsumerConfiguration.DefaultSection + 3);
+            }
+        }
+
+        protected ConsumerConfiguration ZooKeeperBasedConsumerConfig
+        {
+            get
+            {
+                return ConsumerConfiguration.Configure(ConsumerConfiguration.DefaultSection + 4);
+            }
+        }
+
         [SetUp]
         public void SetupCurrentTestTopic()
         {
-            CurrentTestTopic = TestContext.CurrentContext.Test.Name + "_" + Guid.NewGuid().ToString();
+            CurrentTestTopic = TestContext.CurrentContext.Test.Name + "_" + Guid.NewGuid();
         }
 
         internal static void WaitUntillIdle(IZooKeeperClient client, int timeout)
         {
             Thread.Sleep(timeout);
-            int rest = timeout - client.IdleTime;
+            int rest = client.IdleTime.HasValue ? timeout - client.IdleTime.Value : timeout;
             while (rest > 0)
             {
                 Thread.Sleep(rest);
-                rest = timeout - client.IdleTime;
+                rest = client.IdleTime.HasValue ? timeout - client.IdleTime.Value : timeout;
             }
         }
     }
-}
\ No newline at end of file
+}

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj Tue Oct 18 17:52:13 2011
@@ -88,6 +88,7 @@
     </Reference>
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="CompressionTests.cs" />
     <Compile Include="ConsumerRebalancingTests.cs" />
     <Compile Include="ConsumerTests.cs" />
     <Compile Include="IntegrationFixtureBase.cs" />
@@ -110,6 +111,9 @@
     </ProjectReference>
   </ItemGroup>
   <ItemGroup>
+    <None Include="..\..\..\..\Settings.StyleCop">
+      <Link>Settings.StyleCop</Link>
+    </None>
     <None Include="App.config">
       <SubType>Designer</SubType>
     </None>
@@ -134,4 +138,4 @@
   </Target>
   <Target Name="AfterBuild">
   </Target>
-</Project>
\ No newline at end of file
+</Project>

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs Tue Oct 18 17:52:13 2011
@@ -22,7 +22,6 @@ namespace Kafka.Client.IntegrationTests
     using System.Linq;
     using System.Text;
     using System.Threading;
-    using Kafka.Client.Cfg;
     using Kafka.Client.Consumers;
     using Kafka.Client.Messages;
     using Kafka.Client.Producers.Async;
@@ -37,39 +36,31 @@ namespace Kafka.Client.IntegrationTests
     public class KafkaIntegrationTest : IntegrationFixtureBase
     {
         /// <summary>
-        /// Kafka Client configuration
-        /// </summary>
-        private static KafkaClientConfiguration clientConfig;
-
-        /// <summary>
         /// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds)
         /// </summary>
         private static readonly int MaxTestWaitTimeInMiliseconds = 5000;
 
-        [TestFixtureSetUp]
-        public void SetUp()
-        {
-            clientConfig = KafkaClientConfiguration.GetConfiguration();
-        }
-
         /// <summary>
         /// Sends a pair of message to Kafka.
         /// </summary>
         [Test]
         public void ProducerSendsMessage()
         {
+            var prodConfig = this.SyncProducerConfig1;
+
             string payload1 = "kafka 1.";
             byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
-            Message msg1 = new Message(payloadData1);
- 
+            var msg1 = new Message(payloadData1);
+
             string payload2 = "kafka 2.";
             byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
-            Message msg2 = new Message(payloadData2);
+            var msg2 = new Message(payloadData2);
 
-            var config = new SyncProducerConfig(clientConfig);
-            var producer = new SyncProducer(config);
-            var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { msg1, msg2 });
-            producer.Send(producerRequest);
+            using (var producer = new SyncProducer(prodConfig))
+            {
+                var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1, msg2 });
+                producer.Send(producerRequest);
+            }
         }
 
         /// <summary>
@@ -78,12 +69,15 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ProducerSendsMessageWithLongTopic()
         {
-            Message msg = new Message(Encoding.UTF8.GetBytes("test message"));
+            var prodConfig = this.SyncProducerConfig1;
+
+            var msg = new Message(Encoding.UTF8.GetBytes("test message"));
             string topic = "ThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopic";
-            var config = new SyncProducerConfig(clientConfig);
-            var producer = new SyncProducer(config);
-            var producerRequest = new ProducerRequest(topic, 0, new List<Message>() { msg });
-            producer.Send(producerRequest);
+            using (var producer = new SyncProducer(prodConfig))
+            {
+                var producerRequest = new ProducerRequest(topic, 0, new List<Message> { msg });
+                producer.Send(producerRequest);
+            }
         }
 
         /// <summary>
@@ -92,12 +86,12 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void AsyncProducerSendsManyLongRandomMessages()
         {
+            var prodConfig = this.AsyncProducerConfig1;
             List<Message> messages = GenerateRandomTextMessages(50);
-
-            var config = new AsyncProducerConfig(clientConfig);
-
-            var producer = new AsyncProducer(config);
-            producer.Send(CurrentTestTopic, 0, messages);
+            using (var producer = new AsyncProducer(prodConfig))
+            {
+                producer.Send(CurrentTestTopic, 0, messages);
+            }
         }
 
         /// <summary>
@@ -106,7 +100,9 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void AsyncProducerSendsFewShortFixedMessages()
         {
-            List<Message> messages = new List<Message>()
+            var prodConfig = this.AsyncProducerConfig1;
+
+            var messages = new List<Message>
                                          {
                                              new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
                                              new Message(Encoding.UTF8.GetBytes("Async Test Message 2")),
@@ -114,10 +110,10 @@ namespace Kafka.Client.IntegrationTests
                                              new Message(Encoding.UTF8.GetBytes("Async Test Message 4"))
                                          };
 
-            var config = new AsyncProducerConfig(clientConfig);
-
-            var producer = new AsyncProducer(config);
-            producer.Send(CurrentTestTopic, 0, messages);
+            using (var producer = new AsyncProducer(prodConfig))
+            {
+                producer.Send(CurrentTestTopic, 0, messages);
+            }
         }
 
         /// <summary>
@@ -126,25 +122,26 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void AsyncProducerSendsFewShortFixedMessagesInSeparateSendActions()
         {
-            var config = new AsyncProducerConfig(clientConfig);
-            using (var producer = new AsyncProducer(config))
+            var prodConfig = this.AsyncProducerConfig1;
+
+            using (var producer = new AsyncProducer(prodConfig))
             {
-                ProducerRequest req1 = new ProducerRequest(
+                var req1 = new ProducerRequest(
                     CurrentTestTopic,
                     0,
-                    new List<Message>() { new Message(Encoding.UTF8.GetBytes("Async Test Message 1")) });
+                    new List<Message> { new Message(Encoding.UTF8.GetBytes("Async Test Message 1")) });
                 producer.Send(req1);
 
-                ProducerRequest req2 = new ProducerRequest(
+                var req2 = new ProducerRequest(
                     CurrentTestTopic,
                     0,
-                    new List<Message>() { new Message(Encoding.UTF8.GetBytes("Async Test Message 2")) });
+                    new List<Message> { new Message(Encoding.UTF8.GetBytes("Async Test Message 2")) });
                 producer.Send(req2);
 
-                ProducerRequest req3 = new ProducerRequest(
+                var req3 = new ProducerRequest(
                     CurrentTestTopic,
                     0,
-                    new List<Message>() { new Message(Encoding.UTF8.GetBytes("Async Test Message 3")) });
+                    new List<Message> { new Message(Encoding.UTF8.GetBytes("Async Test Message 3")) });
                 producer.Send(req3);
             }
         }
@@ -152,14 +149,18 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void AsyncProducerSendsMessageWithCallbackClass()
         {
-            List<Message> messages = new List<Message>()
+            var prodConfig = this.AsyncProducerConfig1;
+
+            var messages = new List<Message>
                                          {
                                              new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
                                          };
-            var config = new AsyncProducerConfig(clientConfig);
-            TestCallbackHandler myHandler = new TestCallbackHandler();
-            var producer = new AsyncProducer(config, myHandler);
-            producer.Send(CurrentTestTopic, 0, messages);
+            var myHandler = new TestCallbackHandler();
+            using (var producer = new AsyncProducer(prodConfig, myHandler))
+            {
+                producer.Send(CurrentTestTopic, 0, messages);
+            }
+
             Thread.Sleep(1000);
             Assert.IsTrue(myHandler.WasRun);
         }
@@ -167,14 +168,18 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void AsyncProducerSendsMessageWithCallback()
         {
-            List<Message> messages = new List<Message>()
+            var prodConfig = this.AsyncProducerConfig1;
+
+            var messages = new List<Message>
                                          {
                                              new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
                                          };
-            var config = new AsyncProducerConfig(clientConfig);
-            TestCallbackHandler myHandler = new TestCallbackHandler();
-            var producer = new AsyncProducer(config);
-            producer.Send(CurrentTestTopic, 0, messages, myHandler.Handle);
+            var myHandler = new TestCallbackHandler();
+            using (var producer = new AsyncProducer(prodConfig))
+            {
+                producer.Send(CurrentTestTopic, 0, messages, myHandler.Handle);
+            }
+
             Thread.Sleep(1000);
             Assert.IsTrue(myHandler.WasRun);
         }
@@ -195,7 +200,9 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ProducerSendMultiRequest()
         {
-            List<ProducerRequest> requests = new List<ProducerRequest>
+            var prodConfig = this.SyncProducerConfig1;
+
+            var requests = new List<ProducerRequest>
             { 
                 new ProducerRequest(CurrentTestTopic, 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
                 new ProducerRequest(CurrentTestTopic, 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
@@ -203,9 +210,10 @@ namespace Kafka.Client.IntegrationTests
                 new ProducerRequest(CurrentTestTopic, 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })
             };
 
-            var config = new SyncProducerConfig(clientConfig);
-            var producer = new SyncProducer(config);
-            producer.MultiSend(requests);
+            using (var producer = new SyncProducer(prodConfig))
+            {
+                producer.MultiSend(requests);
+            }
         }
 
         /// <summary>
@@ -214,17 +222,21 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ConsumerFetchMessage()
         {
+            var consumerConfig = this.ConsumerConfig1;
             ProducerSendsMessage();
-
-            ConsumerConfig config = new ConsumerConfig(clientConfig);
-            IConsumer consumer = new Kafka.Client.Consumers.Consumer(config);
-            FetchRequest request = new FetchRequest(CurrentTestTopic, 0, 0);
+            Thread.Sleep(1000);
+            IConsumer consumer = new Consumer(consumerConfig);
+            var request = new FetchRequest(CurrentTestTopic, 0, 0);
             BufferedMessageSet response = consumer.Fetch(request);
             Assert.NotNull(response);
-            foreach (var message in response.Messages)
+            int count = 0;
+            foreach (var message in response)
             {
-                Console.WriteLine(message);
+                count++;
+                Console.WriteLine(message.Message);
             }
+
+            Assert.AreEqual(2, count);
         }
 
         /// <summary>
@@ -233,25 +245,28 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ConsumerMultiFetchGetsMessage()
         {
-            ProducerSendMultiRequest();
+            var config = this.ConsumerConfig1;
 
-            ConsumerConfig config = new ConsumerConfig(clientConfig);
-            IConsumer cons = new Consumers.Consumer(config);
-            MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
+            ProducerSendMultiRequest();
+            Thread.Sleep(2000);
+            IConsumer cons = new Consumer(config);
+            var request = new MultiFetchRequest(new List<FetchRequest>
             {
                 new FetchRequest(CurrentTestTopic, 0, 0),
                 new FetchRequest(CurrentTestTopic, 0, 0),
-                new FetchRequest(CurrentTestTopic + "2", 0, 0)
+                new FetchRequest(CurrentTestTopic, 0, 0)
             });
 
             IList<BufferedMessageSet> response = cons.MultiFetch(request);
+            Assert.AreEqual(3, response.Count);
             for (int ix = 0; ix < response.Count; ix++)
             {
                 IEnumerable<Message> messageSet = response[ix].Messages;
+                Assert.AreEqual(4, messageSet.Count());
                 Console.WriteLine(string.Format("Request #{0}-->", ix));
                 foreach (Message msg in messageSet)
                 {
-                    Console.WriteLine(msg);
+                    Console.WriteLine(msg.ToString());
                 }
             }
         }
@@ -262,10 +277,10 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ConsumerGetsOffsets()
         {
-            OffsetRequest request = new OffsetRequest(CurrentTestTopic, 0, DateTime.Now.AddHours(-24).Ticks, 10);
+            var consumerConfig = this.ConsumerConfig1;
 
-            ConsumerConfig config = new ConsumerConfig(clientConfig);
-            IConsumer consumer = new Consumers.Consumer(config);
+            var request = new OffsetRequest(CurrentTestTopic, 0, DateTime.Now.AddHours(-24).Ticks, 10);
+            IConsumer consumer = new Consumer(consumerConfig);
             IList<long> list = consumer.GetOffsetsBefore(request);
 
             foreach (long l in list)
@@ -280,20 +295,19 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ProducerSendsAndConsumerReceivesSingleSimpleMessage()
         {
-            Message sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
-
-            var config = new SyncProducerConfig(clientConfig);
-            var producer = new SyncProducer(config);
-            var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { sourceMessage });
-
-            long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, clientConfig);
+            var prodConfig = this.SyncProducerConfig1;
+            var consumerConfig = this.ConsumerConfig1;
 
-            producer.Send(producerRequest);
-
-            ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
-            IConsumer consumer = new Consumers.Consumer(consumerConfig);
-            FetchRequest request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
+            var sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
+            long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, consumerConfig);
+            using (var producer = new SyncProducer(prodConfig))
+            {
+                var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { sourceMessage });
+                producer.Send(producerRequest);
+            }
 
+            IConsumer consumer = new Consumer(consumerConfig);
+            var request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
             BufferedMessageSet response;
             int totalWaitTimeInMiliseconds = 0;
             int waitSingle = 100;
@@ -305,13 +319,11 @@ namespace Kafka.Client.IntegrationTests
                 {
                     break;
                 }
-                else
+
+                totalWaitTimeInMiliseconds += waitSingle;
+                if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
                 {
-                    totalWaitTimeInMiliseconds += waitSingle;
-                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
-                    {
-                        break;
-                    }
+                    break;
                 }
             }
 
@@ -327,19 +339,19 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void AsyncProducerSendsAndConsumerReceivesSingleSimpleMessage()
         {
-            Message sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
-
-            var config = new AsyncProducerConfig(clientConfig);
-            var producer = new AsyncProducer(config);
-            var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { sourceMessage });
-
-            long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, clientConfig);
+            var prodConfig = this.AsyncProducerConfig1;
+            var consumerConfig = this.ConsumerConfig1;
 
-            producer.Send(producerRequest);
+            var sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
+            using (var producer = new AsyncProducer(prodConfig))
+            {
+                var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { sourceMessage });
+                producer.Send(producerRequest);
+            }
 
-            ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
-            IConsumer consumer = new Consumers.Consumer(consumerConfig);
-            FetchRequest request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
+            long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, consumerConfig);
+            IConsumer consumer = new Consumer(consumerConfig);
+            var request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
 
             BufferedMessageSet response;
             int totalWaitTimeInMiliseconds = 0;
@@ -352,13 +364,11 @@ namespace Kafka.Client.IntegrationTests
                 {
                     break;
                 }
-                else
+
+                totalWaitTimeInMiliseconds += waitSingle;
+                if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
                 {
-                    totalWaitTimeInMiliseconds += waitSingle;
-                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
-                    {
-                        break;
-                    }
+                    break;
                 }
             }
 
@@ -374,16 +384,19 @@ namespace Kafka.Client.IntegrationTests
         [Test]
         public void ProducerSendsAndConsumerReceivesMultiRequest()
         {
+            var prodConfig = this.SyncProducerConfig1;
+            var consumerConfig = this.ConsumerConfig1;
+
             string testTopic1 = CurrentTestTopic + "1";
             string testTopic2 = CurrentTestTopic + "2";
             string testTopic3 = CurrentTestTopic + "3";
 
-            Message sourceMessage1 = new Message(Encoding.UTF8.GetBytes("1: TestMessage"));
-            Message sourceMessage2 = new Message(Encoding.UTF8.GetBytes("2: TestMessage"));
-            Message sourceMessage3 = new Message(Encoding.UTF8.GetBytes("3: TestMessage"));
-            Message sourceMessage4 = new Message(Encoding.UTF8.GetBytes("4: TestMessage"));
+            var sourceMessage1 = new Message(Encoding.UTF8.GetBytes("1: TestMessage"));
+            var sourceMessage2 = new Message(Encoding.UTF8.GetBytes("2: TestMessage"));
+            var sourceMessage3 = new Message(Encoding.UTF8.GetBytes("3: TestMessage"));
+            var sourceMessage4 = new Message(Encoding.UTF8.GetBytes("4: TestMessage"));
 
-            List<ProducerRequest> requests = new List<ProducerRequest>
+            var requests = new List<ProducerRequest>
             { 
                 new ProducerRequest(testTopic1, 0, new List<Message> { sourceMessage1 }),
                 new ProducerRequest(testTopic1, 0, new List<Message> { sourceMessage2 }),
@@ -391,18 +404,17 @@ namespace Kafka.Client.IntegrationTests
                 new ProducerRequest(testTopic3, 0, new List<Message> { sourceMessage4 })
             };
 
-            var config = new SyncProducerConfig(clientConfig);
-            var producer = new SyncProducer(config);
+            long currentOffset1 = TestHelper.GetCurrentKafkaOffset(testTopic1, consumerConfig);
+            long currentOffset2 = TestHelper.GetCurrentKafkaOffset(testTopic2, consumerConfig);
+            long currentOffset3 = TestHelper.GetCurrentKafkaOffset(testTopic3, consumerConfig);
 
-            long currentOffset1 = TestHelper.GetCurrentKafkaOffset(testTopic1, clientConfig);
-            long currentOffset2 = TestHelper.GetCurrentKafkaOffset(testTopic2, clientConfig);
-            long currentOffset3 = TestHelper.GetCurrentKafkaOffset(testTopic3, clientConfig);
-
-            producer.MultiSend(requests);
-
-            ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
-            IConsumer consumer = new Consumers.Consumer(consumerConfig);
-            MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
+            using (var producer = new SyncProducer(prodConfig))
+            {
+                producer.MultiSend(requests);
+            }
+
+            IConsumer consumer = new Consumer(consumerConfig);
+            var request = new MultiFetchRequest(new List<FetchRequest>
             {
                 new FetchRequest(testTopic1, 0, currentOffset1),
                 new FetchRequest(testTopic2, 0, currentOffset2),
@@ -419,13 +431,11 @@ namespace Kafka.Client.IntegrationTests
                 {
                     break;
                 }
-                else
+
+                totalWaitTimeInMiliseconds += waitSingle;
+                if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
                 {
-                    totalWaitTimeInMiliseconds += waitSingle;
-                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
-                    {
-                        break;
-                    }
+                    break;
                 }
             }
 
@@ -440,43 +450,13 @@ namespace Kafka.Client.IntegrationTests
         }
 
         /// <summary>
-        /// Gererates a randome list of messages.
-        /// </summary>
-        /// <param name="numberOfMessages">The number of messages to generate.</param>
-        /// <returns>A list of random messages.</returns>
-        private static List<Message> GenerateRandomMessages(int numberOfMessages)
-        {
-            List<Message> messages = new List<Message>();
-            for (int ix = 0; ix < numberOfMessages; ix++)
-            {
-                messages.Add(new Message(GenerateRandomBytes(10000)));
-            }
-
-            return messages;
-        }
-
-        /// <summary>
-        /// Generate a random set of bytes.
-        /// </summary>
-        /// <param name="length">Length of the byte array.</param>
-        /// <returns>Random byte array.</returns>
-        private static byte[] GenerateRandomBytes(int length)
-        {
-            byte[] randBytes = new byte[length];
-            Random randNum = new Random();
-            randNum.NextBytes(randBytes);
-
-            return randBytes;
-        }
-
-        /// <summary>
         /// Gererates a randome list of text messages.
         /// </summary>
         /// <param name="numberOfMessages">The number of messages to generate.</param>
         /// <returns>A list of random text messages.</returns>
         private static List<Message> GenerateRandomTextMessages(int numberOfMessages)
         {
-            List<Message> messages = new List<Message>();
+            var messages = new List<Message>();
             for (int ix = 0; ix < numberOfMessages; ix++)
             {
                 ////messages.Add(new Message(GenerateRandomBytes(10000)));
@@ -493,12 +473,11 @@ namespace Kafka.Client.IntegrationTests
         /// <returns>Random message string.</returns>
         private static string GenerateRandomMessage(int length)
         {
-            StringBuilder builder = new StringBuilder();
-            Random random = new Random();
-            char ch;
+            var builder = new StringBuilder();
+            var random = new Random();
             for (int i = 0; i < length; i++)
             {
-                ch = Convert.ToChar(Convert.ToInt32(
+                char ch = Convert.ToChar(Convert.ToInt32(
                     Math.Floor((26 * random.NextDouble()) + 65)));
                 builder.Append(ch);
             }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs Tue Oct 18 17:52:13 2011
@@ -19,6 +19,8 @@ using Kafka.Client.Producers.Partitionin
 
 namespace Kafka.Client.IntegrationTests
 {
+    using Kafka.Client.Producers.Partitioning;
+
     /// <summary>
     /// This mock partitioner will always point to the first partition (the one of index = 0)
     /// </summary>

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs Tue Oct 18 17:52:13 2011
@@ -33,213 +33,194 @@ namespace Kafka.Client.IntegrationTests
     public class ProducerTests : IntegrationFixtureBase
     {
         /// <summary>
-        /// Kafka Client configuration
-        /// </summary>
-        private KafkaClientConfiguration clientConfig;
-
-        /// <summary>
         /// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds)
         /// </summary>
         private readonly int maxTestWaitTimeInMiliseconds = 5000;
 
-        [TestFixtureSetUp]
-        public void SetUp()
-        {
-            clientConfig = KafkaClientConfiguration.GetConfiguration();
-            clientConfig.SupressZooKeeper();
-        }
-
         [Test]
         public void ProducerSends1Message()
         {
+            var prodConfig = this.ConfigBasedSyncProdConfig;
+
             int totalWaitTimeInMiliseconds = 0;
             int waitSingle = 100;
             var originalMessage = new Message(Encoding.UTF8.GetBytes("TestData"));
 
             var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
-            multipleBrokersHelper.GetCurrentOffsets();
-
-            var producerConfig = new ProducerConfig(clientConfig);
-            var mockPartitioner = new MockAlwaysZeroPartitioner();
-            using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder(), null))
+            multipleBrokersHelper.GetCurrentOffsets(
+                new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
+            using (var producer = new Producer(prodConfig))
             {
                 var producerData = new ProducerData<string, Message>(
-                    CurrentTestTopic, "somekey", new List<Message> { originalMessage });
+                    CurrentTestTopic, new List<Message> { originalMessage });
                 producer.Send(producerData);
                 Thread.Sleep(waitSingle);
+            }
 
-                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+            while (
+                !multipleBrokersHelper.CheckIfAnyBrokerHasChanged(
+                    new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
+            {
+                totalWaitTimeInMiliseconds += waitSingle;
+                Thread.Sleep(waitSingle);
+                if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
                 {
-                    totalWaitTimeInMiliseconds += waitSingle;
-                    Thread.Sleep(waitSingle);
-                    if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
-                    {
-                        Assert.Fail("None of the brokers changed their offset after sending a message");
-                    }
-                }
-
-                totalWaitTimeInMiliseconds = 0;
-
-                var consumerConfig = new ConsumerConfig(clientConfig)
-                    {
-                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
-                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
-                    };
-                IConsumer consumer = new Consumers.Consumer(consumerConfig);
-                var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
-
-                BufferedMessageSet response;
-
-                while (true)
-                {
-                    Thread.Sleep(waitSingle);
-                    response = consumer.Fetch(request);
-                    if (response != null && response.Messages.Count() > 0)
-                    {
-                        break;
-                    }
-
-                    totalWaitTimeInMiliseconds += waitSingle;
-                    if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
-                    {
-                        break;
-                    }
-                }
-
-                Assert.NotNull(response);
-                Assert.AreEqual(1, response.Messages.Count());
-                Assert.AreEqual(originalMessage.ToString(), response.Messages.First().ToString());
+                    Assert.Fail("None of the brokers changed their offset after sending a message");
+                }
             }
+
+            totalWaitTimeInMiliseconds = 0;
+
+            var consumerConfig = new ConsumerConfiguration(
+                multipleBrokersHelper.BrokerThatHasChanged.Host, multipleBrokersHelper.BrokerThatHasChanged.Port);
+            IConsumer consumer = new Consumer(consumerConfig);
+            var request1 = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange);
+            BufferedMessageSet response;
+            while (true)
+            {
+                Thread.Sleep(waitSingle);
+                response = consumer.Fetch(request1);
+                if (response != null && response.Messages.Count() > 0)
+                {
+                    break;
+                }
+
+                totalWaitTimeInMiliseconds += waitSingle;
+                if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
+                {
+                    break;
+                }
+            }
+
+            Assert.NotNull(response);
+            Assert.AreEqual(1, response.Messages.Count());
+            Assert.AreEqual(originalMessage.ToString(), response.Messages.First().ToString());
         }
 
         [Test]
         public void ProducerSends3Messages()
         {
+            var prodConfig = this.ConfigBasedSyncProdConfig;
+
             int totalWaitTimeInMiliseconds = 0;
             int waitSingle = 100;
             var originalMessage1 = new Message(Encoding.UTF8.GetBytes("TestData1"));
             var originalMessage2 = new Message(Encoding.UTF8.GetBytes("TestData2"));
             var originalMessage3 = new Message(Encoding.UTF8.GetBytes("TestData3"));
-            var originalMessageList =
-                new List<Message> { originalMessage1, originalMessage2, originalMessage3 };
+            var originalMessageList = new List<Message> { originalMessage1, originalMessage2, originalMessage3 };
 
             var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
-            multipleBrokersHelper.GetCurrentOffsets();
-
-            var producerConfig = new ProducerConfig(clientConfig);
-            var mockPartitioner = new MockAlwaysZeroPartitioner();
-            using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder(), null))
+            multipleBrokersHelper.GetCurrentOffsets(
+                new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
+            using (var producer = new Producer(prodConfig))
             {
-                var producerData = new ProducerData<string, Message>(CurrentTestTopic, "somekey", originalMessageList);
+                var producerData = new ProducerData<string, Message>(CurrentTestTopic, originalMessageList);
                 producer.Send(producerData);
+            }
+
+            Thread.Sleep(waitSingle);
+            while (
+                !multipleBrokersHelper.CheckIfAnyBrokerHasChanged(
+                    new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
+            {
+                totalWaitTimeInMiliseconds += waitSingle;
+                Thread.Sleep(waitSingle);
+                if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
+                {
+                    Assert.Fail("None of the brokers changed their offset after sending a message");
+                }
+            }
+
+            totalWaitTimeInMiliseconds = 0;
+
+            var consumerConfig = new ConsumerConfiguration(
+                multipleBrokersHelper.BrokerThatHasChanged.Host, multipleBrokersHelper.BrokerThatHasChanged.Port);
+            IConsumer consumer = new Consumer(consumerConfig);
+            var request = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+            BufferedMessageSet response;
+            while (true)
+            {
                 Thread.Sleep(waitSingle);
+                response = consumer.Fetch(request);
+                if (response != null && response.Messages.Count() > 2)
+                {
+                    break;
+                }
 
-                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                totalWaitTimeInMiliseconds += waitSingle;
+                if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
                 {
-                    totalWaitTimeInMiliseconds += waitSingle;
-                    Thread.Sleep(waitSingle);
-                    if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
-                    {
-                        Assert.Fail("None of the brokers changed their offset after sending a message");
-                    }
-                }
-
-                totalWaitTimeInMiliseconds = 0;
-
-                var consumerConfig = new ConsumerConfig(clientConfig)
-                    {
-                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
-                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
-                    };
-                IConsumer consumer = new Consumers.Consumer(consumerConfig);
-                var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
-
-                BufferedMessageSet response;
-                while (true)
-                {
-                    Thread.Sleep(waitSingle);
-                    response = consumer.Fetch(request);
-                    if (response != null && response.Messages.Count() > 2)
-                    {
-                        break;
-                    }
-
-                    totalWaitTimeInMiliseconds += waitSingle;
-                    if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
-                    {
-                        break;
-                    }
-                }
-
-                Assert.NotNull(response);
-                Assert.AreEqual(3, response.Messages.Count());
-                Assert.AreEqual(originalMessage1.ToString(), response.Messages.First().ToString());
-                Assert.AreEqual(originalMessage2.ToString(), response.Messages.Skip(1).First().ToString());
-                Assert.AreEqual(originalMessage3.ToString(), response.Messages.Skip(2).First().ToString());
+                    break;
+                }
             }
+
+            Assert.NotNull(response);
+            Assert.AreEqual(3, response.Messages.Count());
+            Assert.AreEqual(originalMessage1.ToString(), response.Messages.First().ToString());
+            Assert.AreEqual(originalMessage2.ToString(), response.Messages.Skip(1).First().ToString());
+            Assert.AreEqual(originalMessage3.ToString(), response.Messages.Skip(2).First().ToString());
         }
 
         [Test]
         public void ProducerSends1MessageUsingNotDefaultEncoder()
         {
+            var prodConfig = this.ConfigBasedSyncProdConfig;
+
             int totalWaitTimeInMiliseconds = 0;
             int waitSingle = 100;
             string originalMessage = "TestData";
 
             var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
-            multipleBrokersHelper.GetCurrentOffsets();
-
-            var producerConfig = new ProducerConfig(clientConfig);
-            var mockPartitioner = new MockAlwaysZeroPartitioner();
-            using (var producer = new Producer<string, string>(producerConfig, mockPartitioner, new StringEncoder(), null))
+            multipleBrokersHelper.GetCurrentOffsets(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
+            using (var producer = new Producer<string, string>(prodConfig, null, new StringEncoder(), null))
             {
                 var producerData = new ProducerData<string, string>(
-                    CurrentTestTopic, "somekey", new List<string> { originalMessage });
+                    CurrentTestTopic, new List<string> { originalMessage });
 
                 producer.Send(producerData);
+            }
+
+            Thread.Sleep(waitSingle);
+
+            while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
+            {
+                totalWaitTimeInMiliseconds += waitSingle;
+                Thread.Sleep(waitSingle);
+                if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
+                {
+                    Assert.Fail("None of the brokers changed their offset after sending a message");
+                }
+            }
+
+            totalWaitTimeInMiliseconds = 0;
+
+            var consumerConfig = new ConsumerConfiguration(
+                multipleBrokersHelper.BrokerThatHasChanged.Host,
+                    multipleBrokersHelper.BrokerThatHasChanged.Port);
+            IConsumer consumer = new Consumer(consumerConfig);
+            var request = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+            BufferedMessageSet response;
+            while (true)
+            {
                 Thread.Sleep(waitSingle);
+                response = consumer.Fetch(request);
+                if (response != null && response.Messages.Count() > 0)
+                {
+                    break;
+                }
 
-                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                totalWaitTimeInMiliseconds += waitSingle;
+                if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
                 {
-                    totalWaitTimeInMiliseconds += waitSingle;
-                    Thread.Sleep(waitSingle);
-                    if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
-                    {
-                        Assert.Fail("None of the brokers changed their offset after sending a message");
-                    }
-                }
-
-                totalWaitTimeInMiliseconds = 0;
-
-                var consumerConfig = new ConsumerConfig(clientConfig)
-                    {
-                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
-                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
-                    };
-                IConsumer consumer = new Consumers.Consumer(consumerConfig);
-                var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
-
-                BufferedMessageSet response;
-                while (true)
-                {
-                    Thread.Sleep(waitSingle);
-                    response = consumer.Fetch(request);
-                    if (response != null && response.Messages.Count() > 0)
-                    {
-                        break;
-                    }
-
-                    totalWaitTimeInMiliseconds += waitSingle;
-                    if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
-                    {
-                        break;
-                    }
-                }
-
-                Assert.NotNull(response);
-                Assert.AreEqual(1, response.Messages.Count());
-                Assert.AreEqual(originalMessage, Encoding.UTF8.GetString(response.Messages.First().Payload));
+                    break;
+                }
             }
+
+            Assert.NotNull(response);
+            Assert.AreEqual(1, response.Messages.Count());
+            Assert.AreEqual(originalMessage, Encoding.UTF8.GetString(response.Messages.First().Payload));
         }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs Tue Oct 18 17:52:13 2011
@@ -19,33 +19,30 @@ namespace Kafka.Client.IntegrationTests
 {
     using System;
     using System.Collections.Generic;
+    using System.Linq;
     using Kafka.Client.Cfg;
     using Kafka.Client.Consumers;
     using Kafka.Client.Requests;
 
     public static class TestHelper
     {
-        public static long GetCurrentKafkaOffset(string topic, KafkaClientConfiguration clientConfig)
+        public static long GetCurrentKafkaOffset(string topic, ConsumerConfiguration clientConfig)
         {
-            return GetCurrentKafkaOffset(topic, clientConfig.KafkaServer.Address, clientConfig.KafkaServer.Port);
+            return GetCurrentKafkaOffset(topic, clientConfig.Broker.Host, clientConfig.Broker.Port);
         }
 
         public static long GetCurrentKafkaOffset(string topic, string address, int port)
         {
-            OffsetRequest request = new OffsetRequest(topic, 0, DateTime.Now.AddDays(-5).Ticks, 10);
-            ConsumerConfig consumerConfig = new ConsumerConfig();
-            consumerConfig.Host = address;
-            consumerConfig.Port = port;
-            IConsumer consumer = new Consumers.Consumer(consumerConfig);
+            return GetCurrentKafkaOffset(topic, address, port, 0);
+        }
+
+        public static long GetCurrentKafkaOffset(string topic, string address, int port, int partition)
+        {
+            var request = new OffsetRequest(topic, partition, DateTime.Now.AddDays(-5).Ticks, 10);
+            var consumerConfig = new ConsumerConfiguration(address, port);
+            IConsumer consumer = new Consumer(consumerConfig, address, port);
             IList<long> list = consumer.GetOffsetsBefore(request);
-            if (list.Count > 0)
-            {
-                return list[0];
-            }
-            else
-            {
-                return 0;
-            }
+            return list.Sum();
         }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs Tue Oct 18 17:52:13 2011
@@ -22,55 +22,52 @@ namespace Kafka.Client.IntegrationTests
 
     public class TestMultipleBrokersHelper
     {
-        private BrokerPartitionInfoCollection configBrokers =
-            KafkaClientConfiguration.GetConfiguration().BrokerPartitionInfos;
+        private readonly Dictionary<int, Dictionary<int, long>> offsets = new Dictionary<int, Dictionary<int, long>>();
 
-        private Dictionary<int, long> offsets = new Dictionary<int, long>();
-
-        private BrokerPartitionInfo changedBroker;
-
-        private string topic;
+        private readonly string topic;
 
         public TestMultipleBrokersHelper(string topic)
         {
             this.topic = topic;
         }
 
-        public BrokerPartitionInfo BrokerThatHasChanged
-        {
-            get { return changedBroker; }
-        }
+        public SyncProducerConfiguration BrokerThatHasChanged { get; private set; }
+
+        public int PartitionThatHasChanged { get; private set; }
 
         public long OffsetFromBeforeTheChange
         {
             get
             {
-                if (changedBroker != null)
-                {
-                    return offsets[changedBroker.Id];
-                }
-                else
-                {
-                    return 0;
-                }
+                return this.BrokerThatHasChanged != null ? this.offsets[this.BrokerThatHasChanged.BrokerId][this.PartitionThatHasChanged] : 0;
             }
         }
 
-        public void GetCurrentOffsets()
+        public void GetCurrentOffsets(IEnumerable<SyncProducerConfiguration> brokers)
         {
-            foreach (BrokerPartitionInfo broker in configBrokers)
+            foreach (var broker in brokers)
             {
-                offsets.Add(broker.Id, TestHelper.GetCurrentKafkaOffset(topic, broker.Address, broker.Port));
+                offsets.Add(broker.BrokerId, new Dictionary<int, long>());
+                offsets[broker.BrokerId].Add(0, TestHelper.GetCurrentKafkaOffset(topic, broker.Host, broker.Port, 0));
+                offsets[broker.BrokerId].Add(1, TestHelper.GetCurrentKafkaOffset(topic, broker.Host, broker.Port, 1));
             }
         }
 
-        public bool CheckIfAnyBrokerHasChanged()
+        public bool CheckIfAnyBrokerHasChanged(IEnumerable<SyncProducerConfiguration> brokers)
         {
-            foreach (BrokerPartitionInfo broker in configBrokers)
+            foreach (var broker in brokers)
             {
-                if (TestHelper.GetCurrentKafkaOffset(topic, broker.Address, broker.Port) != offsets[broker.Id])
+                if (TestHelper.GetCurrentKafkaOffset(topic, broker.Host, broker.Port, 0) != offsets[broker.BrokerId][0])
+                {
+                    this.BrokerThatHasChanged = broker;
+                    this.PartitionThatHasChanged = 0;
+                    return true;
+                }
+
+                if (TestHelper.GetCurrentKafkaOffset(topic, broker.Host, broker.Port, 1) != offsets[broker.BrokerId][1])
                 {
-                    changedBroker = broker;
+                    this.BrokerThatHasChanged = broker;
+                    this.PartitionThatHasChanged = 1;
                     return true;
                 }
             }