You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC

svn commit: r1173797 [8/10] - in /incubator/kafka/trunk/clients/csharp: ./ lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/ src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/ src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config Wed Sep 21 19:17:19 2011
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<configuration> <!-- Kafka client settings for the integration tests. NOTE(review): every address below is the hard-coded private IP 192.168.1.39; presumably it has to be edited to match the local test cluster. -->
+  <configSections>
+    <section
+        name="kafkaClientConfiguration"
+        type="Kafka.Client.Cfg.KafkaClientConfiguration, Kafka.Client"
+        allowLocation="true"
+        allowDefinition="Everywhere"
+      /> <!-- Maps the kafkaClientConfiguration element below to the KafkaClientConfiguration type in the Kafka.Client assembly. -->
+  </configSections>
+  <kafkaClientConfiguration>
+    <kafkaServer address="192.168.1.39" port="9092"></kafkaServer> <!-- Same host and port as broker id 0 in brokerPartitionInfos. -->
+    <consumer numberOfTries="2" groupId="testGroup" timeout="10000" autoOffsetReset="smallest" autoCommit="true" autoCommitIntervalMs="10000" fetchSize="307200" backOffIncrementMs="2000"/> <!-- Consumer defaults; presumably the values individual tests override (e.g. AutoCommit, GroupId, Timeout) start from here. TODO confirm. -->
+    <brokerPartitionInfos> <!-- Three broker entries (ids 0 to 2) on one host, told apart by port. -->
+      <add id="0" address="192.168.1.39" port="9092" />
+      <add id="1" address="192.168.1.39" port="9101" />
+      <add id="2" address="192.168.1.39" port="9102" />
+    </brokerPartitionInfos>
+    <zooKeeperServers addressList="192.168.1.39:2181" sessionTimeout="30000" connectionTimeout="3000"></zooKeeperServers> <!-- Units of the two timeouts are not stated here; presumably milliseconds. TODO confirm. -->
+  </kafkaClientConfiguration>
+</configuration>
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Threading;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Consumers;
+    using Kafka.Client.Utils;
+    using Kafka.Client.ZooKeeperIntegration;
+    using NUnit.Framework;
+
+    [TestFixture]
+    public class ConsumerRebalancingTests : IntegrationFixtureBase
+    {
+        /// <summary>
+        /// Kafka Client configuration
+        /// </summary>
+        private static KafkaClientConfiguration clientConfig;
+
+        /// <summary>
+        /// Reads the Kafka client configuration once and keeps it for all tests of this fixture
+        /// </summary>
+        [TestFixtureSetUp]
+        public void SetUp()
+        {
+            KafkaClientConfiguration loadedConfiguration = KafkaClientConfiguration.GetConfiguration();
+            clientConfig = loadedConfiguration;
+        }
+
+        /// <summary>
+        /// Starts one consumer in group "group1" for topic "test", checks the nodes found under
+        /// /consumers/group1 in ZooKeeper, and then checks that the ids and owners entries are gone
+        /// after the connector has been disposed.
+        /// </summary>
+        [Test]
+        public void ConsumerPorformsRebalancingOnStart()
+        {
+            const string GroupPath = "/consumers/group1";
+            const string IdsPath = GroupPath + "/ids";
+            const string OwnersPath = GroupPath + "/owners";
+            const string TopicOwnersPath = OwnersPath + "/test";
+
+            var config = new ConsumerConfig(clientConfig)
+            {
+                AutoCommit = false,
+                GroupId = "group1"
+            };
+
+            using (var connector = new ZookeeperConsumerConnector(config, true))
+            {
+                // The connector's private ZooKeeper client is reused to inspect the registered nodes.
+                ZooKeeperClient zkClient = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", connector);
+                Assert.IsNotNull(zkClient);
+
+                // Remove the group subtree first (presumably so earlier runs cannot influence the checks).
+                zkClient.DeleteRecursive(GroupPath);
+                var requestedStreams = new Dictionary<string, int> { { "test", 1 } };
+                connector.CreateMessageStreams(requestedStreams);
+                WaitUntillIdle(zkClient, 1000);
+
+                IList<string> groups = zkClient.GetChildren("/consumers", false);
+                Assert.That(groups, Is.Not.Null.And.Not.Empty);
+                Assert.That(groups, Contains.Item("group1"));
+
+                IList<string> groupNodes = zkClient.GetChildren(GroupPath, false);
+                Assert.That(groupNodes, Is.Not.Null.And.Not.Empty);
+                Assert.That(groupNodes, Contains.Item("ids"));
+                Assert.That(groupNodes, Contains.Item("owners"));
+
+                IList<string> consumerIds = zkClient.GetChildren(IdsPath, false);
+                Assert.That(consumerIds, Is.Not.Null.And.Not.Empty);
+                string consumerId = consumerIds[0];
+
+                IList<string> ownedTopics = zkClient.GetChildren(OwnersPath, false);
+                Assert.That(ownedTopics, Is.Not.Null.And.Not.Empty);
+                Assert.That(ownedTopics.Count, Is.EqualTo(1));
+                Assert.That(ownedTopics, Contains.Item("test"));
+
+                IList<string> ownedPartitions = zkClient.GetChildren(TopicOwnersPath, false);
+                Assert.That(ownedPartitions, Is.Not.Null.And.Not.Empty);
+                Assert.That(ownedPartitions.Count, Is.EqualTo(2));
+
+                // The owner node of a partition is expected to name the consumer that owns it.
+                string firstPartition = ownedPartitions[0];
+                string ownerData = zkClient.ReadData<string>(TopicOwnersPath + "/" + firstPartition);
+                Assert.That(ownerData, Is.Not.Null.And.Not.Empty);
+                Assert.That(ownerData, Contains.Substring(consumerId));
+
+                string registrationData = zkClient.ReadData<string>(IdsPath + "/" + consumerId);
+                Assert.That(registrationData, Is.Not.Null.And.Not.Empty);
+                Assert.That(registrationData, Is.EqualTo("{ \"test\": 1 }"));
+            }
+
+            using (var verifier = new ZooKeeperClient(config.ZkConnect, config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+            {
+                verifier.Connect();
+
+                // The consumer id nodes should have been ephemeral, so none may remain after disposal.
+                IList<string> remainingIds = verifier.GetChildren(IdsPath);
+                Assert.That(remainingIds, Is.Null.Or.Empty);
+
+                // The partition owner nodes should have been ephemeral as well.
+                IList<string> remainingOwners = verifier.GetChildren(TopicOwnersPath);
+                Assert.That(remainingOwners, Is.Null.Or.Empty);
+            }
+        }
+
+        /// <summary>
+        /// Registers an additional broker (id 2345) for topic "test" in ZooKeeper while a consumer is running
+        /// and verifies that the consumer takes ownership of the new broker's partition, both in the owners
+        /// node and in the connector's private topic registry.
+        /// </summary>
+        [Test]
+        public void ConsumerPorformsRebalancingWhenNewBrokerIsAddedToTopic()
+        {
+            var config = new ConsumerConfig(clientConfig)
+                             {
+                                 AutoCommit = false,
+                                 GroupId = "group1",
+                                 ZkSessionTimeoutMs = 60000,
+                                 ZkConnectionTimeoutMs = 60000
+                             };
+            string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+            string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/test/" + 2345;
+            using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
+            {
+                var client = ReflectionHelper.GetInstanceField<ZooKeeperClient>(
+                    "zkClient", consumerConnector);
+                Assert.IsNotNull(client);
+                client.DeleteRecursive("/consumers/group1");
+                var topicCount = new Dictionary<string, int> { { "test", 1 } };
+                consumerConnector.CreateMessageStreams(topicCount);
+                WaitUntillIdle(client, 1000);
+                IList<string> children = client.GetChildren("/consumers/group1/ids", false);
+
+                // Fail with a readable assertion instead of an index/null exception when nothing registered.
+                Assert.That(children, Is.Not.Null.And.Not.Empty);
+                string consumerId = children[0];
+
+                // Announce the extra broker and its registration for topic "test".
+                client.CreateEphemeral(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+                client.CreateEphemeral(brokerTopicPath, 1);
+                WaitUntillIdle(client, 500);
+                children = client.GetChildren("/consumers/group1/owners/test", false);
+                Assert.That(children.Count, Is.EqualTo(3));
+                Assert.That(children, Contains.Item("2345-0"));
+                string data = client.ReadData<string>("/consumers/group1/owners/test/2345-0");
+                Assert.That(data, Is.Not.Null);
+                Assert.That(data, Contains.Substring(consumerId));
+                var topicRegistry =
+                    ReflectionHelper.GetInstanceField<IDictionary<string, IDictionary<Partition, PartitionTopicInfo>>>(
+                        "topicRegistry", consumerConnector);
+                Assert.That(topicRegistry, Is.Not.Null.And.Not.Empty);
+                Assert.That(topicRegistry.Count, Is.EqualTo(1));
+                var item = topicRegistry["test"];
+                Assert.That(item.Count, Is.EqualTo(3));
+
+                // Dictionary entries are KeyValuePair structs, so a SingleOrDefault result can never be null
+                // and a null check on it always passes. Count the matching entries so the check can fail.
+                int newBrokerEntries = item.Count(x => x.Key.BrokerId == 2345);
+                Assert.That(newBrokerEntries, Is.EqualTo(1));
+            }
+        }
+
+        /// <summary>
+        /// Registers a broker (id 2345) for topic "test" in ZooKeeper, deletes its topic registration again
+        /// and verifies that the broker's partition is absent from the owners node and from the connector's
+        /// private topic registry.
+        /// </summary>
+        [Test]
+        public void ConsumerPorformsRebalancingWhenBrokerIsRemovedFromTopic()
+        {
+            const int ExtraBrokerId = 2345;
+            const string TopicOwnersPath = "/consumers/group1/owners/test";
+
+            string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + ExtraBrokerId;
+            string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/test/" + ExtraBrokerId;
+            var config = new ConsumerConfig(clientConfig)
+            {
+                AutoCommit = false,
+                GroupId = "group1",
+                ZkSessionTimeoutMs = 60000,
+                ZkConnectionTimeoutMs = 60000
+            };
+
+            using (var connector = new ZookeeperConsumerConnector(config, true))
+            {
+                var zkClient = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", connector);
+                Assert.IsNotNull(zkClient);
+                zkClient.DeleteRecursive("/consumers/group1");
+                connector.CreateMessageStreams(new Dictionary<string, int> { { "test", 1 } });
+                WaitUntillIdle(zkClient, 1000);
+
+                // Add the extra broker, wait, then take its topic registration away again.
+                zkClient.CreateEphemeral(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+                zkClient.CreateEphemeral(brokerTopicPath, 1);
+                WaitUntillIdle(zkClient, 1000);
+                zkClient.DeleteRecursive(brokerTopicPath);
+                WaitUntillIdle(zkClient, 1000);
+
+                IList<string> ownedPartitions = zkClient.GetChildren(TopicOwnersPath, false);
+                Assert.That(ownedPartitions.Count, Is.EqualTo(2));
+                Assert.That(ownedPartitions, Has.None.EqualTo("2345-0"));
+
+                var registry = ReflectionHelper.GetInstanceField<IDictionary<string, IDictionary<Partition, PartitionTopicInfo>>>(
+                    "topicRegistry", connector);
+                Assert.That(registry, Is.Not.Null.And.Not.Empty);
+                Assert.That(registry.Count, Is.EqualTo(1));
+
+                var testTopicEntries = registry["test"];
+                Assert.That(testTopicEntries.Count, Is.EqualTo(2));
+                int entriesOfRemovedBroker = testTopicEntries.Count(x => x.Value.BrokerId == ExtraBrokerId);
+                Assert.That(entriesOfRemovedBroker, Is.EqualTo(0));
+            }
+        }
+
+        /// <summary>
+        /// Starts two consumers in the same group for topic "test" and verifies that two consumer ids are
+        /// registered and that the two partition owner nodes hold different data, each starting with one
+        /// of the registered consumer ids.
+        /// </summary>
+        [Test]
+        public void ConsumerPerformsRebalancingWhenNewConsumerIsAddedAndTheyDividePartitions()
+        {
+            const string IdsPath = "/consumers/group1/ids";
+            const string TopicOwnersPath = "/consumers/group1/owners/test";
+
+            var config = new ConsumerConfig(clientConfig)
+            {
+                AutoCommit = false,
+                GroupId = "group1",
+                ZkSessionTimeoutMs = 60000,
+                ZkConnectionTimeoutMs = 60000
+            };
+
+            using (var firstConnector = new ZookeeperConsumerConnector(config, true))
+            {
+                var zkClient = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", firstConnector);
+                Assert.IsNotNull(zkClient);
+                zkClient.DeleteRecursive("/consumers/group1");
+
+                var requestedStreams = new Dictionary<string, int> { { "test", 1 } };
+                firstConnector.CreateMessageStreams(requestedStreams);
+                WaitUntillIdle(zkClient, 1000);
+
+                using (var secondConnector = new ZookeeperConsumerConnector(config, true))
+                {
+                    secondConnector.CreateMessageStreams(requestedStreams);
+                    WaitUntillIdle(zkClient, 1000);
+
+                    IList<string> consumerIds = zkClient.GetChildren(IdsPath, false).ToList();
+                    IList<string> partitionOwners = zkClient.GetChildren(TopicOwnersPath, false).ToList();
+
+                    Assert.That(consumerIds, Is.Not.Null.And.Not.Empty);
+                    Assert.That(consumerIds.Count, Is.EqualTo(2));
+                    Assert.That(partitionOwners, Is.Not.Null.And.Not.Empty);
+                    Assert.That(partitionOwners.Count, Is.EqualTo(2));
+
+                    string firstOwnerData = zkClient.ReadData<string>(TopicOwnersPath + "/" + partitionOwners[0], false);
+                    string secondOwnerData = zkClient.ReadData<string>(TopicOwnersPath + "/" + partitionOwners[1], false);
+
+                    // Each partition must have an owner, and the two owners must differ.
+                    Assert.That(firstOwnerData, Is.Not.Null.And.Not.Empty);
+                    Assert.That(secondOwnerData, Is.Not.Null.And.Not.Empty);
+                    Assert.That(firstOwnerData, Is.Not.EqualTo(secondOwnerData));
+                    Assert.That(firstOwnerData, Is.StringStarting(consumerIds[0]).Or.StringStarting(consumerIds[1]));
+                    Assert.That(secondOwnerData, Is.StringStarting(consumerIds[0]).Or.StringStarting(consumerIds[1]));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Starts two consumers in the same group, disposes the second one and verifies that a single
+        /// consumer id remains and that both partition owner nodes then hold the same data, starting with
+        /// that remaining consumer id.
+        /// </summary>
+        [Test]
+        public void ConsumerPerformsRebalancingWhenConsumerIsRemovedAndTakesItsPartitions()
+        {
+            const string IdsPath = "/consumers/group1/ids";
+            const string TopicOwnersPath = "/consumers/group1/owners/test";
+
+            var config = new ConsumerConfig(clientConfig)
+            {
+                AutoCommit = false,
+                GroupId = "group1",
+                ZkSessionTimeoutMs = 60000,
+                ZkConnectionTimeoutMs = 60000
+            };
+
+            using (var survivingConnector = new ZookeeperConsumerConnector(config, true))
+            {
+                var zkClient = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", survivingConnector);
+                Assert.IsNotNull(zkClient);
+                zkClient.DeleteRecursive("/consumers/group1");
+
+                var requestedStreams = new Dictionary<string, int> { { "test", 1 } };
+                survivingConnector.CreateMessageStreams(requestedStreams);
+                WaitUntillIdle(zkClient, 1000);
+
+                // Phase one: while both consumers are alive, two ids and two owner nodes are expected.
+                using (var temporaryConnector = new ZookeeperConsumerConnector(config, true))
+                {
+                    temporaryConnector.CreateMessageStreams(requestedStreams);
+                    WaitUntillIdle(zkClient, 1000);
+
+                    IList<string> idsWithBoth = zkClient.GetChildren(IdsPath, false).ToList();
+                    IList<string> ownersWithBoth = zkClient.GetChildren(TopicOwnersPath, false).ToList();
+                    Assert.That(idsWithBoth, Is.Not.Null.And.Not.Empty);
+                    Assert.That(idsWithBoth.Count, Is.EqualTo(2));
+                    Assert.That(ownersWithBoth, Is.Not.Null.And.Not.Empty);
+                    Assert.That(ownersWithBoth.Count, Is.EqualTo(2));
+                }
+
+                // Phase two: the second consumer has been disposed; re-read the nodes after waiting.
+                WaitUntillIdle(zkClient, 1000);
+                IList<string> remainingIds = zkClient.GetChildren(IdsPath, false).ToList();
+                IList<string> remainingOwners = zkClient.GetChildren(TopicOwnersPath, false).ToList();
+
+                Assert.That(remainingIds, Is.Not.Null.And.Not.Empty);
+                Assert.That(remainingIds.Count, Is.EqualTo(1));
+                Assert.That(remainingOwners, Is.Not.Null.And.Not.Empty);
+                Assert.That(remainingOwners.Count, Is.EqualTo(2));
+
+                string firstOwnerData = zkClient.ReadData<string>(TopicOwnersPath + "/" + remainingOwners[0], false);
+                string secondOwnerData = zkClient.ReadData<string>(TopicOwnersPath + "/" + remainingOwners[1], false);
+
+                Assert.That(firstOwnerData, Is.Not.Null.And.Not.Empty);
+                Assert.That(secondOwnerData, Is.Not.Null.And.Not.Empty);
+                Assert.That(firstOwnerData, Is.EqualTo(secondOwnerData));
+                Assert.That(firstOwnerData, Is.StringStarting(remainingIds[0]));
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,309 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Reflection;
+    using System.Text;
+    using System.Threading;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Consumers;
+    using Kafka.Client.Exceptions;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Producers.Sync;
+    using Kafka.Client.Requests;
+    using NUnit.Framework;
+
+    [TestFixture]
+    public class ConsumerTests : IntegrationFixtureBase
+    {
+        /// <summary>
+        /// Kafka Client configuration
+        /// </summary>
+        private static KafkaClientConfiguration clientConfig;
+
+        /// <summary>
+        /// Reads the Kafka client configuration once and keeps it for all tests of this fixture
+        /// </summary>
+        [TestFixtureSetUp]
+        public void SetUp()
+        {
+            KafkaClientConfiguration loadedConfiguration = KafkaClientConfiguration.GetConfiguration();
+            clientConfig = loadedConfiguration;
+        }
+
+        /// <summary>
+        /// Creates a consumer connector and disposes it straight away; the test passes when neither step throws.
+        /// </summary>
+        [Test]
+        public void ConsumerConnectorIsCreatedConnectsDisconnectsAndShutsDown()
+        {
+            var consumerConfig = new ConsumerConfig(clientConfig);
+            using (var connector = new ZookeeperConsumerConnector(consumerConfig, true))
+            {
+                // Intentionally empty: only construction and disposal are exercised.
+            }
+        }
+
+        /// <summary>
+        /// Sends two messages to partition 0 of the current test topic in one request and verifies that a
+        /// consumer connector reads exactly those two messages back, in the order they were sent.
+        /// </summary>
+        [Test]
+        public void SimpleSyncProducerSends2MessagesAndConsumerConnectorGetsThemBack()
+        {
+            // Produce both messages with a single request.
+            var msg1 = new Message(Encoding.UTF8.GetBytes("kafka 1."));
+            var msg2 = new Message(Encoding.UTF8.GetBytes("kafka 2."));
+            var producer = new SyncProducer(new SyncProducerConfig(clientConfig));
+            producer.Send(new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1, msg2 }));
+
+            // Consume until the stream reports a timeout.
+            var consumerConfig = new ConsumerConfig(clientConfig) { AutoCommit = false };
+            var received = new List<Message>();
+            using (IConsumerConnector connector = new ZookeeperConsumerConnector(consumerConfig, true))
+            {
+                var requestedStreams = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
+                var streamsByTopic = connector.CreateMessageStreams(requestedStreams);
+                var topicStreams = streamsByTopic[CurrentTestTopic];
+                try
+                {
+                    foreach (var stream in topicStreams)
+                    {
+                        foreach (var item in stream)
+                        {
+                            received.Add(item);
+                        }
+                    }
+                }
+                catch (ConsumerTimeoutException)
+                {
+                    // Expected: the timeout is what ends the read loop once no more messages arrive.
+                }
+            }
+
+            Assert.AreEqual(2, received.Count);
+            Assert.AreEqual(msg1.ToString(), received[0].ToString());
+            Assert.AreEqual(msg2.ToString(), received[1].ToString());
+        }
+
+        /// <summary>
+        /// Sends one message and reads it back, expects a timeout followed by a failed-state exception while
+        /// nothing else is available, then resets the enumerator and checks that a second message sent
+        /// afterwards is delivered.
+        /// </summary>
+        [Test]
+        public void OneMessageIsSentAndReceivedThenExceptionsWhenNoMessageThenAnotherMessageIsSentAndReceived()
+        {
+            // Produce the first message.
+            var firstMessage = new Message(Encoding.UTF8.GetBytes("kafka 1."));
+            var producer = new SyncProducer(new SyncProducerConfig(clientConfig));
+            producer.Send(new ProducerRequest(CurrentTestTopic, 0, new List<Message> { firstMessage }));
+
+            var consumerConfig = new ConsumerConfig(clientConfig) { AutoCommit = false, Timeout = 5000 };
+            using (IConsumerConnector connector = new ZookeeperConsumerConnector(consumerConfig, true))
+            {
+                var requestedStreams = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
+                var streamsByTopic = connector.CreateMessageStreams(requestedStreams);
+                KafkaMessageStream stream = streamsByTopic[CurrentTestTopic][0];
+                var cursor = stream.GetEnumerator();
+
+                Assert.IsTrue(cursor.MoveNext());
+                Assert.AreEqual(firstMessage.ToString(), cursor.Current.ToString());
+
+                // Nothing more to read: the first attempt times out ...
+                Assert.Throws<ConsumerTimeoutException>(() => cursor.MoveNext());
+
+                // ... and the iterator is then in a failed state until it is reset.
+                Assert.Throws<Exception>(() => cursor.MoveNext());
+
+                cursor.Reset();
+
+                // Produce a second message and give it time to arrive before reading again.
+                var secondMessage = new Message(Encoding.UTF8.GetBytes("kafka 2."));
+                producer.Send(new ProducerRequest(CurrentTestTopic, 0, new List<Message> { secondMessage }));
+
+                Thread.Sleep(3000);
+
+                Assert.IsTrue(cursor.MoveNext());
+                Assert.AreEqual(secondMessage.ToString(), cursor.Current.ToString());
+            }
+        }
+
+        /// <summary>
+        /// Sends one message to each of two topics derived from the current test topic and verifies that a
+        /// single consumer connector subscribed to both receives each message on the matching topic.
+        /// </summary>
+        [Test]
+        public void ConsumerConnectorConsumesTwoDifferentTopics()
+        {
+            string firstTopic = CurrentTestTopic + "1";
+            string secondTopic = CurrentTestTopic + "2";
+
+            // Produce one message per topic.
+            var msg1 = new Message(Encoding.UTF8.GetBytes("kafka 1."));
+            var msg2 = new Message(Encoding.UTF8.GetBytes("kafka 2."));
+            var producer = new SyncProducer(new SyncProducerConfig(clientConfig));
+            producer.Send(new ProducerRequest(firstTopic, 0, new List<Message> { msg1 }));
+            producer.Send(new ProducerRequest(secondTopic, 0, new List<Message> { msg2 }));
+
+            // Consume both topics through one connector, first topic first.
+            var consumerConfig = new ConsumerConfig(clientConfig) { AutoCommit = false };
+            var receivedFromFirst = new List<Message>();
+            var receivedFromSecond = new List<Message>();
+            using (IConsumerConnector connector = new ZookeeperConsumerConnector(consumerConfig, true))
+            {
+                var requestedStreams = new Dictionary<string, int> { { firstTopic, 1 }, { secondTopic, 1 } };
+                var streamsByTopic = connector.CreateMessageStreams(requestedStreams);
+
+                Assert.IsTrue(streamsByTopic.ContainsKey(firstTopic));
+                Assert.IsTrue(streamsByTopic.ContainsKey(secondTopic));
+
+                var firstTopicStreams = streamsByTopic[firstTopic];
+                try
+                {
+                    foreach (var stream in firstTopicStreams)
+                    {
+                        foreach (var item in stream)
+                        {
+                            receivedFromFirst.Add(item);
+                        }
+                    }
+                }
+                catch (ConsumerTimeoutException)
+                {
+                    // Expected: the timeout ends the read loop for the first topic.
+                }
+
+                var secondTopicStreams = streamsByTopic[secondTopic];
+                try
+                {
+                    foreach (var stream in secondTopicStreams)
+                    {
+                        foreach (var item in stream)
+                        {
+                            receivedFromSecond.Add(item);
+                        }
+                    }
+                }
+                catch (ConsumerTimeoutException)
+                {
+                    // Expected: the timeout ends the read loop for the second topic.
+                }
+            }
+
+            Assert.AreEqual(1, receivedFromFirst.Count);
+            Assert.AreEqual(msg1.ToString(), receivedFromFirst[0].ToString());
+
+            Assert.AreEqual(1, receivedFromSecond.Count);
+            Assert.AreEqual(msg2.ToString(), receivedFromSecond[0].ToString());
+        }
+
+        /// <summary>
+        /// Puts the shutdown command into every queue of a consumer connector and verifies that iterating
+        /// the message streams of the current test topic then ends without yielding any message.
+        /// </summary>
+        [Test]
+        public void ConsumerConnectorReceivesAShutdownSignal()
+        {
+            var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
+            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+            {
+                var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
+                var messages = consumerConnector.CreateMessageStreams(topicCount);
+
+                // The queues are private, so they are reached through reflection. Assert each step so a
+                // renamed or retyped field fails with a clear message instead of a NullReferenceException.
+                FieldInfo fi = typeof(ZookeeperConsumerConnector).GetField(
+                    "queues", BindingFlags.NonPublic | BindingFlags.Instance);
+                Assert.IsNotNull(fi, "ZookeeperConsumerConnector has no private instance field named 'queues'");
+                var value =
+                    fi.GetValue(consumerConnector)
+                    as IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>>;
+                Assert.IsNotNull(value, "The 'queues' field is null or does not have the expected dictionary type");
+
+                // Without at least one queue the shutdown command would never reach a stream.
+                Assert.That(value.Count, Is.GreaterThan(0));
+
+                // putting the shutdown command into the queue
+                foreach (var topicConsumerQueueMap in value)
+                {
+                    topicConsumerQueueMap.Value.Add(ZookeeperConsumerConnector.ShutdownCommand);
+                }
+
+                var sets = messages[CurrentTestTopic];
+                var resultMessages = new List<Message>();
+
+                foreach (var set in sets)
+                {
+                    foreach (var message in set)
+                    {
+                        resultMessages.Add(message);
+                    }
+                }
+
+                Assert.AreEqual(0, resultMessages.Count);
+            }
+        }
+
+        /// <summary>
+        /// Sends one message to partition 0 and one to partition 1 of the current test topic and verifies
+        /// that a consumer connector reads both of them back, regardless of the order in which they arrive.
+        /// </summary>
+        [Test]
+        public void ProducersSendMessagesToDifferentPartitionsAndConsumerConnectorGetsThemBack()
+        {
+            // first producing
+            string payload1 = "kafka 1.";
+            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
+            var msg1 = new Message(payloadData1);
+
+            string payload2 = "kafka 2.";
+            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
+            var msg2 = new Message(payloadData2);
+
+            var producerConfig = new SyncProducerConfig(clientConfig);
+            var producer = new SyncProducer(producerConfig);
+            var producerRequest1 = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { msg1 });
+            producer.Send(producerRequest1);
+            var producerRequest2 = new ProducerRequest(CurrentTestTopic, 1, new List<Message>() { msg2 });
+            producer.Send(producerRequest2);
+
+            // now consuming
+            var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
+            var resultMessages = new List<Message>();
+            using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+            {
+                var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
+                var messages = consumerConnector.CreateMessageStreams(topicCount);
+
+                var sets = messages[CurrentTestTopic];
+
+                try
+                {
+                    foreach (var set in sets)
+                    {
+                        foreach (var message in set)
+                        {
+                            resultMessages.Add(message);
+                        }
+                    }
+                }
+                catch (ConsumerTimeoutException)
+                {
+                    // do nothing, this is expected
+                }
+            }
+
+            Assert.AreEqual(2, resultMessages.Count);
+
+            // The two messages live in different partitions, and ordering is only defined within a
+            // partition, so either may arrive first. Compare the received set without relying on order.
+            var expected = new List<string> { msg1.ToString(), msg2.ToString() };
+            List<string> actual = resultMessages.ConvertAll(m => m.ToString());
+            CollectionAssert.AreEquivalent(expected, actual);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System;
+    using System.Threading;
+    using Kafka.Client.ZooKeeperIntegration;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Base class of the integration test fixtures: assigns a fresh topic name before every test and
+    /// offers a helper that waits for a ZooKeeper client to become idle.
+    /// </summary>
+    public abstract class IntegrationFixtureBase
+    {
+        /// <summary>
+        /// Gets or sets the topic name generated for the test that is currently running
+        /// </summary>
+        protected string CurrentTestTopic { get; set; }
+
+        /// <summary>
+        /// Builds the topic name from the current test's name, an underscore and a new GUID
+        /// </summary>
+        [SetUp]
+        public void SetupCurrentTestTopic()
+        {
+            string testName = TestContext.CurrentContext.Test.Name;
+            string uniqueSuffix = Guid.NewGuid().ToString();
+            CurrentTestTopic = testName + "_" + uniqueSuffix;
+        }
+
+        /// <summary>
+        /// Sleeps for the given timeout and then keeps sleeping for the remaining difference until the
+        /// client's reported idle time is no longer below the timeout.
+        /// </summary>
+        /// <param name="client">ZooKeeper client whose IdleTime is polled</param>
+        /// <param name="timeout">Wait time in milliseconds; presumably IdleTime uses the same unit</param>
+        internal static void WaitUntillIdle(IZooKeeperClient client, int timeout)
+        {
+            Thread.Sleep(timeout);
+            for (int remaining = timeout - client.IdleTime; remaining > 0; remaining = timeout - client.IdleTime)
+            {
+                Thread.Sleep(remaining);
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj Wed Sep 21 19:17:19 2011
@@ -1,64 +1,137 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
-  <PropertyGroup>
-    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
-    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
-    <ProductVersion>8.0.30703</ProductVersion>
-    <SchemaVersion>2.0</SchemaVersion>
-    <ProjectGuid>{AF29C330-49BD-4648-B692-882E922C435B}</ProjectGuid>
-    <OutputType>Library</OutputType>
-    <AppDesignerFolder>Properties</AppDesignerFolder>
-    <RootNamespace>Kafka.Client.IntegrationTests</RootNamespace>
-    <AssemblyName>Kafka.Client.IntegrationTests</AssemblyName>
-    <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
-    <FileAlignment>512</FileAlignment>
-  </PropertyGroup>
-  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
-    <DebugSymbols>true</DebugSymbols>
-    <DebugType>full</DebugType>
-    <Optimize>false</Optimize>
-    <OutputPath>bin\Debug\</OutputPath>
-    <DefineConstants>DEBUG;TRACE</DefineConstants>
-    <ErrorReport>prompt</ErrorReport>
-    <WarningLevel>4</WarningLevel>
-  </PropertyGroup>
-  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
-    <DebugType>pdbonly</DebugType>
-    <Optimize>true</Optimize>
-    <OutputPath>bin\Release\</OutputPath>
-    <DefineConstants>TRACE</DefineConstants>
-    <ErrorReport>prompt</ErrorReport>
-    <WarningLevel>4</WarningLevel>
-  </PropertyGroup>
-  <ItemGroup>
-    <Reference Include="nunit.framework, Version=2.5.9.10348, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
-      <SpecificVersion>False</SpecificVersion>
-      <HintPath>..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll</HintPath>
-    </Reference>
-    <Reference Include="System" />
-    <Reference Include="System.Core" />
-    <Reference Include="System.Xml.Linq" />
-    <Reference Include="System.Data.DataSetExtensions" />
-    <Reference Include="Microsoft.CSharp" />
-    <Reference Include="System.Data" />
-    <Reference Include="System.Xml" />
-  </ItemGroup>
-  <ItemGroup>
-    <Compile Include="KafkaIntegrationTest.cs" />
-    <Compile Include="Properties\AssemblyInfo.cs" />
-  </ItemGroup>
-  <ItemGroup>
-    <ProjectReference Include="..\..\Kafka.Client\Kafka.Client.csproj">
-      <Project>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</Project>
-      <Name>Kafka.Client</Name>
-    </ProjectReference>
-  </ItemGroup>
-  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
-  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
-       Other similar extension points exist, see Microsoft.Common.targets.
-  <Target Name="BeforeBuild">
-  </Target>
-  <Target Name="AfterBuild">
-  </Target>
-  -->
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProductVersion>8.0.30703</ProductVersion>
+    <SchemaVersion>2.0</SchemaVersion>
+    <ProjectGuid>{AF29C330-49BD-4648-B692-882E922C435B}</ProjectGuid>
+    <OutputType>Library</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Kafka.Client.IntegrationTests</RootNamespace>
+    <AssemblyName>Kafka.Client.IntegrationTests</AssemblyName>
+    <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <CodeContractsAssemblyMode>0</CodeContractsAssemblyMode>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>bin\Debug\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <StyleCopTreatErrorsAsWarnings>true</StyleCopTreatErrorsAsWarnings>
+    <CodeContractsEnableRuntimeChecking>False</CodeContractsEnableRuntimeChecking>
+    <CodeContractsRuntimeOnlyPublicSurface>False</CodeContractsRuntimeOnlyPublicSurface>
+    <CodeContractsRuntimeThrowOnFailure>True</CodeContractsRuntimeThrowOnFailure>
+    <CodeContractsRuntimeCallSiteRequires>False</CodeContractsRuntimeCallSiteRequires>
+    <CodeContractsRuntimeSkipQuantifiers>False</CodeContractsRuntimeSkipQuantifiers>
+    <CodeContractsRunCodeAnalysis>False</CodeContractsRunCodeAnalysis>
+    <CodeContractsNonNullObligations>False</CodeContractsNonNullObligations>
+    <CodeContractsBoundsObligations>False</CodeContractsBoundsObligations>
+    <CodeContractsArithmeticObligations>False</CodeContractsArithmeticObligations>
+    <CodeContractsEnumObligations>False</CodeContractsEnumObligations>
+    <CodeContractsRedundantAssumptions>False</CodeContractsRedundantAssumptions>
+    <CodeContractsRunInBackground>True</CodeContractsRunInBackground>
+    <CodeContractsShowSquigglies>False</CodeContractsShowSquigglies>
+    <CodeContractsUseBaseLine>False</CodeContractsUseBaseLine>
+    <CodeContractsEmitXMLDocs>False</CodeContractsEmitXMLDocs>
+    <CodeContractsCustomRewriterAssembly />
+    <CodeContractsCustomRewriterClass />
+    <CodeContractsLibPaths />
+    <CodeContractsExtraRewriteOptions />
+    <CodeContractsExtraAnalysisOptions />
+    <CodeContractsBaseLineFile />
+    <CodeContractsCacheAnalysisResults>False</CodeContractsCacheAnalysisResults>
+    <CodeContractsRuntimeCheckingLevel>Full</CodeContractsRuntimeCheckingLevel>
+    <CodeContractsReferenceAssembly>%28none%29</CodeContractsReferenceAssembly>
+    <CodeContractsAnalysisWarningLevel>0</CodeContractsAnalysisWarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>bin\Release\</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Integration|AnyCPU' ">
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>bin\Integration\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+    <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+  </PropertyGroup>
+  <PropertyGroup>
+    <StartupObject />
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="log4net">
+      <HintPath>..\..\..\..\lib\log4Net\log4net.dll</HintPath>
+    </Reference>
+    <Reference Include="nunit.framework, Version=2.5.9.10348, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll</HintPath>
+    </Reference>
+    <Reference Include="System" />
+    <Reference Include="System.Configuration" />
+    <Reference Include="System.Core" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="ZooKeeperNet">
+      <HintPath>..\..\..\..\lib\zookeeper\ZooKeeperNet.dll</HintPath>
+    </Reference>
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="ConsumerRebalancingTests.cs" />
+    <Compile Include="ConsumerTests.cs" />
+    <Compile Include="IntegrationFixtureBase.cs" />
+    <Compile Include="KafkaIntegrationTest.cs" />
+    <Compile Include="MockAlwaysZeroPartitioner.cs" />
+    <Compile Include="ProducerTests.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="TestHelper.cs" />
+    <Compile Include="TestsSetup.cs" />
+    <Compile Include="TestMultipleBrokersHelper.cs" />
+    <Compile Include="ZKBrokerPartitionInfoTests.cs" />
+    <Compile Include="ZooKeeperAwareProducerTests.cs" />
+    <Compile Include="ZooKeeperClientTests.cs" />
+    <Compile Include="ZooKeeperConnectionTests.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference Include="..\..\Kafka.Client\Kafka.Client.csproj">
+      <Project>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</Project>
+      <Name>Kafka.Client</Name>
+    </ProjectReference>
+  </ItemGroup>
+  <ItemGroup>
+    <None Include="App.config">
+      <SubType>Designer</SubType>
+    </None>
+    <None Include="Config\Debug\App.config">
+      <SubType>Designer</SubType>
+    </None>
+    <None Include="Log4Net.config">
+      <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+    </None>
+    <None Include="Config\Integration\App.config">
+      <SubType>Designer</SubType>
+    </None>
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <Import Project="..\..\..\..\lib\StyleCop\Microsoft.StyleCop.Targets" />
+  <PropertyGroup>
+    <PostBuildEvent>
+    </PostBuildEvent>
+  </PropertyGroup>
+  <Target Name="BeforeBuild">
+    <Copy SourceFiles="Config\$(Configuration)\App.config" DestinationFiles="App.config" />
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
 </Project>
\ No newline at end of file

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs Wed Sep 21 19:17:19 2011
@@ -1,181 +1,508 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-using System.Threading;
-using Kafka.Client.Request;
-using NUnit.Framework;
-
-namespace Kafka.Client.Tests
-{
-    /// <summary>
-    /// Contains tests that go all the way to Kafka and back.
-    /// </summary>
-    [TestFixture]
-    [Ignore("Requires a Kafka server running to execute")]
-    public class KafkaIntegrationTest
-    {
-        /// <summary>
-        /// Kafka server to test against.
-        /// </summary>
-        private static readonly string KafkaServer = "192.168.50.203";
-
-        /// <summary>
-        /// Port of the Kafka server to test against.
-        /// </summary>
-        private static readonly int KafkaPort = 9092;
-
-        /// <summary>
-        /// Sends a pair of message to Kafka.
-        /// </summary>
-        [Test]
-        public void ProducerSendsMessage()
-        {
-            string payload1 = "kafka 1.";
-            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
-            Message msg1 = new Message(payloadData1);
-
-            string payload2 = "kafka 2.";
-            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
-            Message msg2 = new Message(payloadData2);
-
-            Producer producer = new Producer(KafkaServer, KafkaPort);
-            producer.Send("test", 0, new List<Message> { msg1, msg2 });
-        }
-
-        /// <summary>
-        /// Asynchronously sends a pair of message to Kafka.
-        /// </summary>
-        [Test]
-        public void ProducerSendsMessageAsynchronously()
-        {
-            bool waiting = true;
-
-            List<Message> messages = GenerateRandomMessages(50);
-
-            Producer producer = new Producer(KafkaServer, KafkaPort);
-            producer.SendAsync(
-                "test",
-                0,
-                messages,
-                (requestContext) => { waiting = false; });
-
-            while (waiting)
-            {
-                Console.WriteLine("Keep going...");
-                Thread.Sleep(10);
-            }
-        }
-
-        /// <summary>
-        /// Send a multi-produce request to Kafka.
-        /// </summary>
-        [Test]
-        public void ProducerSendMultiRequest()
-        {
-            List<ProducerRequest> requests = new List<ProducerRequest>
-            { 
-                new ProducerRequest("test", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
-                new ProducerRequest("test", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
-                new ProducerRequest("testa", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("3: " + DateTime.UtcNow)) }),
-                new ProducerRequest("testa", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })
-            };
-
-            MultiProducerRequest request = new MultiProducerRequest(requests);
-            Producer producer = new Producer(KafkaServer, KafkaPort);
-            producer.Send(request);
-        }
-
-        /// <summary>
-        /// Generates messages for Kafka then gets them back.
-        /// </summary>
-        [Test]
-        public void ConsumerFetchMessage()
-        {
-            ProducerSendsMessage();
-
-            Consumer consumer = new Consumer(KafkaServer, KafkaPort);
-            List<Message> messages = consumer.Consume("test", 0, 0);
-
-            foreach (Message msg in messages)
-            {
-                Console.WriteLine(msg);
-            }
-        }
-
-        /// <summary>
-        /// Generates multiple messages for Kafka then gets them back.
-        /// </summary>
-        [Test]
-        public void ConsumerMultiFetchGetsMessage()
-        {
-            ProducerSendMultiRequest();
-
-            Consumer consumer = new Consumer(KafkaServer, KafkaPort);
-            MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
-            {
-                new FetchRequest("test", 0, 0),
-                new FetchRequest("test", 0, 0),
-                new FetchRequest("testa", 0, 0)
-            });
-
-            List<List<Message>> messages = consumer.Consume(request);
-
-            for (int ix = 0; ix < messages.Count; ix++)
-            {
-                List<Message> messageSet = messages[ix];
-                Console.WriteLine(string.Format("Request #{0}-->", ix));
-                foreach (Message msg in messageSet)
-                {
-                    Console.WriteLine(msg);
-                }
-            }
-        }
-
-        /// <summary>
-        /// Gets offsets from Kafka.
-        /// </summary>
-        [Test]
-        public void ConsumerGetsOffsets()
-        {
-            OffsetRequest request = new OffsetRequest("test", 0, DateTime.Now.AddHours(-24).Ticks, 10);
-
-            Consumer consumer = new Consumer(KafkaServer, KafkaPort);
-            IList<long> list = consumer.GetOffsetsBefore(request);
-
-            foreach (long l in list)
-            {
-                Console.Out.WriteLine(l);
-            }
-        }
-
-        /// <summary>
-        /// Gererates a randome list of messages.
-        /// </summary>
-        /// <param name="numberOfMessages">The number of messages to generate.</param>
-        /// <returns>A list of random messages.</returns>
-        private static List<Message> GenerateRandomMessages(int numberOfMessages)
-        {
-            List<Message> messages = new List<Message>();
-            for (int ix = 0; ix < numberOfMessages; ix++)
-            {
-                messages.Add(new Message(GenerateRandomBytes(10000)));
-            }
-
-            return messages;
-        }
-
-        /// <summary>
-        /// Generate a random set of bytes.
-        /// </summary>
-        /// <param name="length">Length of the byte array.</param>
-        /// <returns>Random byte array.</returns>
-        private static byte[] GenerateRandomBytes(int length)
-        {
-            byte[] randBytes = new byte[length];
-            Random randNum = new Random();
-            randNum.NextBytes(randBytes);
-
-            return randBytes;
-        }
-    }
-}
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Text;
+    using System.Threading;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Consumers;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Producers.Async;
+    using Kafka.Client.Producers.Sync;
+    using Kafka.Client.Requests;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Contains tests that go all the way to Kafka and back.
+    /// </summary>
+    [TestFixture]
+    public class KafkaIntegrationTest : IntegrationFixtureBase
+    {
+        /// <summary>
+        /// Kafka Client configuration
+        /// </summary>
+        private static KafkaClientConfiguration clientConfig;
+
+        /// <summary>
+        /// Maximum amount of time to wait trying to get a specific test message from Kafka server (in milliseconds)
+        /// </summary>
+        private static readonly int MaxTestWaitTimeInMiliseconds = 5000;
+
+        /// <summary>
+        /// Fixture-level set-up: obtains the configuration from
+        /// <c>KafkaClientConfiguration.GetConfiguration()</c> and stores it in the static
+        /// field read by the tests of this fixture.
+        /// </summary>
+        [TestFixtureSetUp]
+        public void SetUp()
+        {
+            clientConfig = KafkaClientConfiguration.GetConfiguration();
+        }
+
+        /// <summary>
+        /// Uses a synchronous producer to send one request carrying two messages
+        /// to the current test topic.
+        /// </summary>
+        [Test]
+        public void ProducerSendsMessage()
+        {
+            var messages = new List<Message>
+            {
+                new Message(Encoding.UTF8.GetBytes("kafka 1.")),
+                new Message(Encoding.UTF8.GetBytes("kafka 2."))
+            };
+
+            var producer = new SyncProducer(new SyncProducerConfig(clientConfig));
+            producer.Send(new ProducerRequest(CurrentTestTopic, 0, messages));
+        }
+
+        /// <summary>
+        /// Uses a synchronous producer to send a single message to a topic
+        /// whose name is very long.
+        /// </summary>
+        [Test]
+        public void ProducerSendsMessageWithLongTopic()
+        {
+            const string LongTopic = "ThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopic";
+            var payload = new List<Message> { new Message(Encoding.UTF8.GetBytes("test message")) };
+
+            var producer = new SyncProducer(new SyncProducerConfig(clientConfig));
+            producer.Send(new ProducerRequest(LongTopic, 0, payload));
+        }
+
+        /// <summary>
+        /// Asynchronously sends 50 generated text messages to the current test topic
+        /// in a single send call.
+        /// </summary>
+        [Test]
+        public void AsyncProducerSendsManyLongRandomMessages()
+        {
+            List<Message> messages = GenerateRandomTextMessages(50);
+
+            var config = new AsyncProducerConfig(clientConfig);
+
+            // AsyncProducer is disposable, so release it once the send has been issued
+            // instead of leaving it alive after the test.
+            using (var producer = new AsyncProducer(config))
+            {
+                producer.Send(CurrentTestTopic, 0, messages);
+            }
+        }
+
+        /// <summary>
+        /// Asynchronously sends four short fixed messages to the current test topic
+        /// in a single send call.
+        /// </summary>
+        [Test]
+        public void AsyncProducerSendsFewShortFixedMessages()
+        {
+            List<Message> messages = new List<Message>()
+                                         {
+                                             new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
+                                             new Message(Encoding.UTF8.GetBytes("Async Test Message 2")),
+                                             new Message(Encoding.UTF8.GetBytes("Async Test Message 3")),
+                                             new Message(Encoding.UTF8.GetBytes("Async Test Message 4"))
+                                         };
+
+            var config = new AsyncProducerConfig(clientConfig);
+
+            // AsyncProducer is disposable, so release it once the send has been issued
+            // instead of leaving it alive after the test.
+            using (var producer = new AsyncProducer(config))
+            {
+                producer.Send(CurrentTestTopic, 0, messages);
+            }
+        }
+
+        /// <summary>
+        /// Sends three short fixed messages through one asynchronous producer,
+        /// each in its own request and its own send call.
+        /// </summary>
+        [Test]
+        public void AsyncProducerSendsFewShortFixedMessagesInSeparateSendActions()
+        {
+            string[] payloads = { "Async Test Message 1", "Async Test Message 2", "Async Test Message 3" };
+
+            var config = new AsyncProducerConfig(clientConfig);
+            using (var producer = new AsyncProducer(config))
+            {
+                // One request per payload, sent in the order listed above.
+                foreach (string payload in payloads)
+                {
+                    ProducerRequest request = new ProducerRequest(
+                        CurrentTestTopic,
+                        0,
+                        new List<Message>() { new Message(Encoding.UTF8.GetBytes(payload)) });
+                    producer.Send(request);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends one message through an asynchronous producer that was constructed with a
+        /// callback handler, waits one second and checks that the handler was invoked.
+        /// </summary>
+        [Test]
+        public void AsyncProducerSendsMessageWithCallbackClass()
+        {
+            List<Message> messages = new List<Message>()
+                                         {
+                                             new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
+                                         };
+            var config = new AsyncProducerConfig(clientConfig);
+            TestCallbackHandler myHandler = new TestCallbackHandler();
+
+            // The producer stays alive during the wait so the callback can still fire;
+            // it is disposed only after the assertion.
+            using (var producer = new AsyncProducer(config, myHandler))
+            {
+                producer.Send(CurrentTestTopic, 0, messages);
+                Thread.Sleep(1000);
+                Assert.IsTrue(myHandler.WasRun);
+            }
+        }
+
+        /// <summary>
+        /// Sends one message through an asynchronous producer, passing a callback method to the
+        /// send call, waits one second and checks that the callback was invoked.
+        /// </summary>
+        [Test]
+        public void AsyncProducerSendsMessageWithCallback()
+        {
+            List<Message> messages = new List<Message>()
+                                         {
+                                             new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
+                                         };
+            var config = new AsyncProducerConfig(clientConfig);
+            TestCallbackHandler myHandler = new TestCallbackHandler();
+
+            // The producer stays alive during the wait so the callback can still fire;
+            // it is disposed only after the assertion.
+            using (var producer = new AsyncProducer(config))
+            {
+                producer.Send(CurrentTestTopic, 0, messages, myHandler.Handle);
+                Thread.Sleep(1000);
+                Assert.IsTrue(myHandler.WasRun);
+            }
+        }
+
+        /// <summary>
+        /// Callback handler used by the asynchronous producer tests; it only records that it was called.
+        /// </summary>
+        private class TestCallbackHandler : ICallbackHandler
+        {
+            /// <summary>
+            /// Gets a value indicating whether <c>Handle</c> has been called on this instance.
+            /// </summary>
+            public bool WasRun { get; private set; }
+
+            /// <summary>
+            /// Marks the handler as run; the supplied context is not inspected.
+            /// </summary>
+            /// <param name="context">Request context passed by the caller (unused).</param>
+            public void Handle(RequestContext<ProducerRequest> context)
+            {
+                WasRun = true;
+            }
+        }
+
+        /// <summary>
+        /// Builds four single-message requests for the current test topic and sends them
+        /// together through a synchronous producer's multi-send call.
+        /// </summary>
+        [Test]
+        public void ProducerSendMultiRequest()
+        {
+            // Each message body is its prefix followed by the current UTC time.
+            string[] prefixes = { "1: ", "2: ", "3: ", "4: " };
+            List<ProducerRequest> requests = prefixes
+                .Select(prefix => new ProducerRequest(
+                    CurrentTestTopic,
+                    0,
+                    new List<Message> { new Message(Encoding.UTF8.GetBytes(prefix + DateTime.UtcNow)) }))
+                .ToList();
+
+            var producer = new SyncProducer(new SyncProducerConfig(clientConfig));
+            producer.MultiSend(requests);
+        }
+
+        /// <summary>
+        /// Produces messages via <c>ProducerSendsMessage</c>, then fetches the current test
+        /// topic from offset 0, asserts that a response came back and prints its messages.
+        /// </summary>
+        [Test]
+        public void ConsumerFetchMessage()
+        {
+            ProducerSendsMessage();
+
+            var consumerConfig = new ConsumerConfig(clientConfig);
+            IConsumer consumer = new Kafka.Client.Consumers.Consumer(consumerConfig);
+            BufferedMessageSet fetched = consumer.Fetch(new FetchRequest(CurrentTestTopic, 0, 0));
+
+            Assert.NotNull(fetched);
+            foreach (var fetchedMessage in fetched.Messages)
+            {
+                Console.WriteLine(fetchedMessage);
+            }
+        }
+
+        /// <summary>
+        /// Produces messages via <c>ProducerSendMultiRequest</c>, then issues one multi-fetch
+        /// with three fetch requests and prints the messages returned for each of them.
+        /// </summary>
+        [Test]
+        public void ConsumerMultiFetchGetsMessage()
+        {
+            ProducerSendMultiRequest();
+
+            IConsumer consumer = new Consumers.Consumer(new ConsumerConfig(clientConfig));
+            var multiFetch = new MultiFetchRequest(new List<FetchRequest>
+            {
+                new FetchRequest(CurrentTestTopic, 0, 0),
+                new FetchRequest(CurrentTestTopic, 0, 0),
+                new FetchRequest(CurrentTestTopic + "2", 0, 0)
+            });
+
+            IList<BufferedMessageSet> results = consumer.MultiFetch(multiFetch);
+
+            // Print one header per returned message set, followed by its messages.
+            int requestIndex = 0;
+            foreach (BufferedMessageSet result in results)
+            {
+                IEnumerable<Message> batch = result.Messages;
+                Console.WriteLine(string.Format("Request #{0}-->", requestIndex));
+                foreach (Message item in batch)
+                {
+                    Console.WriteLine(item);
+                }
+
+                requestIndex++;
+            }
+        }
+
+        /// <summary>
+        /// Requests offsets for the current test topic, using the tick count of the local time
+        /// 24 hours ago and a limit of 10, and prints every offset returned.
+        /// </summary>
+        [Test]
+        public void ConsumerGetsOffsets()
+        {
+            long dayAgoTicks = DateTime.Now.AddHours(-24).Ticks;
+            var offsetRequest = new OffsetRequest(CurrentTestTopic, 0, dayAgoTicks, 10);
+
+            IConsumer consumer = new Consumers.Consumer(new ConsumerConfig(clientConfig));
+            IList<long> offsets = consumer.GetOffsetsBefore(offsetRequest);
+
+            foreach (long offset in offsets)
+            {
+                Console.Out.WriteLine(offset);
+            }
+        }
+
+        /// <summary>
+        /// Sends one message with a synchronous producer, then polls the topic until a
+        /// non-empty response arrives or the maximum wait time is used up, and verifies that
+        /// exactly that one message comes back.
+        /// </summary>
+        [Test]
+        public void ProducerSendsAndConsumerReceivesSingleSimpleMessage()
+        {
+            const int PollIntervalMs = 100;
+            var sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
+
+            var producer = new SyncProducer(new SyncProducerConfig(clientConfig));
+            var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { sourceMessage });
+
+            // Capture the offset reported by TestHelper before sending; the fetch below starts from it.
+            long offsetBeforeSend = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, clientConfig);
+            producer.Send(producerRequest);
+
+            IConsumer consumer = new Consumers.Consumer(new ConsumerConfig(clientConfig));
+            var fetchRequest = new FetchRequest(CurrentTestTopic, 0, offsetBeforeSend);
+
+            BufferedMessageSet response;
+            int waitedMs = 0;
+            do
+            {
+                Thread.Sleep(PollIntervalMs);
+                response = consumer.Fetch(fetchRequest);
+                if (response != null && response.Messages.Count() > 0)
+                {
+                    break;
+                }
+
+                // Nothing yet: account for the time spent and retry while the budget lasts.
+                waitedMs += PollIntervalMs;
+            }
+            while (waitedMs < MaxTestWaitTimeInMiliseconds);
+
+            Assert.NotNull(response);
+            Assert.AreEqual(1, response.Messages.Count());
+            Message received = response.Messages.First();
+            Assert.AreEqual(sourceMessage.ToString(), received.ToString());
+        }
+
+        /// <summary>
+        /// Sends one message with an asynchronous producer, then polls the topic until a
+        /// non-empty response arrives or the maximum wait time is used up, and verifies that
+        /// exactly that one message comes back.
+        /// </summary>
+        [Test]
+        public void AsyncProducerSendsAndConsumerReceivesSingleSimpleMessage()
+        {
+            Message sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
+
+            var config = new AsyncProducerConfig(clientConfig);
+
+            // AsyncProducer is disposable. It is kept alive for the whole polling phase so the
+            // asynchronous send is not cut short, and released when the test ends (pass or fail).
+            using (var producer = new AsyncProducer(config))
+            {
+                var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { sourceMessage });
+
+                // Capture the offset reported by TestHelper before sending; the fetch below starts from it.
+                long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, clientConfig);
+
+                producer.Send(producerRequest);
+
+                ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
+                IConsumer consumer = new Consumers.Consumer(consumerConfig);
+                FetchRequest request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
+
+                BufferedMessageSet response;
+                int totalWaitTimeInMiliseconds = 0;
+                int waitSingle = 100;
+                while (true)
+                {
+                    Thread.Sleep(waitSingle);
+                    response = consumer.Fetch(request);
+                    if (response != null && response.Messages.Count() > 0)
+                    {
+                        break;
+                    }
+
+                    // Nothing yet: give up once the accumulated wait reaches the maximum.
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+                    {
+                        break;
+                    }
+                }
+
+                Assert.NotNull(response);
+                Assert.AreEqual(1, response.Messages.Count());
+                Message resultMessage = response.Messages.First();
+                Assert.AreEqual(sourceMessage.ToString(), resultMessage.ToString());
+            }
+        }
+
+        /// <summary>
+        /// Synchronous producer sends a multi request and a consumer receives it from Kafka.
+        /// </summary>
+        [Test]
+        public void ProducerSendsAndConsumerReceivesMultiRequest()
+        {
+            string testTopic1 = CurrentTestTopic + "1";
+            string testTopic2 = CurrentTestTopic + "2";
+            string testTopic3 = CurrentTestTopic + "3";
+
+            Message sourceMessage1 = new Message(Encoding.UTF8.GetBytes("1: TestMessage"));
+            Message sourceMessage2 = new Message(Encoding.UTF8.GetBytes("2: TestMessage"));
+            Message sourceMessage3 = new Message(Encoding.UTF8.GetBytes("3: TestMessage"));
+            Message sourceMessage4 = new Message(Encoding.UTF8.GetBytes("4: TestMessage"));
+
+            List<ProducerRequest> requests = new List<ProducerRequest>
+            { 
+                new ProducerRequest(testTopic1, 0, new List<Message> { sourceMessage1 }),
+                new ProducerRequest(testTopic1, 0, new List<Message> { sourceMessage2 }),
+                new ProducerRequest(testTopic2, 0, new List<Message> { sourceMessage3 }),
+                new ProducerRequest(testTopic3, 0, new List<Message> { sourceMessage4 })
+            };
+
+            var config = new SyncProducerConfig(clientConfig);
+            var producer = new SyncProducer(config);
+
+            long currentOffset1 = TestHelper.GetCurrentKafkaOffset(testTopic1, clientConfig);
+            long currentOffset2 = TestHelper.GetCurrentKafkaOffset(testTopic2, clientConfig);
+            long currentOffset3 = TestHelper.GetCurrentKafkaOffset(testTopic3, clientConfig);
+
+            producer.MultiSend(requests);
+
+            ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
+            IConsumer consumer = new Consumers.Consumer(consumerConfig);
+            MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
+            {
+                new FetchRequest(testTopic1, 0, currentOffset1),
+                new FetchRequest(testTopic2, 0, currentOffset2),
+                new FetchRequest(testTopic3, 0, currentOffset3)
+            });
+            IList<BufferedMessageSet> messageSets;
+            int totalWaitTimeInMiliseconds = 0;
+            int waitSingle = 100;
+            while (true)
+            {
+                Thread.Sleep(waitSingle);
+                messageSets = consumer.MultiFetch(request);
+                if (messageSets.Count > 2 && messageSets[0].Messages.Count() > 0 && messageSets[1].Messages.Count() > 0 && messageSets[2].Messages.Count() > 0)
+                {
+                    break;
+                }
+                else
+                {
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+                    {
+                        break;
+                    }
+                }
+            }
+
+            Assert.AreEqual(3, messageSets.Count);
+            Assert.AreEqual(2, messageSets[0].Messages.Count());
+            Assert.AreEqual(1, messageSets[1].Messages.Count());
+            Assert.AreEqual(1, messageSets[2].Messages.Count());
+            Assert.AreEqual(sourceMessage1.ToString(), messageSets[0].Messages.First().ToString());
+            Assert.AreEqual(sourceMessage2.ToString(), messageSets[0].Messages.Skip(1).First().ToString());
+            Assert.AreEqual(sourceMessage3.ToString(), messageSets[1].Messages.First().ToString());
+            Assert.AreEqual(sourceMessage4.ToString(), messageSets[2].Messages.First().ToString());
+        }
+
+        /// <summary>
+        /// Shared random source: a new <see cref="Random"/> per call may reuse the same clock-based seed and
+        /// return identical data. Not thread-safe; add locking if these helpers are ever used concurrently.
+        /// </summary>
+        private static readonly Random SharedRandom = new Random();
+
+        /// <summary>
+        /// Generates a list of random binary messages, each with a payload of 10000 bytes.
+        /// </summary>
+        /// <param name="numberOfMessages">The number of messages to generate.</param>
+        /// <returns>A list of random messages.</returns>
+        private static List<Message> GenerateRandomMessages(int numberOfMessages)
+        {
+            List<Message> messages = new List<Message>();
+            for (int ix = 0; ix < numberOfMessages; ix++)
+            {
+                messages.Add(new Message(GenerateRandomBytes(10000)));
+            }
+
+            return messages;
+        }
+
+        /// <summary>
+        /// Generates a random set of bytes.
+        /// </summary>
+        /// <param name="length">Length of the byte array.</param>
+        /// <returns>Random byte array.</returns>
+        private static byte[] GenerateRandomBytes(int length)
+        {
+            byte[] randBytes = new byte[length];
+            SharedRandom.NextBytes(randBytes);
+
+            return randBytes;
+        }
+
+        /// <summary>
+        /// Generates a list of random text messages, each the UTF-8 encoding of a 10000-character string.
+        /// </summary>
+        /// <param name="numberOfMessages">The number of messages to generate.</param>
+        /// <returns>A list of random text messages.</returns>
+        private static List<Message> GenerateRandomTextMessages(int numberOfMessages)
+        {
+            List<Message> messages = new List<Message>();
+            for (int ix = 0; ix < numberOfMessages; ix++)
+            {
+                messages.Add(new Message(Encoding.UTF8.GetBytes(GenerateRandomMessage(10000))));
+            }
+
+            return messages;
+        }
+
+        /// <summary>
+        /// Generates a random message text made of uppercase ASCII letters ('A' to 'Z').
+        /// </summary>
+        /// <param name="length">Length of the message string.</param>
+        /// <returns>Random message string.</returns>
+        private static string GenerateRandomMessage(int length)
+        {
+            StringBuilder builder = new StringBuilder();
+            for (int i = 0; i < length; i++)
+            {
+                builder.Append((char)SharedRandom.Next('A', 'Z' + 1));
+            }
+
+            return builder.ToString();
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Log4Net.config
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Log4Net.config?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Log4Net.config (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Log4Net.config Wed Sep 21 19:17:19 2011
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<log4net>
+    <root>
+        <level value="ALL" />
+        <!--<appender-ref ref="ConsoleAppender" />-->
+        <appender-ref ref="KafkaFileAppender" />
+        <appender-ref ref="ZookeeperFileAppender" />
+    </root>
+    <!--<appender name="ConsoleAppender" type="log4net.Appender.ConsoleAppender">
+        <layout type="log4net.Layout.PatternLayout">
+            <conversionPattern value="%-5level - %message - %logger%newline" />
+        </layout>
+    </appender>-->
+    <appender name="KafkaFileAppender" type="log4net.Appender.FileAppender">
+        <filter type="log4net.Filter.LoggerMatchFilter">
+            <LoggerToMatch value="Kafka.Client."/>
+        </filter>
+        <filter type="log4net.Filter.DenyAllFilter" />
+        <file value="kafka-logs.txt" />
+        <appendToFile value="false" />
+        <layout type="log4net.Layout.PatternLayout">
+            <conversionPattern value="[%-5level] - [%logger] - %message%newline" />
+        </layout>
+    </appender>
+    <appender name="ZookeeperFileAppender" type="log4net.Appender.FileAppender">
+        <filter type="log4net.Filter.LoggerMatchFilter">
+            <LoggerToMatch value="ZooKeeperNet."/>
+        </filter>
+        <filter type="log4net.Filter.LoggerMatchFilter">
+            <LoggerToMatch value="Org.Apache.Zookeeper.Data."/>
+        </filter>
+        <filter type="log4net.Filter.DenyAllFilter" />
+        <file value="zookeeper-logs.txt" />
+        <appendToFile value="false" />
+        <layout type="log4net.Layout.PatternLayout">
+            <conversionPattern value="[%-5level] - [%logger] - %message%newline" />
+        </layout>
+    </appender>
+</log4net>
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,19 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Kafka.Client.Producers.Partitioning;
+
+namespace Kafka.Client.IntegrationTests
+{
+    /// <summary>
+    /// Mock partitioner that ignores both the key and the partition count and always selects the first partition (index 0).
+    /// </summary>
+    public class MockAlwaysZeroPartitioner : IPartitioner<string>
+    {
+        public int Partition(string key, int numPartitions)
+        {
+            return 0;
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Text;
+    using System.Threading;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Consumers;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Producers;
+    using Kafka.Client.Requests;
+    using Kafka.Client.Serialization;
+    using NUnit.Framework;
+
+    [TestFixture]
+    public class ProducerTests : IntegrationFixtureBase
+    {
+        /// <summary>
+        /// Kafka client configuration; assigned once per fixture in <see cref="SetUp"/>.
+        /// </summary>
+        private KafkaClientConfiguration clientConfig;
+
+        /// <summary>
+        /// Maximum time to spend in each polling loop of a test (broker offset change, message fetch), in milliseconds.
+        /// </summary>
+        private readonly int maxTestWaitTimeInMiliseconds = 5000;
+
+        [TestFixtureSetUp]
+        public void SetUp()
+        {
+            clientConfig = KafkaClientConfiguration.GetConfiguration();
+            clientConfig.SupressZooKeeper(); // NOTE(review): presumably disables ZooKeeper-based broker discovery for these tests - confirm
+        }
+
+        [Test]
+        public void ProducerSends1Message()
+        {
+            int totalWaitTimeInMiliseconds = 0;
+            int waitSingle = 100;
+            var originalMessage = new Message(Encoding.UTF8.GetBytes("TestData"));
+
+            var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+            multipleBrokersHelper.GetCurrentOffsets();
+
+            var producerConfig = new ProducerConfig(clientConfig);
+            var mockPartitioner = new MockAlwaysZeroPartitioner();
+            using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder(), null))
+            {
+                var producerData = new ProducerData<string, Message>(
+                    CurrentTestTopic, "somekey", new List<Message> { originalMessage });
+                producer.Send(producerData);
+                Thread.Sleep(waitSingle);
+
+                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                {
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    Thread.Sleep(waitSingle);
+                    if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
+                    {
+                        Assert.Fail("None of the brokers changed their offset after sending a message");
+                    }
+                }
+
+                totalWaitTimeInMiliseconds = 0; // restart the wait budget for the fetch loop below
+
+                var consumerConfig = new ConsumerConfig(clientConfig)
+                    {
+                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+                    };
+                IConsumer consumer = new Consumers.Consumer(consumerConfig);
+                var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+                BufferedMessageSet response;
+
+                while (true)
+                {
+                    Thread.Sleep(waitSingle);
+                    response = consumer.Fetch(request);
+                    if (response != null && response.Messages.Count() > 0)
+                    {
+                        break;
+                    }
+
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
+                    {
+                        break;
+                    }
+                }
+
+                Assert.NotNull(response);
+                Assert.AreEqual(1, response.Messages.Count());
+                Assert.AreEqual(originalMessage.ToString(), response.Messages.First().ToString());
+            }
+        }
+
+        [Test]
+        public void ProducerSends3Messages()
+        {
+            int totalWaitTimeInMiliseconds = 0;
+            int waitSingle = 100;
+            var originalMessage1 = new Message(Encoding.UTF8.GetBytes("TestData1"));
+            var originalMessage2 = new Message(Encoding.UTF8.GetBytes("TestData2"));
+            var originalMessage3 = new Message(Encoding.UTF8.GetBytes("TestData3"));
+            var originalMessageList =
+                new List<Message> { originalMessage1, originalMessage2, originalMessage3 };
+
+            var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+            multipleBrokersHelper.GetCurrentOffsets();
+
+            var producerConfig = new ProducerConfig(clientConfig);
+            var mockPartitioner = new MockAlwaysZeroPartitioner();
+            using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder(), null))
+            {
+                var producerData = new ProducerData<string, Message>(CurrentTestTopic, "somekey", originalMessageList);
+                producer.Send(producerData);
+                Thread.Sleep(waitSingle);
+
+                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                {
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    Thread.Sleep(waitSingle);
+                    if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
+                    {
+                        Assert.Fail("None of the brokers changed their offset after sending a message");
+                    }
+                }
+
+                totalWaitTimeInMiliseconds = 0; // restart the wait budget for the fetch loop below
+
+                var consumerConfig = new ConsumerConfig(clientConfig)
+                    {
+                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+                    };
+                IConsumer consumer = new Consumers.Consumer(consumerConfig);
+                var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+                BufferedMessageSet response;
+                while (true)
+                {
+                    Thread.Sleep(waitSingle);
+                    response = consumer.Fetch(request);
+                    if (response != null && response.Messages.Count() > 2)
+                    {
+                        break;
+                    }
+
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
+                    {
+                        break;
+                    }
+                }
+
+                Assert.NotNull(response);
+                Assert.AreEqual(3, response.Messages.Count());
+                Assert.AreEqual(originalMessage1.ToString(), response.Messages.First().ToString());
+                Assert.AreEqual(originalMessage2.ToString(), response.Messages.Skip(1).First().ToString());
+                Assert.AreEqual(originalMessage3.ToString(), response.Messages.Skip(2).First().ToString());
+            }
+        }
+
+        [Test]
+        public void ProducerSends1MessageUsingNotDefaultEncoder()
+        {
+            int totalWaitTimeInMiliseconds = 0;
+            int waitSingle = 100;
+            string originalMessage = "TestData";
+
+            var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+            multipleBrokersHelper.GetCurrentOffsets();
+
+            var producerConfig = new ProducerConfig(clientConfig);
+            var mockPartitioner = new MockAlwaysZeroPartitioner();
+            using (var producer = new Producer<string, string>(producerConfig, mockPartitioner, new StringEncoder(), null))
+            {
+                var producerData = new ProducerData<string, string>(
+                    CurrentTestTopic, "somekey", new List<string> { originalMessage });
+
+                producer.Send(producerData);
+                Thread.Sleep(waitSingle);
+
+                while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+                {
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    Thread.Sleep(waitSingle);
+                    if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
+                    {
+                        Assert.Fail("None of the brokers changed their offset after sending a message");
+                    }
+                }
+
+                totalWaitTimeInMiliseconds = 0; // restart the wait budget for the fetch loop below
+
+                var consumerConfig = new ConsumerConfig(clientConfig)
+                    {
+                        Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+                        Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+                    };
+                IConsumer consumer = new Consumers.Consumer(consumerConfig);
+                var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+                BufferedMessageSet response;
+                while (true)
+                {
+                    Thread.Sleep(waitSingle);
+                    response = consumer.Fetch(request);
+                    if (response != null && response.Messages.Count() > 0)
+                    {
+                        break;
+                    }
+
+                    totalWaitTimeInMiliseconds += waitSingle;
+                    if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
+                    {
+                        break;
+                    }
+                }
+
+                Assert.NotNull(response);
+                Assert.AreEqual(1, response.Messages.Count());
+                Assert.AreEqual(originalMessage, Encoding.UTF8.GetString(response.Messages.First().Payload));
+            }
+        }
+    }
+}

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs Wed Sep 21 19:17:19 2011
@@ -1,36 +1,35 @@
-using System.Reflection;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-
-// General Information about an assembly is controlled through the following 
-// set of attributes. Change these attribute values to modify the information
-// associated with an assembly.
-[assembly: AssemblyTitle("Kafka.Client.IntegrationTests")]
-[assembly: AssemblyDescription("")]
-[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("Microsoft")]
-[assembly: AssemblyProduct("Kafka.Client.IntegrationTests")]
-[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
-[assembly: AssemblyTrademark("")]
-[assembly: AssemblyCulture("")]
-
-// Setting ComVisible to false makes the types in this assembly not visible 
-// to COM components.  If you need to access a type in this assembly from 
-// COM, set the ComVisible attribute to true on that type.
-[assembly: ComVisible(false)]
-
-// The following GUID is for the ID of the typelib if this project is exposed to COM
-[assembly: Guid("7b2387b7-6a58-4e8b-ae06-8aadf1a64949")]
-
-// Version information for an assembly consists of the following four values:
-//
-//      Major Version
-//      Minor Version 
-//      Build Number
-//      Revision
-//
-// You can specify all the values or you can default the Build and Revision Numbers 
-// by using the '*' as shown below:
-// [assembly: AssemblyVersion("1.0.*")]
-[assembly: AssemblyVersion("1.0.0.0")]
-[assembly: AssemblyFileVersion("1.0.0.0")]
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following 
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Kafka.Client.IntegrationTests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyProduct("Kafka.Client.IntegrationTests")]
+[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible 
+// to COM components.  If you need to access a type in this assembly from 
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("7b2387b7-6a58-4e8b-ae06-8aadf1a64949")]
+
+// Version information for an assembly consists of the following four values:
+//
+//      Major Version
+//      Minor Version 
+//      Build Number
+//      Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers 
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]