You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC
svn commit: r1173797 [8/10] - in /incubator/kafka/trunk/clients/csharp: ./
lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/
src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/
src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config Wed Sep 21 19:17:19 2011
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<configuration>
+ <configSections>
+ <section
+ name="kafkaClientConfiguration"
+ type="Kafka.Client.Cfg.KafkaClientConfiguration, Kafka.Client"
+ allowLocation="true"
+ allowDefinition="Everywhere"
+ />
+ </configSections>
+ <kafkaClientConfiguration>
+ <kafkaServer address="192.168.1.39" port="9092"></kafkaServer>
+ <consumer numberOfTries="2" groupId="testGroup" timeout="10000" autoOffsetReset="smallest" autoCommit="true" autoCommitIntervalMs="10000" fetchSize="307200" backOffIncrementMs="2000"/>
+ <brokerPartitionInfos>
+ <add id="0" address="192.168.1.39" port="9092" />
+ <add id="1" address="192.168.1.39" port="9101" />
+ <add id="2" address="192.168.1.39" port="9102" />
+ </brokerPartitionInfos>
+ <zooKeeperServers addressList="192.168.1.39:2181" sessionTimeout="30000" connectionTimeout="3000"></zooKeeperServers>
+ </kafkaClientConfiguration>
+</configuration>
\ No newline at end of file
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Utils;
+ using Kafka.Client.ZooKeeperIntegration;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class ConsumerRebalancingTests : IntegrationFixtureBase
+ {
+ /// <summary>
+ /// Kafka Client configuration
+ /// </summary>
+ private static KafkaClientConfiguration clientConfig;
+
+ [TestFixtureSetUp]
+ public void SetUp()
+ {
+ clientConfig = KafkaClientConfiguration.GetConfiguration();
+ }
+
+ [Test]
+ public void ConsumerPorformsRebalancingOnStart()
+ {
+ var config = new ConsumerConfig(clientConfig) { AutoCommit = false, GroupId = "group1" };
+ using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
+ {
+ ZooKeeperClient client = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", consumerConnector);
+ Assert.IsNotNull(client);
+ client.DeleteRecursive("/consumers/group1");
+ var topicCount = new Dictionary<string, int> { { "test", 1 } };
+ consumerConnector.CreateMessageStreams(topicCount);
+ WaitUntillIdle(client, 1000);
+ IList<string> children = client.GetChildren("/consumers", false);
+ Assert.That(children, Is.Not.Null.And.Not.Empty);
+ Assert.That(children, Contains.Item("group1"));
+ children = client.GetChildren("/consumers/group1", false);
+ Assert.That(children, Is.Not.Null.And.Not.Empty);
+ Assert.That(children, Contains.Item("ids"));
+ Assert.That(children, Contains.Item("owners"));
+ children = client.GetChildren("/consumers/group1/ids", false);
+ Assert.That(children, Is.Not.Null.And.Not.Empty);
+ string consumerId = children[0];
+ children = client.GetChildren("/consumers/group1/owners", false);
+ Assert.That(children, Is.Not.Null.And.Not.Empty);
+ Assert.That(children.Count, Is.EqualTo(1));
+ Assert.That(children, Contains.Item("test"));
+ children = client.GetChildren("/consumers/group1/owners/test", false);
+ Assert.That(children, Is.Not.Null.And.Not.Empty);
+ Assert.That(children.Count, Is.EqualTo(2));
+ string partId = children[0];
+ string data = client.ReadData<string>("/consumers/group1/owners/test/" + partId);
+ Assert.That(data, Is.Not.Null.And.Not.Empty);
+ Assert.That(data, Contains.Substring(consumerId));
+ data = client.ReadData<string>("/consumers/group1/ids/" + consumerId);
+ Assert.That(data, Is.Not.Null.And.Not.Empty);
+ Assert.That(data, Is.EqualTo("{ \"test\": 1 }"));
+ }
+
+ using (var client = new ZooKeeperClient(config.ZkConnect, config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ {
+ client.Connect();
+ //// Should be created as ephemeral
+ IList<string> children = client.GetChildren("/consumers/group1/ids");
+ Assert.That(children, Is.Null.Or.Empty);
+ //// Should be created as ephemeral
+ children = client.GetChildren("/consumers/group1/owners/test");
+ Assert.That(children, Is.Null.Or.Empty);
+ }
+ }
+
+ [Test]
+ public void ConsumerPorformsRebalancingWhenNewBrokerIsAddedToTopic()
+ {
+ var config = new ConsumerConfig(clientConfig)
+ {
+ AutoCommit = false,
+ GroupId = "group1",
+ ZkSessionTimeoutMs = 60000,
+ ZkConnectionTimeoutMs = 60000
+ };
+ string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+ string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/test/" + 2345;
+ using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
+ {
+ var client = ReflectionHelper.GetInstanceField<ZooKeeperClient>(
+ "zkClient", consumerConnector);
+ Assert.IsNotNull(client);
+ client.DeleteRecursive("/consumers/group1");
+ var topicCount = new Dictionary<string, int> { { "test", 1 } };
+ consumerConnector.CreateMessageStreams(topicCount);
+ WaitUntillIdle(client, 1000);
+ IList<string> children = client.GetChildren("/consumers/group1/ids", false);
+ string consumerId = children[0];
+ client.CreateEphemeral(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+ client.CreateEphemeral(brokerTopicPath, 1);
+ WaitUntillIdle(client, 500);
+ children = client.GetChildren("/consumers/group1/owners/test", false);
+ Assert.That(children.Count, Is.EqualTo(3));
+ Assert.That(children, Contains.Item("2345-0"));
+ string data = client.ReadData<string>("/consumers/group1/owners/test/2345-0");
+ Assert.That(data, Is.Not.Null);
+ Assert.That(data, Contains.Substring(consumerId));
+ var topicRegistry =
+ ReflectionHelper.GetInstanceField<IDictionary<string, IDictionary<Partition, PartitionTopicInfo>>>(
+ "topicRegistry", consumerConnector);
+ Assert.That(topicRegistry, Is.Not.Null.And.Not.Empty);
+ Assert.That(topicRegistry.Count, Is.EqualTo(1));
+ var item = topicRegistry["test"];
+ Assert.That(item.Count, Is.EqualTo(3));
+ var broker = topicRegistry["test"].SingleOrDefault(x => x.Key.BrokerId == 2345);
+ Assert.That(broker, Is.Not.Null);
+ }
+ }
+
+ [Test]
+ public void ConsumerPorformsRebalancingWhenBrokerIsRemovedFromTopic()
+ {
+ var config = new ConsumerConfig(clientConfig) { AutoCommit = false, GroupId = "group1", ZkSessionTimeoutMs = 60000, ZkConnectionTimeoutMs = 60000 };
+ string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
+ string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/test/" + 2345;
+ using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
+ {
+ var client = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", consumerConnector);
+ Assert.IsNotNull(client);
+ client.DeleteRecursive("/consumers/group1");
+ var topicCount = new Dictionary<string, int> { { "test", 1 } };
+ consumerConnector.CreateMessageStreams(topicCount);
+ WaitUntillIdle(client, 1000);
+ client.CreateEphemeral(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
+ client.CreateEphemeral(brokerTopicPath, 1);
+ WaitUntillIdle(client, 1000);
+ client.DeleteRecursive(brokerTopicPath);
+ WaitUntillIdle(client, 1000);
+
+ IList<string> children = client.GetChildren("/consumers/group1/owners/test", false);
+ Assert.That(children.Count, Is.EqualTo(2));
+ Assert.That(children, Has.None.EqualTo("2345-0"));
+ var topicRegistry = ReflectionHelper.GetInstanceField<IDictionary<string, IDictionary<Partition, PartitionTopicInfo>>>("topicRegistry", consumerConnector);
+ Assert.That(topicRegistry, Is.Not.Null.And.Not.Empty);
+ Assert.That(topicRegistry.Count, Is.EqualTo(1));
+ var item = topicRegistry["test"];
+ Assert.That(item.Count, Is.EqualTo(2));
+ Assert.That(item.Where(x => x.Value.BrokerId == 2345).Count(), Is.EqualTo(0));
+ }
+ }
+
+ [Test]
+ public void ConsumerPerformsRebalancingWhenNewConsumerIsAddedAndTheyDividePartitions()
+ {
+ var config = new ConsumerConfig(clientConfig)
+ {
+ AutoCommit = false,
+ GroupId = "group1",
+ ZkSessionTimeoutMs = 60000,
+ ZkConnectionTimeoutMs = 60000
+ };
+
+ IList<string> ids;
+ IList<string> owners;
+ using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
+ {
+ var client = ReflectionHelper.GetInstanceField<ZooKeeperClient>(
+ "zkClient", consumerConnector);
+ Assert.IsNotNull(client);
+ client.DeleteRecursive("/consumers/group1");
+ var topicCount = new Dictionary<string, int> { { "test", 1 } };
+ consumerConnector.CreateMessageStreams(topicCount);
+ WaitUntillIdle(client, 1000);
+ using (var consumerConnector2 = new ZookeeperConsumerConnector(config, true))
+ {
+ consumerConnector2.CreateMessageStreams(topicCount);
+ WaitUntillIdle(client, 1000);
+ ids = client.GetChildren("/consumers/group1/ids", false).ToList();
+ owners = client.GetChildren("/consumers/group1/owners/test", false).ToList();
+
+ Assert.That(ids, Is.Not.Null.And.Not.Empty);
+ Assert.That(ids.Count, Is.EqualTo(2));
+ Assert.That(owners, Is.Not.Null.And.Not.Empty);
+ Assert.That(owners.Count, Is.EqualTo(2));
+
+ var data1 = client.ReadData<string>("/consumers/group1/owners/test/" + owners[0], false);
+ var data2 = client.ReadData<string>("/consumers/group1/owners/test/" + owners[1], false);
+
+ Assert.That(data1, Is.Not.Null.And.Not.Empty);
+ Assert.That(data2, Is.Not.Null.And.Not.Empty);
+ Assert.That(data1, Is.Not.EqualTo(data2));
+ Assert.That(data1, Is.StringStarting(ids[0]).Or.StringStarting(ids[1]));
+ Assert.That(data2, Is.StringStarting(ids[0]).Or.StringStarting(ids[1]));
+ }
+ }
+ }
+
+ [Test]
+ public void ConsumerPerformsRebalancingWhenConsumerIsRemovedAndTakesItsPartitions()
+ {
+ var config = new ConsumerConfig(clientConfig)
+ {
+ AutoCommit = false,
+ GroupId = "group1",
+ ZkSessionTimeoutMs = 60000,
+ ZkConnectionTimeoutMs = 60000
+ };
+
+ IList<string> ids;
+ IList<string> owners;
+ using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
+ {
+ var client = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", consumerConnector);
+ Assert.IsNotNull(client);
+ client.DeleteRecursive("/consumers/group1");
+ var topicCount = new Dictionary<string, int> { { "test", 1 } };
+ consumerConnector.CreateMessageStreams(topicCount);
+ WaitUntillIdle(client, 1000);
+ using (var consumerConnector2 = new ZookeeperConsumerConnector(config, true))
+ {
+ consumerConnector2.CreateMessageStreams(topicCount);
+ WaitUntillIdle(client, 1000);
+ ids = client.GetChildren("/consumers/group1/ids", false).ToList();
+ owners = client.GetChildren("/consumers/group1/owners/test", false).ToList();
+ Assert.That(ids, Is.Not.Null.And.Not.Empty);
+ Assert.That(ids.Count, Is.EqualTo(2));
+ Assert.That(owners, Is.Not.Null.And.Not.Empty);
+ Assert.That(owners.Count, Is.EqualTo(2));
+ }
+
+ WaitUntillIdle(client, 1000);
+ ids = client.GetChildren("/consumers/group1/ids", false).ToList();
+ owners = client.GetChildren("/consumers/group1/owners/test", false).ToList();
+
+ Assert.That(ids, Is.Not.Null.And.Not.Empty);
+ Assert.That(ids.Count, Is.EqualTo(1));
+ Assert.That(owners, Is.Not.Null.And.Not.Empty);
+ Assert.That(owners.Count, Is.EqualTo(2));
+
+ var data1 = client.ReadData<string>("/consumers/group1/owners/test/" + owners[0], false);
+ var data2 = client.ReadData<string>("/consumers/group1/owners/test/" + owners[1], false);
+
+ Assert.That(data1, Is.Not.Null.And.Not.Empty);
+ Assert.That(data2, Is.Not.Null.And.Not.Empty);
+ Assert.That(data1, Is.EqualTo(data2));
+ Assert.That(data1, Is.StringStarting(ids[0]));
+ }
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,309 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using System.Text;
+ using System.Threading;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Exceptions;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Producers.Sync;
+ using Kafka.Client.Requests;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class ConsumerTests : IntegrationFixtureBase
+ {
+ /// <summary>
+ /// Kafka Client configuration
+ /// </summary>
+ private static KafkaClientConfiguration clientConfig;
+
+ [TestFixtureSetUp]
+ public void SetUp()
+ {
+ clientConfig = KafkaClientConfiguration.GetConfiguration();
+ }
+
+ [Test]
+ public void ConsumerConnectorIsCreatedConnectsDisconnectsAndShutsDown()
+ {
+ var config = new ConsumerConfig(clientConfig);
+ using (new ZookeeperConsumerConnector(config, true))
+ {
+ }
+ }
+
+ [Test]
+ public void SimpleSyncProducerSends2MessagesAndConsumerConnectorGetsThemBack()
+ {
+ // first producing
+ string payload1 = "kafka 1.";
+ byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
+ var msg1 = new Message(payloadData1);
+
+ string payload2 = "kafka 2.";
+ byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
+ var msg2 = new Message(payloadData2);
+
+ var producerConfig = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(producerConfig);
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1, msg2 });
+ producer.Send(producerRequest);
+
+ // now consuming
+ var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
+ var resultMessages = new List<Message>();
+ using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+ {
+ var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
+ var messages = consumerConnector.CreateMessageStreams(topicCount);
+ var sets = messages[CurrentTestTopic];
+ try
+ {
+ foreach (var set in sets)
+ {
+ foreach (var message in set)
+ {
+ resultMessages.Add(message);
+ }
+ }
+ }
+ catch (ConsumerTimeoutException)
+ {
+ // do nothing, this is expected
+ }
+ }
+
+ Assert.AreEqual(2, resultMessages.Count);
+ Assert.AreEqual(msg1.ToString(), resultMessages[0].ToString());
+ Assert.AreEqual(msg2.ToString(), resultMessages[1].ToString());
+ }
+
+ [Test]
+ public void OneMessageIsSentAndReceivedThenExceptionsWhenNoMessageThenAnotherMessageIsSentAndReceived()
+ {
+ // first producing
+ string payload1 = "kafka 1.";
+ byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
+ var msg1 = new Message(payloadData1);
+
+ var producerConfig = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(producerConfig);
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
+ producer.Send(producerRequest);
+
+ // now consuming
+ var config = new ConsumerConfig(clientConfig) { AutoCommit = false, Timeout = 5000 };
+ using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+ {
+ var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
+ var messages = consumerConnector.CreateMessageStreams(topicCount);
+ var sets = messages[CurrentTestTopic];
+ KafkaMessageStream myStream = sets[0];
+ var enumerator = myStream.GetEnumerator();
+
+ Assert.IsTrue(enumerator.MoveNext());
+ Assert.AreEqual(msg1.ToString(), enumerator.Current.ToString());
+
+ Assert.Throws<ConsumerTimeoutException>(() => enumerator.MoveNext());
+
+ Assert.Throws<Exception>(() => enumerator.MoveNext()); // iterator is in failed state
+
+ enumerator.Reset();
+
+ // producing again
+ string payload2 = "kafka 2.";
+ byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
+ var msg2 = new Message(payloadData2);
+
+ var producerRequest2 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg2 });
+ producer.Send(producerRequest2);
+
+ Thread.Sleep(3000);
+
+ Assert.IsTrue(enumerator.MoveNext());
+ Assert.AreEqual(msg2.ToString(), enumerator.Current.ToString());
+ }
+ }
+
+ [Test]
+ public void ConsumerConnectorConsumesTwoDifferentTopics()
+ {
+ string topic1 = CurrentTestTopic + "1";
+ string topic2 = CurrentTestTopic + "2";
+
+ // first producing
+ string payload1 = "kafka 1.";
+ byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
+ var msg1 = new Message(payloadData1);
+
+ string payload2 = "kafka 2.";
+ byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
+ var msg2 = new Message(payloadData2);
+
+ var producerConfig = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(producerConfig);
+ var producerRequest1 = new ProducerRequest(topic1, 0, new List<Message> { msg1 });
+ producer.Send(producerRequest1);
+ var producerRequest2 = new ProducerRequest(topic2, 0, new List<Message> { msg2 });
+ producer.Send(producerRequest2);
+
+ // now consuming
+ var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
+ var resultMessages1 = new List<Message>();
+ var resultMessages2 = new List<Message>();
+ using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+ {
+ var topicCount = new Dictionary<string, int> { { topic1, 1 }, { topic2, 1 } };
+ var messages = consumerConnector.CreateMessageStreams(topicCount);
+
+ Assert.IsTrue(messages.ContainsKey(topic1));
+ Assert.IsTrue(messages.ContainsKey(topic2));
+
+ var sets1 = messages[topic1];
+ try
+ {
+ foreach (var set in sets1)
+ {
+ foreach (var message in set)
+ {
+ resultMessages1.Add(message);
+ }
+ }
+ }
+ catch (ConsumerTimeoutException)
+ {
+ // do nothing, this is expected
+ }
+
+ var sets2 = messages[topic2];
+ try
+ {
+ foreach (var set in sets2)
+ {
+ foreach (var message in set)
+ {
+ resultMessages2.Add(message);
+ }
+ }
+ }
+ catch (ConsumerTimeoutException)
+ {
+ // do nothing, this is expected
+ }
+ }
+
+ Assert.AreEqual(1, resultMessages1.Count);
+ Assert.AreEqual(msg1.ToString(), resultMessages1[0].ToString());
+
+ Assert.AreEqual(1, resultMessages2.Count);
+ Assert.AreEqual(msg2.ToString(), resultMessages2[0].ToString());
+ }
+
+ [Test]
+ public void ConsumerConnectorReceivesAShutdownSignal()
+ {
+ // now consuming
+ var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
+ using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+ {
+ var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
+ var messages = consumerConnector.CreateMessageStreams(topicCount);
+
+ // putting the shutdown command into the queue
+ FieldInfo fi = typeof(ZookeeperConsumerConnector).GetField(
+ "queues", BindingFlags.NonPublic | BindingFlags.Instance);
+ var value =
+ (IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>>)
+ fi.GetValue(consumerConnector);
+ foreach (var topicConsumerQueueMap in value)
+ {
+ topicConsumerQueueMap.Value.Add(ZookeeperConsumerConnector.ShutdownCommand);
+ }
+
+ var sets = messages[CurrentTestTopic];
+ var resultMessages = new List<Message>();
+
+ foreach (var set in sets)
+ {
+ foreach (var message in set)
+ {
+ resultMessages.Add(message);
+ }
+ }
+
+ Assert.AreEqual(0, resultMessages.Count);
+ }
+ }
+
+ [Test]
+ public void ProducersSendMessagesToDifferentPartitionsAndConsumerConnectorGetsThemBack()
+ {
+ // first producing
+ string payload1 = "kafka 1.";
+ byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
+ var msg1 = new Message(payloadData1);
+
+ string payload2 = "kafka 2.";
+ byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
+ var msg2 = new Message(payloadData2);
+
+ var producerConfig = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(producerConfig);
+ var producerRequest1 = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { msg1 });
+ producer.Send(producerRequest1);
+ var producerRequest2 = new ProducerRequest(CurrentTestTopic, 1, new List<Message>() { msg2 });
+ producer.Send(producerRequest2);
+
+ // now consuming
+ var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
+ var resultMessages = new List<Message>();
+ using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+ {
+ var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
+ var messages = consumerConnector.CreateMessageStreams(topicCount);
+
+ var sets = messages[CurrentTestTopic];
+
+ try
+ {
+ foreach (var set in sets)
+ {
+ foreach (var message in set)
+ {
+ resultMessages.Add(message);
+ }
+ }
+ }
+ catch (ConsumerTimeoutException)
+ {
+ // do nothing, this is expected
+ }
+ }
+
+ Assert.AreEqual(2, resultMessages.Count);
+ Assert.AreEqual(msg1.ToString(), resultMessages[0].ToString());
+ Assert.AreEqual(msg2.ToString(), resultMessages[1].ToString());
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System;
+ using System.Threading;
+ using Kafka.Client.ZooKeeperIntegration;
+ using NUnit.Framework;
+
+ public abstract class IntegrationFixtureBase
+ {
+ protected string CurrentTestTopic { get; set; }
+
+ [SetUp]
+ public void SetupCurrentTestTopic()
+ {
+ CurrentTestTopic = TestContext.CurrentContext.Test.Name + "_" + Guid.NewGuid().ToString();
+ }
+
+ internal static void WaitUntillIdle(IZooKeeperClient client, int timeout)
+ {
+ Thread.Sleep(timeout);
+ int rest = timeout - client.IdleTime;
+ while (rest > 0)
+ {
+ Thread.Sleep(rest);
+ rest = timeout - client.IdleTime;
+ }
+ }
+ }
+}
\ No newline at end of file
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj Wed Sep 21 19:17:19 2011
@@ -1,64 +1,137 @@
-<?xml version="1.0" encoding="utf-8"?>
-<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
- <PropertyGroup>
- <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
- <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProductVersion>8.0.30703</ProductVersion>
- <SchemaVersion>2.0</SchemaVersion>
- <ProjectGuid>{AF29C330-49BD-4648-B692-882E922C435B}</ProjectGuid>
- <OutputType>Library</OutputType>
- <AppDesignerFolder>Properties</AppDesignerFolder>
- <RootNamespace>Kafka.Client.IntegrationTests</RootNamespace>
- <AssemblyName>Kafka.Client.IntegrationTests</AssemblyName>
- <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
- <FileAlignment>512</FileAlignment>
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
- <DebugSymbols>true</DebugSymbols>
- <DebugType>full</DebugType>
- <Optimize>false</Optimize>
- <OutputPath>bin\Debug\</OutputPath>
- <DefineConstants>DEBUG;TRACE</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- </PropertyGroup>
- <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
- <DebugType>pdbonly</DebugType>
- <Optimize>true</Optimize>
- <OutputPath>bin\Release\</OutputPath>
- <DefineConstants>TRACE</DefineConstants>
- <ErrorReport>prompt</ErrorReport>
- <WarningLevel>4</WarningLevel>
- </PropertyGroup>
- <ItemGroup>
- <Reference Include="nunit.framework, Version=2.5.9.10348, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll</HintPath>
- </Reference>
- <Reference Include="System" />
- <Reference Include="System.Core" />
- <Reference Include="System.Xml.Linq" />
- <Reference Include="System.Data.DataSetExtensions" />
- <Reference Include="Microsoft.CSharp" />
- <Reference Include="System.Data" />
- <Reference Include="System.Xml" />
- </ItemGroup>
- <ItemGroup>
- <Compile Include="KafkaIntegrationTest.cs" />
- <Compile Include="Properties\AssemblyInfo.cs" />
- </ItemGroup>
- <ItemGroup>
- <ProjectReference Include="..\..\Kafka.Client\Kafka.Client.csproj">
- <Project>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</Project>
- <Name>Kafka.Client</Name>
- </ProjectReference>
- </ItemGroup>
- <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
- <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
- Other similar extension points exist, see Microsoft.Common.targets.
- <Target Name="BeforeBuild">
- </Target>
- <Target Name="AfterBuild">
- </Target>
- -->
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.30703</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{AF29C330-49BD-4648-B692-882E922C435B}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Kafka.Client.IntegrationTests</RootNamespace>
+ <AssemblyName>Kafka.Client.IntegrationTests</AssemblyName>
+ <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <CodeContractsAssemblyMode>0</CodeContractsAssemblyMode>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <StyleCopTreatErrorsAsWarnings>true</StyleCopTreatErrorsAsWarnings>
+ <CodeContractsEnableRuntimeChecking>False</CodeContractsEnableRuntimeChecking>
+ <CodeContractsRuntimeOnlyPublicSurface>False</CodeContractsRuntimeOnlyPublicSurface>
+ <CodeContractsRuntimeThrowOnFailure>True</CodeContractsRuntimeThrowOnFailure>
+ <CodeContractsRuntimeCallSiteRequires>False</CodeContractsRuntimeCallSiteRequires>
+ <CodeContractsRuntimeSkipQuantifiers>False</CodeContractsRuntimeSkipQuantifiers>
+ <CodeContractsRunCodeAnalysis>False</CodeContractsRunCodeAnalysis>
+ <CodeContractsNonNullObligations>False</CodeContractsNonNullObligations>
+ <CodeContractsBoundsObligations>False</CodeContractsBoundsObligations>
+ <CodeContractsArithmeticObligations>False</CodeContractsArithmeticObligations>
+ <CodeContractsEnumObligations>False</CodeContractsEnumObligations>
+ <CodeContractsRedundantAssumptions>False</CodeContractsRedundantAssumptions>
+ <CodeContractsRunInBackground>True</CodeContractsRunInBackground>
+ <CodeContractsShowSquigglies>False</CodeContractsShowSquigglies>
+ <CodeContractsUseBaseLine>False</CodeContractsUseBaseLine>
+ <CodeContractsEmitXMLDocs>False</CodeContractsEmitXMLDocs>
+ <CodeContractsCustomRewriterAssembly />
+ <CodeContractsCustomRewriterClass />
+ <CodeContractsLibPaths />
+ <CodeContractsExtraRewriteOptions />
+ <CodeContractsExtraAnalysisOptions />
+ <CodeContractsBaseLineFile />
+ <CodeContractsCacheAnalysisResults>False</CodeContractsCacheAnalysisResults>
+ <CodeContractsRuntimeCheckingLevel>Full</CodeContractsRuntimeCheckingLevel>
+ <CodeContractsReferenceAssembly>%28none%29</CodeContractsReferenceAssembly>
+ <CodeContractsAnalysisWarningLevel>0</CodeContractsAnalysisWarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Integration|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Integration\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ <StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
+ </PropertyGroup>
+ <PropertyGroup>
+ <StartupObject />
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="log4net">
+ <HintPath>..\..\..\..\lib\log4Net\log4net.dll</HintPath>
+ </Reference>
+ <Reference Include="nunit.framework, Version=2.5.9.10348, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Configuration" />
+ <Reference Include="System.Core" />
+ <Reference Include="Microsoft.CSharp" />
+ <Reference Include="ZooKeeperNet">
+ <HintPath>..\..\..\..\lib\zookeeper\ZooKeeperNet.dll</HintPath>
+ </Reference>
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="ConsumerRebalancingTests.cs" />
+ <Compile Include="ConsumerTests.cs" />
+ <Compile Include="IntegrationFixtureBase.cs" />
+ <Compile Include="KafkaIntegrationTest.cs" />
+ <Compile Include="MockAlwaysZeroPartitioner.cs" />
+ <Compile Include="ProducerTests.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="TestHelper.cs" />
+ <Compile Include="TestsSetup.cs" />
+ <Compile Include="TestMultipleBrokersHelper.cs" />
+ <Compile Include="ZKBrokerPartitionInfoTests.cs" />
+ <Compile Include="ZooKeeperAwareProducerTests.cs" />
+ <Compile Include="ZooKeeperClientTests.cs" />
+ <Compile Include="ZooKeeperConnectionTests.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\Kafka.Client\Kafka.Client.csproj">
+ <Project>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</Project>
+ <Name>Kafka.Client</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <ItemGroup>
+ <None Include="App.config">
+ <SubType>Designer</SubType>
+ </None>
+ <None Include="Config\Debug\App.config">
+ <SubType>Designer</SubType>
+ </None>
+ <None Include="Log4Net.config">
+ <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+ </None>
+ <None Include="Config\Integration\App.config">
+ <SubType>Designer</SubType>
+ </None>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <Import Project="..\..\..\..\lib\StyleCop\Microsoft.StyleCop.Targets" />
+ <PropertyGroup>
+ <PostBuildEvent>
+ </PostBuildEvent>
+ </PropertyGroup>
+ <Target Name="BeforeBuild">
+ <Copy SourceFiles="Config\$(Configuration)\App.config" DestinationFiles="App.config" />
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
</Project>
\ No newline at end of file
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs Wed Sep 21 19:17:19 2011
@@ -1,181 +1,508 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-using System.Threading;
-using Kafka.Client.Request;
-using NUnit.Framework;
-
-namespace Kafka.Client.Tests
-{
- /// <summary>
- /// Contains tests that go all the way to Kafka and back.
- /// </summary>
- [TestFixture]
- [Ignore("Requires a Kafka server running to execute")]
- public class KafkaIntegrationTest
- {
- /// <summary>
- /// Kafka server to test against.
- /// </summary>
- private static readonly string KafkaServer = "192.168.50.203";
-
- /// <summary>
- /// Port of the Kafka server to test against.
- /// </summary>
- private static readonly int KafkaPort = 9092;
-
- /// <summary>
- /// Sends a pair of message to Kafka.
- /// </summary>
- [Test]
- public void ProducerSendsMessage()
- {
- string payload1 = "kafka 1.";
- byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
- Message msg1 = new Message(payloadData1);
-
- string payload2 = "kafka 2.";
- byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
- Message msg2 = new Message(payloadData2);
-
- Producer producer = new Producer(KafkaServer, KafkaPort);
- producer.Send("test", 0, new List<Message> { msg1, msg2 });
- }
-
- /// <summary>
- /// Asynchronously sends a pair of message to Kafka.
- /// </summary>
- [Test]
- public void ProducerSendsMessageAsynchronously()
- {
- bool waiting = true;
-
- List<Message> messages = GenerateRandomMessages(50);
-
- Producer producer = new Producer(KafkaServer, KafkaPort);
- producer.SendAsync(
- "test",
- 0,
- messages,
- (requestContext) => { waiting = false; });
-
- while (waiting)
- {
- Console.WriteLine("Keep going...");
- Thread.Sleep(10);
- }
- }
-
- /// <summary>
- /// Send a multi-produce request to Kafka.
- /// </summary>
- [Test]
- public void ProducerSendMultiRequest()
- {
- List<ProducerRequest> requests = new List<ProducerRequest>
- {
- new ProducerRequest("test", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
- new ProducerRequest("test", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
- new ProducerRequest("testa", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("3: " + DateTime.UtcNow)) }),
- new ProducerRequest("testa", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })
- };
-
- MultiProducerRequest request = new MultiProducerRequest(requests);
- Producer producer = new Producer(KafkaServer, KafkaPort);
- producer.Send(request);
- }
-
- /// <summary>
- /// Generates messages for Kafka then gets them back.
- /// </summary>
- [Test]
- public void ConsumerFetchMessage()
- {
- ProducerSendsMessage();
-
- Consumer consumer = new Consumer(KafkaServer, KafkaPort);
- List<Message> messages = consumer.Consume("test", 0, 0);
-
- foreach (Message msg in messages)
- {
- Console.WriteLine(msg);
- }
- }
-
- /// <summary>
- /// Generates multiple messages for Kafka then gets them back.
- /// </summary>
- [Test]
- public void ConsumerMultiFetchGetsMessage()
- {
- ProducerSendMultiRequest();
-
- Consumer consumer = new Consumer(KafkaServer, KafkaPort);
- MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
- {
- new FetchRequest("test", 0, 0),
- new FetchRequest("test", 0, 0),
- new FetchRequest("testa", 0, 0)
- });
-
- List<List<Message>> messages = consumer.Consume(request);
-
- for (int ix = 0; ix < messages.Count; ix++)
- {
- List<Message> messageSet = messages[ix];
- Console.WriteLine(string.Format("Request #{0}-->", ix));
- foreach (Message msg in messageSet)
- {
- Console.WriteLine(msg);
- }
- }
- }
-
- /// <summary>
- /// Gets offsets from Kafka.
- /// </summary>
- [Test]
- public void ConsumerGetsOffsets()
- {
- OffsetRequest request = new OffsetRequest("test", 0, DateTime.Now.AddHours(-24).Ticks, 10);
-
- Consumer consumer = new Consumer(KafkaServer, KafkaPort);
- IList<long> list = consumer.GetOffsetsBefore(request);
-
- foreach (long l in list)
- {
- Console.Out.WriteLine(l);
- }
- }
-
- /// <summary>
- /// Gererates a randome list of messages.
- /// </summary>
- /// <param name="numberOfMessages">The number of messages to generate.</param>
- /// <returns>A list of random messages.</returns>
- private static List<Message> GenerateRandomMessages(int numberOfMessages)
- {
- List<Message> messages = new List<Message>();
- for (int ix = 0; ix < numberOfMessages; ix++)
- {
- messages.Add(new Message(GenerateRandomBytes(10000)));
- }
-
- return messages;
- }
-
- /// <summary>
- /// Generate a random set of bytes.
- /// </summary>
- /// <param name="length">Length of the byte array.</param>
- /// <returns>Random byte array.</returns>
- private static byte[] GenerateRandomBytes(int length)
- {
- byte[] randBytes = new byte[length];
- Random randNum = new Random();
- randNum.NextBytes(randBytes);
-
- return randBytes;
- }
- }
-}
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Text;
+ using System.Threading;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Producers.Async;
+ using Kafka.Client.Producers.Sync;
+ using Kafka.Client.Requests;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Contains tests that go all the way to Kafka and back.
+ /// </summary>
+ [TestFixture]
+ public class KafkaIntegrationTest : IntegrationFixtureBase
+ {
+ /// <summary>
+ /// Kafka Client configuration
+ /// </summary>
+ private static KafkaClientConfiguration clientConfig;
+
+ /// <summary>
+ /// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds)
+ /// </summary>
+ private static readonly int MaxTestWaitTimeInMiliseconds = 5000;
+
+ [TestFixtureSetUp]
+ public void SetUp()
+ {
+ clientConfig = KafkaClientConfiguration.GetConfiguration();
+ }
+
+ /// <summary>
+ /// Sends a pair of message to Kafka.
+ /// </summary>
+ [Test]
+ public void ProducerSendsMessage()
+ {
+ string payload1 = "kafka 1.";
+ byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
+ Message msg1 = new Message(payloadData1);
+
+ string payload2 = "kafka 2.";
+ byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
+ Message msg2 = new Message(payloadData2);
+
+ var config = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(config);
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { msg1, msg2 });
+ producer.Send(producerRequest);
+ }
+
+ /// <summary>
+ /// Sends a message with long topic to Kafka.
+ /// </summary>
+ [Test]
+ public void ProducerSendsMessageWithLongTopic()
+ {
+ Message msg = new Message(Encoding.UTF8.GetBytes("test message"));
+ string topic = "ThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopic";
+ var config = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(config);
+ var producerRequest = new ProducerRequest(topic, 0, new List<Message>() { msg });
+ producer.Send(producerRequest);
+ }
+
+ /// <summary>
+ /// Asynchronously sends many random messages to Kafka
+ /// </summary>
+ [Test]
+ public void AsyncProducerSendsManyLongRandomMessages()
+ {
+ List<Message> messages = GenerateRandomTextMessages(50);
+
+ var config = new AsyncProducerConfig(clientConfig);
+
+ var producer = new AsyncProducer(config);
+ producer.Send(CurrentTestTopic, 0, messages);
+ }
+
+ /// <summary>
+ /// Asynchronously sends few short fixed messages to Kafka
+ /// </summary>
+ [Test]
+ public void AsyncProducerSendsFewShortFixedMessages()
+ {
+ List<Message> messages = new List<Message>()
+ {
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 2")),
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 3")),
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 4"))
+ };
+
+ var config = new AsyncProducerConfig(clientConfig);
+
+ var producer = new AsyncProducer(config);
+ producer.Send(CurrentTestTopic, 0, messages);
+ }
+
+ /// <summary>
+ /// Asynchronously sends few short fixed messages to Kafka in separate send actions
+ /// </summary>
+ [Test]
+ public void AsyncProducerSendsFewShortFixedMessagesInSeparateSendActions()
+ {
+ var config = new AsyncProducerConfig(clientConfig);
+ using (var producer = new AsyncProducer(config))
+ {
+ ProducerRequest req1 = new ProducerRequest(
+ CurrentTestTopic,
+ 0,
+ new List<Message>() { new Message(Encoding.UTF8.GetBytes("Async Test Message 1")) });
+ producer.Send(req1);
+
+ ProducerRequest req2 = new ProducerRequest(
+ CurrentTestTopic,
+ 0,
+ new List<Message>() { new Message(Encoding.UTF8.GetBytes("Async Test Message 2")) });
+ producer.Send(req2);
+
+ ProducerRequest req3 = new ProducerRequest(
+ CurrentTestTopic,
+ 0,
+ new List<Message>() { new Message(Encoding.UTF8.GetBytes("Async Test Message 3")) });
+ producer.Send(req3);
+ }
+ }
+
+ [Test]
+ public void AsyncProducerSendsMessageWithCallbackClass()
+ {
+ List<Message> messages = new List<Message>()
+ {
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
+ };
+ var config = new AsyncProducerConfig(clientConfig);
+ TestCallbackHandler myHandler = new TestCallbackHandler();
+ var producer = new AsyncProducer(config, myHandler);
+ producer.Send(CurrentTestTopic, 0, messages);
+ Thread.Sleep(1000);
+ Assert.IsTrue(myHandler.WasRun);
+ }
+
+ [Test]
+ public void AsyncProducerSendsMessageWithCallback()
+ {
+ List<Message> messages = new List<Message>()
+ {
+ new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
+ };
+ var config = new AsyncProducerConfig(clientConfig);
+ TestCallbackHandler myHandler = new TestCallbackHandler();
+ var producer = new AsyncProducer(config);
+ producer.Send(CurrentTestTopic, 0, messages, myHandler.Handle);
+ Thread.Sleep(1000);
+ Assert.IsTrue(myHandler.WasRun);
+ }
+
+ private class TestCallbackHandler : ICallbackHandler
+ {
+ public bool WasRun { get; private set; }
+
+ public void Handle(RequestContext<ProducerRequest> context)
+ {
+ WasRun = true;
+ }
+ }
+
+ /// <summary>
+ /// Send a multi-produce request to Kafka.
+ /// </summary>
+ [Test]
+ public void ProducerSendMultiRequest()
+ {
+ List<ProducerRequest> requests = new List<ProducerRequest>
+ {
+ new ProducerRequest(CurrentTestTopic, 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
+ new ProducerRequest(CurrentTestTopic, 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
+ new ProducerRequest(CurrentTestTopic, 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("3: " + DateTime.UtcNow)) }),
+ new ProducerRequest(CurrentTestTopic, 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })
+ };
+
+ var config = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(config);
+ producer.MultiSend(requests);
+ }
+
+ /// <summary>
+ /// Generates messages for Kafka then gets them back.
+ /// </summary>
+ [Test]
+ public void ConsumerFetchMessage()
+ {
+ ProducerSendsMessage();
+
+ ConsumerConfig config = new ConsumerConfig(clientConfig);
+ IConsumer consumer = new Kafka.Client.Consumers.Consumer(config);
+ FetchRequest request = new FetchRequest(CurrentTestTopic, 0, 0);
+ BufferedMessageSet response = consumer.Fetch(request);
+ Assert.NotNull(response);
+ foreach (var message in response.Messages)
+ {
+ Console.WriteLine(message);
+ }
+ }
+
+ /// <summary>
+ /// Generates multiple messages for Kafka then gets them back.
+ /// </summary>
+ [Test]
+ public void ConsumerMultiFetchGetsMessage()
+ {
+ ProducerSendMultiRequest();
+
+ ConsumerConfig config = new ConsumerConfig(clientConfig);
+ IConsumer cons = new Consumers.Consumer(config);
+ MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
+ {
+ new FetchRequest(CurrentTestTopic, 0, 0),
+ new FetchRequest(CurrentTestTopic, 0, 0),
+ new FetchRequest(CurrentTestTopic + "2", 0, 0)
+ });
+
+ IList<BufferedMessageSet> response = cons.MultiFetch(request);
+ for (int ix = 0; ix < response.Count; ix++)
+ {
+ IEnumerable<Message> messageSet = response[ix].Messages;
+ Console.WriteLine(string.Format("Request #{0}-->", ix));
+ foreach (Message msg in messageSet)
+ {
+ Console.WriteLine(msg);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Gets offsets from Kafka.
+ /// </summary>
+ [Test]
+ public void ConsumerGetsOffsets()
+ {
+ OffsetRequest request = new OffsetRequest(CurrentTestTopic, 0, DateTime.Now.AddHours(-24).Ticks, 10);
+
+ ConsumerConfig config = new ConsumerConfig(clientConfig);
+ IConsumer consumer = new Consumers.Consumer(config);
+ IList<long> list = consumer.GetOffsetsBefore(request);
+
+ foreach (long l in list)
+ {
+ Console.Out.WriteLine(l);
+ }
+ }
+
+ /// <summary>
+ /// Synchronous Producer sends a single simple message and then a consumer consumes it
+ /// </summary>
+ [Test]
+ public void ProducerSendsAndConsumerReceivesSingleSimpleMessage()
+ {
+ Message sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
+
+ var config = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(config);
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { sourceMessage });
+
+ long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, clientConfig);
+
+ producer.Send(producerRequest);
+
+ ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ FetchRequest request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
+
+ BufferedMessageSet response;
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 0)
+ {
+ break;
+ }
+ else
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(1, response.Messages.Count());
+ Message resultMessage = response.Messages.First();
+ Assert.AreEqual(sourceMessage.ToString(), resultMessage.ToString());
+ }
+
+ /// <summary>
+ /// Asynchronous Producer sends a single simple message and then a consumer consumes it
+ /// </summary>
+ [Test]
+ public void AsyncProducerSendsAndConsumerReceivesSingleSimpleMessage()
+ {
+ Message sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
+
+ var config = new AsyncProducerConfig(clientConfig);
+ var producer = new AsyncProducer(config);
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { sourceMessage });
+
+ long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, clientConfig);
+
+ producer.Send(producerRequest);
+
+ ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ FetchRequest request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
+
+ BufferedMessageSet response;
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 0)
+ {
+ break;
+ }
+ else
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(1, response.Messages.Count());
+ Message resultMessage = response.Messages.First();
+ Assert.AreEqual(sourceMessage.ToString(), resultMessage.ToString());
+ }
+
+ /// <summary>
+ /// Synchronous producer sends a multi request and a consumer receives it from to Kafka.
+ /// </summary>
+ [Test]
+ public void ProducerSendsAndConsumerReceivesMultiRequest()
+ {
+ string testTopic1 = CurrentTestTopic + "1";
+ string testTopic2 = CurrentTestTopic + "2";
+ string testTopic3 = CurrentTestTopic + "3";
+
+ Message sourceMessage1 = new Message(Encoding.UTF8.GetBytes("1: TestMessage"));
+ Message sourceMessage2 = new Message(Encoding.UTF8.GetBytes("2: TestMessage"));
+ Message sourceMessage3 = new Message(Encoding.UTF8.GetBytes("3: TestMessage"));
+ Message sourceMessage4 = new Message(Encoding.UTF8.GetBytes("4: TestMessage"));
+
+ List<ProducerRequest> requests = new List<ProducerRequest>
+ {
+ new ProducerRequest(testTopic1, 0, new List<Message> { sourceMessage1 }),
+ new ProducerRequest(testTopic1, 0, new List<Message> { sourceMessage2 }),
+ new ProducerRequest(testTopic2, 0, new List<Message> { sourceMessage3 }),
+ new ProducerRequest(testTopic3, 0, new List<Message> { sourceMessage4 })
+ };
+
+ var config = new SyncProducerConfig(clientConfig);
+ var producer = new SyncProducer(config);
+
+ long currentOffset1 = TestHelper.GetCurrentKafkaOffset(testTopic1, clientConfig);
+ long currentOffset2 = TestHelper.GetCurrentKafkaOffset(testTopic2, clientConfig);
+ long currentOffset3 = TestHelper.GetCurrentKafkaOffset(testTopic3, clientConfig);
+
+ producer.MultiSend(requests);
+
+ ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
+ {
+ new FetchRequest(testTopic1, 0, currentOffset1),
+ new FetchRequest(testTopic2, 0, currentOffset2),
+ new FetchRequest(testTopic3, 0, currentOffset3)
+ });
+ IList<BufferedMessageSet> messageSets;
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ messageSets = consumer.MultiFetch(request);
+ if (messageSets.Count > 2 && messageSets[0].Messages.Count() > 0 && messageSets[1].Messages.Count() > 0 && messageSets[2].Messages.Count() > 0)
+ {
+ break;
+ }
+ else
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+ }
+
+ Assert.AreEqual(3, messageSets.Count);
+ Assert.AreEqual(2, messageSets[0].Messages.Count());
+ Assert.AreEqual(1, messageSets[1].Messages.Count());
+ Assert.AreEqual(1, messageSets[2].Messages.Count());
+ Assert.AreEqual(sourceMessage1.ToString(), messageSets[0].Messages.First().ToString());
+ Assert.AreEqual(sourceMessage2.ToString(), messageSets[0].Messages.Skip(1).First().ToString());
+ Assert.AreEqual(sourceMessage3.ToString(), messageSets[1].Messages.First().ToString());
+ Assert.AreEqual(sourceMessage4.ToString(), messageSets[2].Messages.First().ToString());
+ }
+
+ /// <summary>
+ /// Gererates a randome list of messages.
+ /// </summary>
+ /// <param name="numberOfMessages">The number of messages to generate.</param>
+ /// <returns>A list of random messages.</returns>
+ private static List<Message> GenerateRandomMessages(int numberOfMessages)
+ {
+ List<Message> messages = new List<Message>();
+ for (int ix = 0; ix < numberOfMessages; ix++)
+ {
+ messages.Add(new Message(GenerateRandomBytes(10000)));
+ }
+
+ return messages;
+ }
+
+ /// <summary>
+ /// Generate a random set of bytes.
+ /// </summary>
+ /// <param name="length">Length of the byte array.</param>
+ /// <returns>Random byte array.</returns>
+ private static byte[] GenerateRandomBytes(int length)
+ {
+ byte[] randBytes = new byte[length];
+ Random randNum = new Random();
+ randNum.NextBytes(randBytes);
+
+ return randBytes;
+ }
+
+ /// <summary>
+ /// Gererates a randome list of text messages.
+ /// </summary>
+ /// <param name="numberOfMessages">The number of messages to generate.</param>
+ /// <returns>A list of random text messages.</returns>
+ private static List<Message> GenerateRandomTextMessages(int numberOfMessages)
+ {
+ List<Message> messages = new List<Message>();
+ for (int ix = 0; ix < numberOfMessages; ix++)
+ {
+ ////messages.Add(new Message(GenerateRandomBytes(10000)));
+ messages.Add(new Message(Encoding.UTF8.GetBytes(GenerateRandomMessage(10000))));
+ }
+
+ return messages;
+ }
+
+ /// <summary>
+ /// Generate a random message text.
+ /// </summary>
+ /// <param name="length">Length of the message string.</param>
+ /// <returns>Random message string.</returns>
+ private static string GenerateRandomMessage(int length)
+ {
+ StringBuilder builder = new StringBuilder();
+ Random random = new Random();
+ char ch;
+ for (int i = 0; i < length; i++)
+ {
+ ch = Convert.ToChar(Convert.ToInt32(
+ Math.Floor((26 * random.NextDouble()) + 65)));
+ builder.Append(ch);
+ }
+
+ return builder.ToString();
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Log4Net.config
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Log4Net.config?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Log4Net.config (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Log4Net.config Wed Sep 21 19:17:19 2011
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<log4net>
+ <root>
+ <level value="ALL" />
+ <!--<appender-ref ref="ConsoleAppender" />-->
+ <appender-ref ref="KafkaFileAppender" />
+ <appender-ref ref="ZookeeperFileAppender" />
+ </root>
+ <!--<appender name="ConsoleAppender" type="log4net.Appender.ConsoleAppender">
+ <layout type="log4net.Layout.PatternLayout">
+ <conversionPattern value="%-5level - %message - %logger%newline" />
+ </layout>
+ </appender>-->
+ <appender name="KafkaFileAppender" type="log4net.Appender.FileAppender">
+ <filter type="log4net.Filter.LoggerMatchFilter">
+ <LoggerToMatch value="Kafka.Client."/>
+ </filter>
+ <filter type="log4net.Filter.DenyAllFilter" />
+ <file value="kafka-logs.txt" />
+ <appendToFile value="false" />
+ <layout type="log4net.Layout.PatternLayout">
+ <conversionPattern value="[%-5level] - [%logger] - %message%newline" />
+ </layout>
+ </appender>
+ <appender name="ZookeeperFileAppender" type="log4net.Appender.FileAppender">
+ <filter type="log4net.Filter.LoggerMatchFilter">
+ <LoggerToMatch value="ZooKeeperNet."/>
+ </filter>
+ <filter type="log4net.Filter.LoggerMatchFilter">
+ <LoggerToMatch value="Org.Apache.Zookeeper.Data."/>
+ </filter>
+ <filter type="log4net.Filter.DenyAllFilter" />
+ <file value="zookeeper-logs.txt" />
+ <appendToFile value="false" />
+ <layout type="log4net.Layout.PatternLayout">
+ <conversionPattern value="[%-5level] - [%logger] - %message%newline" />
+ </layout>
+ </appender>
+</log4net>
\ No newline at end of file
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,19 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Kafka.Client.Producers.Partitioning;
+
+namespace Kafka.Client.IntegrationTests
+{
+ /// <summary>
+ /// This mock partitioner will always point to the first partition (the one of index = 0)
+ /// </summary>
+ public class MockAlwaysZeroPartitioner : IPartitioner<string>
+ {
+ public int Partition(string key, int numPartitions)
+ {
+ return 0;
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.IntegrationTests
+{
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Text;
+ using System.Threading;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Messages;
+ using Kafka.Client.Producers;
+ using Kafka.Client.Requests;
+ using Kafka.Client.Serialization;
+ using NUnit.Framework;
+
+ [TestFixture]
+ public class ProducerTests : IntegrationFixtureBase
+ {
+ /// <summary>
+ /// Kafka Client configuration
+ /// </summary>
+ private KafkaClientConfiguration clientConfig;
+
+ /// <summary>
+ /// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds)
+ /// </summary>
+ private readonly int maxTestWaitTimeInMiliseconds = 5000;
+
+ [TestFixtureSetUp]
+ public void SetUp()
+ {
+ clientConfig = KafkaClientConfiguration.GetConfiguration();
+ clientConfig.SupressZooKeeper();
+ }
+
+ [Test]
+ public void ProducerSends1Message()
+ {
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ var originalMessage = new Message(Encoding.UTF8.GetBytes("TestData"));
+
+ var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+ multipleBrokersHelper.GetCurrentOffsets();
+
+ var producerConfig = new ProducerConfig(clientConfig);
+ var mockPartitioner = new MockAlwaysZeroPartitioner();
+ using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder(), null))
+ {
+ var producerData = new ProducerData<string, Message>(
+ CurrentTestTopic, "somekey", new List<Message> { originalMessage });
+ producer.Send(producerData);
+ Thread.Sleep(waitSingle);
+
+ while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ Thread.Sleep(waitSingle);
+ if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
+ {
+ Assert.Fail("None of the brokers changed their offset after sending a message");
+ }
+ }
+
+ totalWaitTimeInMiliseconds = 0;
+
+ var consumerConfig = new ConsumerConfig(clientConfig)
+ {
+ Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+ Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+ };
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+ BufferedMessageSet response;
+
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 0)
+ {
+ break;
+ }
+
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(1, response.Messages.Count());
+ Assert.AreEqual(originalMessage.ToString(), response.Messages.First().ToString());
+ }
+ }
+
+ [Test]
+ public void ProducerSends3Messages()
+ {
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ var originalMessage1 = new Message(Encoding.UTF8.GetBytes("TestData1"));
+ var originalMessage2 = new Message(Encoding.UTF8.GetBytes("TestData2"));
+ var originalMessage3 = new Message(Encoding.UTF8.GetBytes("TestData3"));
+ var originalMessageList =
+ new List<Message> { originalMessage1, originalMessage2, originalMessage3 };
+
+ var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+ multipleBrokersHelper.GetCurrentOffsets();
+
+ var producerConfig = new ProducerConfig(clientConfig);
+ var mockPartitioner = new MockAlwaysZeroPartitioner();
+ using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder(), null))
+ {
+ var producerData = new ProducerData<string, Message>(CurrentTestTopic, "somekey", originalMessageList);
+ producer.Send(producerData);
+ Thread.Sleep(waitSingle);
+
+ while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ Thread.Sleep(waitSingle);
+ if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
+ {
+ Assert.Fail("None of the brokers changed their offset after sending a message");
+ }
+ }
+
+ totalWaitTimeInMiliseconds = 0;
+
+ var consumerConfig = new ConsumerConfig(clientConfig)
+ {
+ Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+ Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+ };
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+ BufferedMessageSet response;
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 2)
+ {
+ break;
+ }
+
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(3, response.Messages.Count());
+ Assert.AreEqual(originalMessage1.ToString(), response.Messages.First().ToString());
+ Assert.AreEqual(originalMessage2.ToString(), response.Messages.Skip(1).First().ToString());
+ Assert.AreEqual(originalMessage3.ToString(), response.Messages.Skip(2).First().ToString());
+ }
+ }
+
+ [Test]
+ public void ProducerSends1MessageUsingNotDefaultEncoder()
+ {
+ int totalWaitTimeInMiliseconds = 0;
+ int waitSingle = 100;
+ string originalMessage = "TestData";
+
+ var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
+ multipleBrokersHelper.GetCurrentOffsets();
+
+ var producerConfig = new ProducerConfig(clientConfig);
+ var mockPartitioner = new MockAlwaysZeroPartitioner();
+ using (var producer = new Producer<string, string>(producerConfig, mockPartitioner, new StringEncoder(), null))
+ {
+ var producerData = new ProducerData<string, string>(
+ CurrentTestTopic, "somekey", new List<string> { originalMessage });
+
+ producer.Send(producerData);
+ Thread.Sleep(waitSingle);
+
+ while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ Thread.Sleep(waitSingle);
+ if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
+ {
+ Assert.Fail("None of the brokers changed their offset after sending a message");
+ }
+ }
+
+ totalWaitTimeInMiliseconds = 0;
+
+ var consumerConfig = new ConsumerConfig(clientConfig)
+ {
+ Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
+ Port = multipleBrokersHelper.BrokerThatHasChanged.Port
+ };
+ IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+ BufferedMessageSet response;
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 0)
+ {
+ break;
+ }
+
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(1, response.Messages.Count());
+ Assert.AreEqual(originalMessage, Encoding.UTF8.GetString(response.Messages.First().Payload));
+ }
+ }
+ }
+}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs Wed Sep 21 19:17:19 2011
@@ -1,36 +1,35 @@
-using System.Reflection;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-
-// General Information about an assembly is controlled through the following
-// set of attributes. Change these attribute values to modify the information
-// associated with an assembly.
-[assembly: AssemblyTitle("Kafka.Client.IntegrationTests")]
-[assembly: AssemblyDescription("")]
-[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("Microsoft")]
-[assembly: AssemblyProduct("Kafka.Client.IntegrationTests")]
-[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
-[assembly: AssemblyTrademark("")]
-[assembly: AssemblyCulture("")]
-
-// Setting ComVisible to false makes the types in this assembly not visible
-// to COM components. If you need to access a type in this assembly from
-// COM, set the ComVisible attribute to true on that type.
-[assembly: ComVisible(false)]
-
-// The following GUID is for the ID of the typelib if this project is exposed to COM
-[assembly: Guid("7b2387b7-6a58-4e8b-ae06-8aadf1a64949")]
-
-// Version information for an assembly consists of the following four values:
-//
-// Major Version
-// Minor Version
-// Build Number
-// Revision
-//
-// You can specify all the values or you can default the Build and Revision Numbers
-// by using the '*' as shown below:
-// [assembly: AssemblyVersion("1.0.*")]
-[assembly: AssemblyVersion("1.0.0.0")]
-[assembly: AssemblyFileVersion("1.0.0.0")]
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Kafka.Client.IntegrationTests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyProduct("Kafka.Client.IntegrationTests")]
+[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("7b2387b7-6a58-4e8b-ae06-8aadf1a64949")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]