You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/18 19:52:16 UTC
svn commit: r1185772 [3/4] - in
/incubator/kafka/trunk/clients/csharp/src/Kafka: Kafka.Client/
Kafka.Client/Cfg/ Kafka.Client/Consumers/ Kafka.Client/Exceptions/
Kafka.Client/Messages/ Kafka.Client/Producers/
Kafka.Client/Producers/Async/ Kafka.Client/...
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs Tue Oct 18 17:52:13 2011
@@ -19,8 +19,6 @@ namespace Kafka.Client.IntegrationTests
{
using System.Collections.Generic;
using System.Linq;
- using System.Threading;
- using Kafka.Client.Cfg;
using Kafka.Client.Cluster;
using Kafka.Client.Consumers;
using Kafka.Client.Utils;
@@ -30,24 +28,13 @@ namespace Kafka.Client.IntegrationTests
[TestFixture]
public class ConsumerRebalancingTests : IntegrationFixtureBase
{
- /// <summary>
- /// Kafka Client configuration
- /// </summary>
- private static KafkaClientConfiguration clientConfig;
-
- [TestFixtureSetUp]
- public void SetUp()
- {
- clientConfig = KafkaClientConfiguration.GetConfiguration();
- }
-
[Test]
public void ConsumerPorformsRebalancingOnStart()
{
- var config = new ConsumerConfig(clientConfig) { AutoCommit = false, GroupId = "group1" };
+ var config = this.ZooKeeperBasedConsumerConfig;
using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
{
- ZooKeeperClient client = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", consumerConnector);
+ var client = ReflectionHelper.GetInstanceField<ZooKeeperClient>("zkClient", consumerConnector);
Assert.IsNotNull(client);
client.DeleteRecursive("/consumers/group1");
var topicCount = new Dictionary<string, int> { { "test", 1 } };
@@ -71,7 +58,7 @@ namespace Kafka.Client.IntegrationTests
Assert.That(children, Is.Not.Null.And.Not.Empty);
Assert.That(children.Count, Is.EqualTo(2));
string partId = children[0];
- string data = client.ReadData<string>("/consumers/group1/owners/test/" + partId);
+ var data = client.ReadData<string>("/consumers/group1/owners/test/" + partId);
Assert.That(data, Is.Not.Null.And.Not.Empty);
Assert.That(data, Contains.Substring(consumerId));
data = client.ReadData<string>("/consumers/group1/ids/" + consumerId);
@@ -79,7 +66,7 @@ namespace Kafka.Client.IntegrationTests
Assert.That(data, Is.EqualTo("{ \"test\": 1 }"));
}
- using (var client = new ZooKeeperClient(config.ZkConnect, config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ using (var client = new ZooKeeperClient(config.ZooKeeper.ZkConnect, config.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
{
client.Connect();
//// Should be created as ephemeral
@@ -94,13 +81,7 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ConsumerPorformsRebalancingWhenNewBrokerIsAddedToTopic()
{
- var config = new ConsumerConfig(clientConfig)
- {
- AutoCommit = false,
- GroupId = "group1",
- ZkSessionTimeoutMs = 60000,
- ZkConnectionTimeoutMs = 60000
- };
+ var config = this.ZooKeeperBasedConsumerConfig;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/test/" + 2345;
using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
@@ -120,7 +101,7 @@ namespace Kafka.Client.IntegrationTests
children = client.GetChildren("/consumers/group1/owners/test", false);
Assert.That(children.Count, Is.EqualTo(3));
Assert.That(children, Contains.Item("2345-0"));
- string data = client.ReadData<string>("/consumers/group1/owners/test/2345-0");
+ var data = client.ReadData<string>("/consumers/group1/owners/test/2345-0");
Assert.That(data, Is.Not.Null);
Assert.That(data, Contains.Substring(consumerId));
var topicRegistry =
@@ -138,7 +119,7 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ConsumerPorformsRebalancingWhenBrokerIsRemovedFromTopic()
{
- var config = new ConsumerConfig(clientConfig) { AutoCommit = false, GroupId = "group1", ZkSessionTimeoutMs = 60000, ZkConnectionTimeoutMs = 60000 };
+ var config = this.ZooKeeperBasedConsumerConfig;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/test/" + 2345;
using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
@@ -170,14 +151,7 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ConsumerPerformsRebalancingWhenNewConsumerIsAddedAndTheyDividePartitions()
{
- var config = new ConsumerConfig(clientConfig)
- {
- AutoCommit = false,
- GroupId = "group1",
- ZkSessionTimeoutMs = 60000,
- ZkConnectionTimeoutMs = 60000
- };
-
+ var config = this.ZooKeeperBasedConsumerConfig;
IList<string> ids;
IList<string> owners;
using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
@@ -216,14 +190,8 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ConsumerPerformsRebalancingWhenConsumerIsRemovedAndTakesItsPartitions()
{
- var config = new ConsumerConfig(clientConfig)
- {
- AutoCommit = false,
- GroupId = "group1",
- ZkSessionTimeoutMs = 60000,
- ZkConnectionTimeoutMs = 60000
- };
-
+ var config = this.ZooKeeperBasedConsumerConfig;
+ string basePath = "/consumers/" + config.GroupId;
IList<string> ids;
IList<string> owners;
using (var consumerConnector = new ZookeeperConsumerConnector(config, true))
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs Tue Oct 18 17:52:13 2011
@@ -23,7 +23,6 @@ namespace Kafka.Client.IntegrationTests
using System.Reflection;
using System.Text;
using System.Threading;
- using Kafka.Client.Cfg;
using Kafka.Client.Consumers;
using Kafka.Client.Exceptions;
using Kafka.Client.Messages;
@@ -34,21 +33,10 @@ namespace Kafka.Client.IntegrationTests
[TestFixture]
public class ConsumerTests : IntegrationFixtureBase
{
- /// <summary>
- /// Kafka Client configuration
- /// </summary>
- private static KafkaClientConfiguration clientConfig;
-
- [TestFixtureSetUp]
- public void SetUp()
- {
- clientConfig = KafkaClientConfiguration.GetConfiguration();
- }
-
[Test]
public void ConsumerConnectorIsCreatedConnectsDisconnectsAndShutsDown()
{
- var config = new ConsumerConfig(clientConfig);
+ var config = this.ZooKeeperBasedConsumerConfig;
using (new ZookeeperConsumerConnector(config, true))
{
}
@@ -57,6 +45,9 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void SimpleSyncProducerSends2MessagesAndConsumerConnectorGetsThemBack()
{
+ var prodConfig = this.SyncProducerConfig1;
+ var consumerConfig = this.ZooKeeperBasedConsumerConfig;
+
// first producing
string payload1 = "kafka 1.";
byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
@@ -66,15 +57,15 @@ namespace Kafka.Client.IntegrationTests
byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
var msg2 = new Message(payloadData2);
- var producerConfig = new SyncProducerConfig(clientConfig);
- var producer = new SyncProducer(producerConfig);
var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1, msg2 });
- producer.Send(producerRequest);
+ using (var producer = new SyncProducer(prodConfig))
+ {
+ producer.Send(producerRequest);
+ }
// now consuming
- var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
var resultMessages = new List<Message>();
- using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+ using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
{
var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
var messages = consumerConnector.CreateMessageStreams(topicCount);
@@ -103,53 +94,57 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void OneMessageIsSentAndReceivedThenExceptionsWhenNoMessageThenAnotherMessageIsSentAndReceived()
{
+ var prodConfig = this.SyncProducerConfig1;
+ var consumerConfig = this.ZooKeeperBasedConsumerConfig;
+
// first producing
string payload1 = "kafka 1.";
byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
var msg1 = new Message(payloadData1);
-
- var producerConfig = new SyncProducerConfig(clientConfig);
- var producer = new SyncProducer(producerConfig);
- var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
- producer.Send(producerRequest);
-
- // now consuming
- var config = new ConsumerConfig(clientConfig) { AutoCommit = false, Timeout = 5000 };
- using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+ using (var producer = new SyncProducer(prodConfig))
{
- var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
- var messages = consumerConnector.CreateMessageStreams(topicCount);
- var sets = messages[CurrentTestTopic];
- KafkaMessageStream myStream = sets[0];
- var enumerator = myStream.GetEnumerator();
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
+ producer.Send(producerRequest);
- Assert.IsTrue(enumerator.MoveNext());
- Assert.AreEqual(msg1.ToString(), enumerator.Current.ToString());
+ // now consuming
+ using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
+ {
+ var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
+ var messages = consumerConnector.CreateMessageStreams(topicCount);
+ var sets = messages[CurrentTestTopic];
+ KafkaMessageStream myStream = sets[0];
+ var enumerator = myStream.GetEnumerator();
- Assert.Throws<ConsumerTimeoutException>(() => enumerator.MoveNext());
+ Assert.IsTrue(enumerator.MoveNext());
+ Assert.AreEqual(msg1.ToString(), enumerator.Current.ToString());
- Assert.Throws<Exception>(() => enumerator.MoveNext()); // iterator is in failed state
+ Assert.Throws<ConsumerTimeoutException>(() => enumerator.MoveNext());
- enumerator.Reset();
+ Assert.Throws<IllegalStateException>(() => enumerator.MoveNext()); // iterator is in failed state
- // producing again
- string payload2 = "kafka 2.";
- byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
- var msg2 = new Message(payloadData2);
+ enumerator.Reset();
- var producerRequest2 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg2 });
- producer.Send(producerRequest2);
+ // producing again
+ string payload2 = "kafka 2.";
+ byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
+ var msg2 = new Message(payloadData2);
- Thread.Sleep(3000);
+ var producerRequest2 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg2 });
+ producer.Send(producerRequest2);
+ Thread.Sleep(3000);
- Assert.IsTrue(enumerator.MoveNext());
- Assert.AreEqual(msg2.ToString(), enumerator.Current.ToString());
+ Assert.IsTrue(enumerator.MoveNext());
+ Assert.AreEqual(msg2.ToString(), enumerator.Current.ToString());
+ }
}
}
[Test]
public void ConsumerConnectorConsumesTwoDifferentTopics()
{
+ var prodConfig = this.SyncProducerConfig1;
+ var consumerConfig = this.ZooKeeperBasedConsumerConfig;
+
string topic1 = CurrentTestTopic + "1";
string topic2 = CurrentTestTopic + "2";
@@ -162,18 +157,18 @@ namespace Kafka.Client.IntegrationTests
byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
var msg2 = new Message(payloadData2);
- var producerConfig = new SyncProducerConfig(clientConfig);
- var producer = new SyncProducer(producerConfig);
- var producerRequest1 = new ProducerRequest(topic1, 0, new List<Message> { msg1 });
- producer.Send(producerRequest1);
- var producerRequest2 = new ProducerRequest(topic2, 0, new List<Message> { msg2 });
- producer.Send(producerRequest2);
+ using (var producer = new SyncProducer(prodConfig))
+ {
+ var producerRequest1 = new ProducerRequest(topic1, 0, new List<Message> { msg1 });
+ producer.Send(producerRequest1);
+ var producerRequest2 = new ProducerRequest(topic2, 0, new List<Message> { msg2 });
+ producer.Send(producerRequest2);
+ }
// now consuming
- var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
var resultMessages1 = new List<Message>();
var resultMessages2 = new List<Message>();
- using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+ using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
{
var topicCount = new Dictionary<string, int> { { topic1, 1 }, { topic2, 1 } };
var messages = consumerConnector.CreateMessageStreams(topicCount);
@@ -224,9 +219,10 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ConsumerConnectorReceivesAShutdownSignal()
{
+ var consumerConfig = this.ZooKeeperBasedConsumerConfig;
+
// now consuming
- var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
- using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+ using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
{
var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
var messages = consumerConnector.CreateMessageStreams(topicCount);
@@ -260,6 +256,9 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ProducersSendMessagesToDifferentPartitionsAndConsumerConnectorGetsThemBack()
{
+ var prodConfig = this.SyncProducerConfig1;
+ var consumerConfig = this.ZooKeeperBasedConsumerConfig;
+
// first producing
string payload1 = "kafka 1.";
byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
@@ -269,23 +268,21 @@ namespace Kafka.Client.IntegrationTests
byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
var msg2 = new Message(payloadData2);
- var producerConfig = new SyncProducerConfig(clientConfig);
- var producer = new SyncProducer(producerConfig);
- var producerRequest1 = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { msg1 });
- producer.Send(producerRequest1);
- var producerRequest2 = new ProducerRequest(CurrentTestTopic, 1, new List<Message>() { msg2 });
- producer.Send(producerRequest2);
+ using (var producer = new SyncProducer(prodConfig))
+ {
+ var producerRequest1 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
+ producer.Send(producerRequest1);
+ var producerRequest2 = new ProducerRequest(CurrentTestTopic, 1, new List<Message> { msg2 });
+ producer.Send(producerRequest2);
+ }
// now consuming
- var config = new ConsumerConfig(clientConfig) { AutoCommit = false };
var resultMessages = new List<Message>();
- using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(config, true))
+ using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
{
var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
var messages = consumerConnector.CreateMessageStreams(topicCount);
-
var sets = messages[CurrentTestTopic];
-
try
{
foreach (var set in sets)
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs Tue Oct 18 17:52:13 2011
@@ -19,6 +19,7 @@ namespace Kafka.Client.IntegrationTests
{
using System;
using System.Threading;
+ using Kafka.Client.Cfg;
using Kafka.Client.ZooKeeperIntegration;
using NUnit.Framework;
@@ -26,21 +27,121 @@ namespace Kafka.Client.IntegrationTests
{
protected string CurrentTestTopic { get; set; }
+ protected ProducerConfiguration ConfigBasedSyncProdConfig
+ {
+ get
+ {
+ return ProducerConfiguration.Configure(ProducerConfiguration.DefaultSectionName);
+ }
+ }
+
+ protected SyncProducerConfiguration SyncProducerConfig1
+ {
+ get
+ {
+ var prodConfig = this.ConfigBasedSyncProdConfig;
+ return new SyncProducerConfiguration(
+ prodConfig,
+ prodConfig.Brokers[0].BrokerId,
+ prodConfig.Brokers[0].Host,
+ prodConfig.Brokers[0].Port);
+ }
+ }
+
+ protected SyncProducerConfiguration SyncProducerConfig2
+ {
+ get
+ {
+ var prodConfig = this.ConfigBasedSyncProdConfig;
+ return new SyncProducerConfiguration(
+ prodConfig,
+ prodConfig.Brokers[1].BrokerId,
+ prodConfig.Brokers[1].Host,
+ prodConfig.Brokers[1].Port);
+ }
+ }
+
+ protected SyncProducerConfiguration SyncProducerConfig3
+ {
+ get
+ {
+ var prodConfig = this.ConfigBasedSyncProdConfig;
+ return new SyncProducerConfiguration(
+ prodConfig,
+ prodConfig.Brokers[2].BrokerId,
+ prodConfig.Brokers[2].Host,
+ prodConfig.Brokers[2].Port);
+ }
+ }
+
+ protected ProducerConfiguration ZooKeeperBasedSyncProdConfig
+ {
+ get
+ {
+ return ProducerConfiguration.Configure(ProducerConfiguration.DefaultSectionName + 2);
+ }
+ }
+
+ protected AsyncProducerConfiguration AsyncProducerConfig1
+ {
+ get
+ {
+ var asyncUberConfig = ProducerConfiguration.Configure(ProducerConfiguration.DefaultSectionName + 3);
+ return new AsyncProducerConfiguration(
+ asyncUberConfig,
+ asyncUberConfig.Brokers[0].BrokerId,
+ asyncUberConfig.Brokers[0].Host,
+ asyncUberConfig.Brokers[0].Port);
+ }
+ }
+
+ protected ConsumerConfiguration ConsumerConfig1
+ {
+ get
+ {
+ return ConsumerConfiguration.Configure(ConsumerConfiguration.DefaultSection + 1);
+ }
+ }
+
+ protected ConsumerConfiguration ConsumerConfig2
+ {
+ get
+ {
+ return ConsumerConfiguration.Configure(ConsumerConfiguration.DefaultSection + 2);
+ }
+ }
+
+ protected ConsumerConfiguration ConsumerConfig3
+ {
+ get
+ {
+ return ConsumerConfiguration.Configure(ConsumerConfiguration.DefaultSection + 3);
+ }
+ }
+
+ protected ConsumerConfiguration ZooKeeperBasedConsumerConfig
+ {
+ get
+ {
+ return ConsumerConfiguration.Configure(ConsumerConfiguration.DefaultSection + 4);
+ }
+ }
+
[SetUp]
public void SetupCurrentTestTopic()
{
- CurrentTestTopic = TestContext.CurrentContext.Test.Name + "_" + Guid.NewGuid().ToString();
+ CurrentTestTopic = TestContext.CurrentContext.Test.Name + "_" + Guid.NewGuid();
}
internal static void WaitUntillIdle(IZooKeeperClient client, int timeout)
{
Thread.Sleep(timeout);
- int rest = timeout - client.IdleTime;
+ int rest = client.IdleTime.HasValue ? timeout - client.IdleTime.Value : timeout;
while (rest > 0)
{
Thread.Sleep(rest);
- rest = timeout - client.IdleTime;
+ rest = client.IdleTime.HasValue ? timeout - client.IdleTime.Value : timeout;
}
}
}
-}
\ No newline at end of file
+}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj Tue Oct 18 17:52:13 2011
@@ -88,6 +88,7 @@
</Reference>
</ItemGroup>
<ItemGroup>
+ <Compile Include="CompressionTests.cs" />
<Compile Include="ConsumerRebalancingTests.cs" />
<Compile Include="ConsumerTests.cs" />
<Compile Include="IntegrationFixtureBase.cs" />
@@ -110,6 +111,9 @@
</ProjectReference>
</ItemGroup>
<ItemGroup>
+ <None Include="..\..\..\..\Settings.StyleCop">
+ <Link>Settings.StyleCop</Link>
+ </None>
<None Include="App.config">
<SubType>Designer</SubType>
</None>
@@ -134,4 +138,4 @@
</Target>
<Target Name="AfterBuild">
</Target>
-</Project>
\ No newline at end of file
+</Project>
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs Tue Oct 18 17:52:13 2011
@@ -22,7 +22,6 @@ namespace Kafka.Client.IntegrationTests
using System.Linq;
using System.Text;
using System.Threading;
- using Kafka.Client.Cfg;
using Kafka.Client.Consumers;
using Kafka.Client.Messages;
using Kafka.Client.Producers.Async;
@@ -37,39 +36,31 @@ namespace Kafka.Client.IntegrationTests
public class KafkaIntegrationTest : IntegrationFixtureBase
{
/// <summary>
- /// Kafka Client configuration
- /// </summary>
- private static KafkaClientConfiguration clientConfig;
-
- /// <summary>
/// Maximum amount of time to wait trying to get a specific test message from Kafka server (in milliseconds)
/// </summary>
private static readonly int MaxTestWaitTimeInMiliseconds = 5000;
- [TestFixtureSetUp]
- public void SetUp()
- {
- clientConfig = KafkaClientConfiguration.GetConfiguration();
- }
-
/// <summary>
/// Sends a pair of messages to Kafka.
/// </summary>
[Test]
public void ProducerSendsMessage()
{
+ var prodConfig = this.SyncProducerConfig1;
+
string payload1 = "kafka 1.";
byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
- Message msg1 = new Message(payloadData1);
-
+ var msg1 = new Message(payloadData1);
+
string payload2 = "kafka 2.";
byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
- Message msg2 = new Message(payloadData2);
+ var msg2 = new Message(payloadData2);
- var config = new SyncProducerConfig(clientConfig);
- var producer = new SyncProducer(config);
- var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { msg1, msg2 });
- producer.Send(producerRequest);
+ using (var producer = new SyncProducer(prodConfig))
+ {
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1, msg2 });
+ producer.Send(producerRequest);
+ }
}
/// <summary>
@@ -78,12 +69,15 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ProducerSendsMessageWithLongTopic()
{
- Message msg = new Message(Encoding.UTF8.GetBytes("test message"));
+ var prodConfig = this.SyncProducerConfig1;
+
+ var msg = new Message(Encoding.UTF8.GetBytes("test message"));
string topic = "ThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopicThisIsAVeryLongTopic";
- var config = new SyncProducerConfig(clientConfig);
- var producer = new SyncProducer(config);
- var producerRequest = new ProducerRequest(topic, 0, new List<Message>() { msg });
- producer.Send(producerRequest);
+ using (var producer = new SyncProducer(prodConfig))
+ {
+ var producerRequest = new ProducerRequest(topic, 0, new List<Message> { msg });
+ producer.Send(producerRequest);
+ }
}
/// <summary>
@@ -92,12 +86,12 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void AsyncProducerSendsManyLongRandomMessages()
{
+ var prodConfig = this.AsyncProducerConfig1;
List<Message> messages = GenerateRandomTextMessages(50);
-
- var config = new AsyncProducerConfig(clientConfig);
-
- var producer = new AsyncProducer(config);
- producer.Send(CurrentTestTopic, 0, messages);
+ using (var producer = new AsyncProducer(prodConfig))
+ {
+ producer.Send(CurrentTestTopic, 0, messages);
+ }
}
/// <summary>
@@ -106,7 +100,9 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void AsyncProducerSendsFewShortFixedMessages()
{
- List<Message> messages = new List<Message>()
+ var prodConfig = this.AsyncProducerConfig1;
+
+ var messages = new List<Message>
{
new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
new Message(Encoding.UTF8.GetBytes("Async Test Message 2")),
@@ -114,10 +110,10 @@ namespace Kafka.Client.IntegrationTests
new Message(Encoding.UTF8.GetBytes("Async Test Message 4"))
};
- var config = new AsyncProducerConfig(clientConfig);
-
- var producer = new AsyncProducer(config);
- producer.Send(CurrentTestTopic, 0, messages);
+ using (var producer = new AsyncProducer(prodConfig))
+ {
+ producer.Send(CurrentTestTopic, 0, messages);
+ }
}
/// <summary>
@@ -126,25 +122,26 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void AsyncProducerSendsFewShortFixedMessagesInSeparateSendActions()
{
- var config = new AsyncProducerConfig(clientConfig);
- using (var producer = new AsyncProducer(config))
+ var prodConfig = this.AsyncProducerConfig1;
+
+ using (var producer = new AsyncProducer(prodConfig))
{
- ProducerRequest req1 = new ProducerRequest(
+ var req1 = new ProducerRequest(
CurrentTestTopic,
0,
- new List<Message>() { new Message(Encoding.UTF8.GetBytes("Async Test Message 1")) });
+ new List<Message> { new Message(Encoding.UTF8.GetBytes("Async Test Message 1")) });
producer.Send(req1);
- ProducerRequest req2 = new ProducerRequest(
+ var req2 = new ProducerRequest(
CurrentTestTopic,
0,
- new List<Message>() { new Message(Encoding.UTF8.GetBytes("Async Test Message 2")) });
+ new List<Message> { new Message(Encoding.UTF8.GetBytes("Async Test Message 2")) });
producer.Send(req2);
- ProducerRequest req3 = new ProducerRequest(
+ var req3 = new ProducerRequest(
CurrentTestTopic,
0,
- new List<Message>() { new Message(Encoding.UTF8.GetBytes("Async Test Message 3")) });
+ new List<Message> { new Message(Encoding.UTF8.GetBytes("Async Test Message 3")) });
producer.Send(req3);
}
}
@@ -152,14 +149,18 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void AsyncProducerSendsMessageWithCallbackClass()
{
- List<Message> messages = new List<Message>()
+ var prodConfig = this.AsyncProducerConfig1;
+
+ var messages = new List<Message>
{
new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
};
- var config = new AsyncProducerConfig(clientConfig);
- TestCallbackHandler myHandler = new TestCallbackHandler();
- var producer = new AsyncProducer(config, myHandler);
- producer.Send(CurrentTestTopic, 0, messages);
+ var myHandler = new TestCallbackHandler();
+ using (var producer = new AsyncProducer(prodConfig, myHandler))
+ {
+ producer.Send(CurrentTestTopic, 0, messages);
+ }
+
Thread.Sleep(1000);
Assert.IsTrue(myHandler.WasRun);
}
@@ -167,14 +168,18 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void AsyncProducerSendsMessageWithCallback()
{
- List<Message> messages = new List<Message>()
+ var prodConfig = this.AsyncProducerConfig1;
+
+ var messages = new List<Message>
{
new Message(Encoding.UTF8.GetBytes("Async Test Message 1")),
};
- var config = new AsyncProducerConfig(clientConfig);
- TestCallbackHandler myHandler = new TestCallbackHandler();
- var producer = new AsyncProducer(config);
- producer.Send(CurrentTestTopic, 0, messages, myHandler.Handle);
+ var myHandler = new TestCallbackHandler();
+ using (var producer = new AsyncProducer(prodConfig))
+ {
+ producer.Send(CurrentTestTopic, 0, messages, myHandler.Handle);
+ }
+
Thread.Sleep(1000);
Assert.IsTrue(myHandler.WasRun);
}
@@ -195,7 +200,9 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ProducerSendMultiRequest()
{
- List<ProducerRequest> requests = new List<ProducerRequest>
+ var prodConfig = this.SyncProducerConfig1;
+
+ var requests = new List<ProducerRequest>
{
new ProducerRequest(CurrentTestTopic, 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
new ProducerRequest(CurrentTestTopic, 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
@@ -203,9 +210,10 @@ namespace Kafka.Client.IntegrationTests
new ProducerRequest(CurrentTestTopic, 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })
};
- var config = new SyncProducerConfig(clientConfig);
- var producer = new SyncProducer(config);
- producer.MultiSend(requests);
+ using (var producer = new SyncProducer(prodConfig))
+ {
+ producer.MultiSend(requests);
+ }
}
/// <summary>
@@ -214,17 +222,21 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ConsumerFetchMessage()
{
+ var consumerConfig = this.ConsumerConfig1;
ProducerSendsMessage();
-
- ConsumerConfig config = new ConsumerConfig(clientConfig);
- IConsumer consumer = new Kafka.Client.Consumers.Consumer(config);
- FetchRequest request = new FetchRequest(CurrentTestTopic, 0, 0);
+ Thread.Sleep(1000);
+ IConsumer consumer = new Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, 0, 0);
BufferedMessageSet response = consumer.Fetch(request);
Assert.NotNull(response);
- foreach (var message in response.Messages)
+ int count = 0;
+ foreach (var message in response)
{
- Console.WriteLine(message);
+ count++;
+ Console.WriteLine(message.Message);
}
+
+ Assert.AreEqual(2, count);
}
/// <summary>
@@ -233,25 +245,28 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ConsumerMultiFetchGetsMessage()
{
- ProducerSendMultiRequest();
+ var config = this.ConsumerConfig1;
- ConsumerConfig config = new ConsumerConfig(clientConfig);
- IConsumer cons = new Consumers.Consumer(config);
- MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
+ ProducerSendMultiRequest();
+ Thread.Sleep(2000);
+ IConsumer cons = new Consumer(config);
+ var request = new MultiFetchRequest(new List<FetchRequest>
{
new FetchRequest(CurrentTestTopic, 0, 0),
new FetchRequest(CurrentTestTopic, 0, 0),
- new FetchRequest(CurrentTestTopic + "2", 0, 0)
+ new FetchRequest(CurrentTestTopic, 0, 0)
});
IList<BufferedMessageSet> response = cons.MultiFetch(request);
+ Assert.AreEqual(3, response.Count);
for (int ix = 0; ix < response.Count; ix++)
{
IEnumerable<Message> messageSet = response[ix].Messages;
+ Assert.AreEqual(4, messageSet.Count());
Console.WriteLine(string.Format("Request #{0}-->", ix));
foreach (Message msg in messageSet)
{
- Console.WriteLine(msg);
+ Console.WriteLine(msg.ToString());
}
}
}
@@ -262,10 +277,10 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ConsumerGetsOffsets()
{
- OffsetRequest request = new OffsetRequest(CurrentTestTopic, 0, DateTime.Now.AddHours(-24).Ticks, 10);
+ var consumerConfig = this.ConsumerConfig1;
- ConsumerConfig config = new ConsumerConfig(clientConfig);
- IConsumer consumer = new Consumers.Consumer(config);
+ var request = new OffsetRequest(CurrentTestTopic, 0, DateTime.Now.AddHours(-24).Ticks, 10);
+ IConsumer consumer = new Consumer(consumerConfig);
IList<long> list = consumer.GetOffsetsBefore(request);
foreach (long l in list)
@@ -280,20 +295,19 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ProducerSendsAndConsumerReceivesSingleSimpleMessage()
{
- Message sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
-
- var config = new SyncProducerConfig(clientConfig);
- var producer = new SyncProducer(config);
- var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { sourceMessage });
-
- long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, clientConfig);
+ var prodConfig = this.SyncProducerConfig1;
+ var consumerConfig = this.ConsumerConfig1;
- producer.Send(producerRequest);
-
- ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
- IConsumer consumer = new Consumers.Consumer(consumerConfig);
- FetchRequest request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
+ var sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
+ long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, consumerConfig);
+ using (var producer = new SyncProducer(prodConfig))
+ {
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { sourceMessage });
+ producer.Send(producerRequest);
+ }
+ IConsumer consumer = new Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
BufferedMessageSet response;
int totalWaitTimeInMiliseconds = 0;
int waitSingle = 100;
@@ -305,13 +319,11 @@ namespace Kafka.Client.IntegrationTests
{
break;
}
- else
+
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
{
- totalWaitTimeInMiliseconds += waitSingle;
- if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
- {
- break;
- }
+ break;
}
}
@@ -327,19 +339,19 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void AsyncProducerSendsAndConsumerReceivesSingleSimpleMessage()
{
- Message sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
-
- var config = new AsyncProducerConfig(clientConfig);
- var producer = new AsyncProducer(config);
- var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message>() { sourceMessage });
-
- long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, clientConfig);
+ var prodConfig = this.AsyncProducerConfig1;
+ var consumerConfig = this.ConsumerConfig1;
- producer.Send(producerRequest);
+ var sourceMessage = new Message(Encoding.UTF8.GetBytes("test message"));
+ using (var producer = new AsyncProducer(prodConfig))
+ {
+ var producerRequest = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { sourceMessage });
+ producer.Send(producerRequest);
+ }
- ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
- IConsumer consumer = new Consumers.Consumer(consumerConfig);
- FetchRequest request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
+ long currentOffset = TestHelper.GetCurrentKafkaOffset(CurrentTestTopic, consumerConfig);
+ IConsumer consumer = new Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, 0, currentOffset);
BufferedMessageSet response;
int totalWaitTimeInMiliseconds = 0;
@@ -352,13 +364,11 @@ namespace Kafka.Client.IntegrationTests
{
break;
}
- else
+
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
{
- totalWaitTimeInMiliseconds += waitSingle;
- if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
- {
- break;
- }
+ break;
}
}
@@ -374,16 +384,19 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ProducerSendsAndConsumerReceivesMultiRequest()
{
+ var prodConfig = this.SyncProducerConfig1;
+ var consumerConfig = this.ConsumerConfig1;
+
string testTopic1 = CurrentTestTopic + "1";
string testTopic2 = CurrentTestTopic + "2";
string testTopic3 = CurrentTestTopic + "3";
- Message sourceMessage1 = new Message(Encoding.UTF8.GetBytes("1: TestMessage"));
- Message sourceMessage2 = new Message(Encoding.UTF8.GetBytes("2: TestMessage"));
- Message sourceMessage3 = new Message(Encoding.UTF8.GetBytes("3: TestMessage"));
- Message sourceMessage4 = new Message(Encoding.UTF8.GetBytes("4: TestMessage"));
+ var sourceMessage1 = new Message(Encoding.UTF8.GetBytes("1: TestMessage"));
+ var sourceMessage2 = new Message(Encoding.UTF8.GetBytes("2: TestMessage"));
+ var sourceMessage3 = new Message(Encoding.UTF8.GetBytes("3: TestMessage"));
+ var sourceMessage4 = new Message(Encoding.UTF8.GetBytes("4: TestMessage"));
- List<ProducerRequest> requests = new List<ProducerRequest>
+ var requests = new List<ProducerRequest>
{
new ProducerRequest(testTopic1, 0, new List<Message> { sourceMessage1 }),
new ProducerRequest(testTopic1, 0, new List<Message> { sourceMessage2 }),
@@ -391,18 +404,17 @@ namespace Kafka.Client.IntegrationTests
new ProducerRequest(testTopic3, 0, new List<Message> { sourceMessage4 })
};
- var config = new SyncProducerConfig(clientConfig);
- var producer = new SyncProducer(config);
+ long currentOffset1 = TestHelper.GetCurrentKafkaOffset(testTopic1, consumerConfig);
+ long currentOffset2 = TestHelper.GetCurrentKafkaOffset(testTopic2, consumerConfig);
+ long currentOffset3 = TestHelper.GetCurrentKafkaOffset(testTopic3, consumerConfig);
- long currentOffset1 = TestHelper.GetCurrentKafkaOffset(testTopic1, clientConfig);
- long currentOffset2 = TestHelper.GetCurrentKafkaOffset(testTopic2, clientConfig);
- long currentOffset3 = TestHelper.GetCurrentKafkaOffset(testTopic3, clientConfig);
-
- producer.MultiSend(requests);
-
- ConsumerConfig consumerConfig = new ConsumerConfig(clientConfig);
- IConsumer consumer = new Consumers.Consumer(consumerConfig);
- MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
+ using (var producer = new SyncProducer(prodConfig))
+ {
+ producer.MultiSend(requests);
+ }
+
+ IConsumer consumer = new Consumer(consumerConfig);
+ var request = new MultiFetchRequest(new List<FetchRequest>
{
new FetchRequest(testTopic1, 0, currentOffset1),
new FetchRequest(testTopic2, 0, currentOffset2),
@@ -419,13 +431,11 @@ namespace Kafka.Client.IntegrationTests
{
break;
}
- else
+
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
{
- totalWaitTimeInMiliseconds += waitSingle;
- if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
- {
- break;
- }
+ break;
}
}
@@ -440,43 +450,13 @@ namespace Kafka.Client.IntegrationTests
}
/// <summary>
- /// Gererates a randome list of messages.
- /// </summary>
- /// <param name="numberOfMessages">The number of messages to generate.</param>
- /// <returns>A list of random messages.</returns>
- private static List<Message> GenerateRandomMessages(int numberOfMessages)
- {
- List<Message> messages = new List<Message>();
- for (int ix = 0; ix < numberOfMessages; ix++)
- {
- messages.Add(new Message(GenerateRandomBytes(10000)));
- }
-
- return messages;
- }
-
- /// <summary>
- /// Generate a random set of bytes.
- /// </summary>
- /// <param name="length">Length of the byte array.</param>
- /// <returns>Random byte array.</returns>
- private static byte[] GenerateRandomBytes(int length)
- {
- byte[] randBytes = new byte[length];
- Random randNum = new Random();
- randNum.NextBytes(randBytes);
-
- return randBytes;
- }
-
- /// <summary>
/// Gererates a randome list of text messages.
/// </summary>
/// <param name="numberOfMessages">The number of messages to generate.</param>
/// <returns>A list of random text messages.</returns>
private static List<Message> GenerateRandomTextMessages(int numberOfMessages)
{
- List<Message> messages = new List<Message>();
+ var messages = new List<Message>();
for (int ix = 0; ix < numberOfMessages; ix++)
{
////messages.Add(new Message(GenerateRandomBytes(10000)));
@@ -493,12 +473,11 @@ namespace Kafka.Client.IntegrationTests
/// <returns>Random message string.</returns>
private static string GenerateRandomMessage(int length)
{
- StringBuilder builder = new StringBuilder();
- Random random = new Random();
- char ch;
+ var builder = new StringBuilder();
+ var random = new Random();
for (int i = 0; i < length; i++)
{
- ch = Convert.ToChar(Convert.ToInt32(
+ char ch = Convert.ToChar(Convert.ToInt32(
Math.Floor((26 * random.NextDouble()) + 65)));
builder.Append(ch);
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs Tue Oct 18 17:52:13 2011
@@ -19,6 +19,8 @@ using Kafka.Client.Producers.Partitionin
namespace Kafka.Client.IntegrationTests
{
+ using Kafka.Client.Producers.Partitioning;
+
/// <summary>
/// This mock partitioner will always point to the first partition (the one of index = 0)
/// </summary>
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs Tue Oct 18 17:52:13 2011
@@ -33,213 +33,194 @@ namespace Kafka.Client.IntegrationTests
public class ProducerTests : IntegrationFixtureBase
{
/// <summary>
- /// Kafka Client configuration
- /// </summary>
- private KafkaClientConfiguration clientConfig;
-
- /// <summary>
/// Maximum amount of time to wait trying to get a specific test message from Kafka server (in miliseconds)
/// </summary>
private readonly int maxTestWaitTimeInMiliseconds = 5000;
- [TestFixtureSetUp]
- public void SetUp()
- {
- clientConfig = KafkaClientConfiguration.GetConfiguration();
- clientConfig.SupressZooKeeper();
- }
-
[Test]
public void ProducerSends1Message()
{
+ var prodConfig = this.ConfigBasedSyncProdConfig;
+
int totalWaitTimeInMiliseconds = 0;
int waitSingle = 100;
var originalMessage = new Message(Encoding.UTF8.GetBytes("TestData"));
var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
- multipleBrokersHelper.GetCurrentOffsets();
-
- var producerConfig = new ProducerConfig(clientConfig);
- var mockPartitioner = new MockAlwaysZeroPartitioner();
- using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder(), null))
+ multipleBrokersHelper.GetCurrentOffsets(
+ new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
+ using (var producer = new Producer(prodConfig))
{
var producerData = new ProducerData<string, Message>(
- CurrentTestTopic, "somekey", new List<Message> { originalMessage });
+ CurrentTestTopic, new List<Message> { originalMessage });
producer.Send(producerData);
Thread.Sleep(waitSingle);
+ }
- while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ while (
+ !multipleBrokersHelper.CheckIfAnyBrokerHasChanged(
+ new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ Thread.Sleep(waitSingle);
+ if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
{
- totalWaitTimeInMiliseconds += waitSingle;
- Thread.Sleep(waitSingle);
- if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
- {
- Assert.Fail("None of the brokers changed their offset after sending a message");
- }
- }
-
- totalWaitTimeInMiliseconds = 0;
-
- var consumerConfig = new ConsumerConfig(clientConfig)
- {
- Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
- Port = multipleBrokersHelper.BrokerThatHasChanged.Port
- };
- IConsumer consumer = new Consumers.Consumer(consumerConfig);
- var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
-
- BufferedMessageSet response;
-
- while (true)
- {
- Thread.Sleep(waitSingle);
- response = consumer.Fetch(request);
- if (response != null && response.Messages.Count() > 0)
- {
- break;
- }
-
- totalWaitTimeInMiliseconds += waitSingle;
- if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
- {
- break;
- }
- }
-
- Assert.NotNull(response);
- Assert.AreEqual(1, response.Messages.Count());
- Assert.AreEqual(originalMessage.ToString(), response.Messages.First().ToString());
+ Assert.Fail("None of the brokers changed their offset after sending a message");
+ }
}
+
+ totalWaitTimeInMiliseconds = 0;
+
+ var consumerConfig = new ConsumerConfiguration(
+ multipleBrokersHelper.BrokerThatHasChanged.Host, multipleBrokersHelper.BrokerThatHasChanged.Port);
+ IConsumer consumer = new Consumer(consumerConfig);
+ var request1 = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange);
+ BufferedMessageSet response;
+ while (true)
+ {
+ Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request1);
+ if (response != null && response.Messages.Count() > 0)
+ {
+ break;
+ }
+
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
+ {
+ break;
+ }
+ }
+
+ Assert.NotNull(response);
+ Assert.AreEqual(1, response.Messages.Count());
+ Assert.AreEqual(originalMessage.ToString(), response.Messages.First().ToString());
}
[Test]
public void ProducerSends3Messages()
{
+ var prodConfig = this.ConfigBasedSyncProdConfig;
+
int totalWaitTimeInMiliseconds = 0;
int waitSingle = 100;
var originalMessage1 = new Message(Encoding.UTF8.GetBytes("TestData1"));
var originalMessage2 = new Message(Encoding.UTF8.GetBytes("TestData2"));
var originalMessage3 = new Message(Encoding.UTF8.GetBytes("TestData3"));
- var originalMessageList =
- new List<Message> { originalMessage1, originalMessage2, originalMessage3 };
+ var originalMessageList = new List<Message> { originalMessage1, originalMessage2, originalMessage3 };
var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
- multipleBrokersHelper.GetCurrentOffsets();
-
- var producerConfig = new ProducerConfig(clientConfig);
- var mockPartitioner = new MockAlwaysZeroPartitioner();
- using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder(), null))
+ multipleBrokersHelper.GetCurrentOffsets(
+ new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
+ using (var producer = new Producer(prodConfig))
{
- var producerData = new ProducerData<string, Message>(CurrentTestTopic, "somekey", originalMessageList);
+ var producerData = new ProducerData<string, Message>(CurrentTestTopic, originalMessageList);
producer.Send(producerData);
+ }
+
+ Thread.Sleep(waitSingle);
+ while (
+ !multipleBrokersHelper.CheckIfAnyBrokerHasChanged(
+ new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ Thread.Sleep(waitSingle);
+ if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
+ {
+ Assert.Fail("None of the brokers changed their offset after sending a message");
+ }
+ }
+
+ totalWaitTimeInMiliseconds = 0;
+
+ var consumerConfig = new ConsumerConfiguration(
+ multipleBrokersHelper.BrokerThatHasChanged.Host, multipleBrokersHelper.BrokerThatHasChanged.Port);
+ IConsumer consumer = new Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+ BufferedMessageSet response;
+ while (true)
+ {
Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 2)
+ {
+ break;
+ }
- while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
{
- totalWaitTimeInMiliseconds += waitSingle;
- Thread.Sleep(waitSingle);
- if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
- {
- Assert.Fail("None of the brokers changed their offset after sending a message");
- }
- }
-
- totalWaitTimeInMiliseconds = 0;
-
- var consumerConfig = new ConsumerConfig(clientConfig)
- {
- Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
- Port = multipleBrokersHelper.BrokerThatHasChanged.Port
- };
- IConsumer consumer = new Consumers.Consumer(consumerConfig);
- var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
-
- BufferedMessageSet response;
- while (true)
- {
- Thread.Sleep(waitSingle);
- response = consumer.Fetch(request);
- if (response != null && response.Messages.Count() > 2)
- {
- break;
- }
-
- totalWaitTimeInMiliseconds += waitSingle;
- if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
- {
- break;
- }
- }
-
- Assert.NotNull(response);
- Assert.AreEqual(3, response.Messages.Count());
- Assert.AreEqual(originalMessage1.ToString(), response.Messages.First().ToString());
- Assert.AreEqual(originalMessage2.ToString(), response.Messages.Skip(1).First().ToString());
- Assert.AreEqual(originalMessage3.ToString(), response.Messages.Skip(2).First().ToString());
+ break;
+ }
}
+
+ Assert.NotNull(response);
+ Assert.AreEqual(3, response.Messages.Count());
+ Assert.AreEqual(originalMessage1.ToString(), response.Messages.First().ToString());
+ Assert.AreEqual(originalMessage2.ToString(), response.Messages.Skip(1).First().ToString());
+ Assert.AreEqual(originalMessage3.ToString(), response.Messages.Skip(2).First().ToString());
}
[Test]
public void ProducerSends1MessageUsingNotDefaultEncoder()
{
+ var prodConfig = this.ConfigBasedSyncProdConfig;
+
int totalWaitTimeInMiliseconds = 0;
int waitSingle = 100;
string originalMessage = "TestData";
var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
- multipleBrokersHelper.GetCurrentOffsets();
-
- var producerConfig = new ProducerConfig(clientConfig);
- var mockPartitioner = new MockAlwaysZeroPartitioner();
- using (var producer = new Producer<string, string>(producerConfig, mockPartitioner, new StringEncoder(), null))
+ multipleBrokersHelper.GetCurrentOffsets(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
+ using (var producer = new Producer<string, string>(prodConfig, null, new StringEncoder(), null))
{
var producerData = new ProducerData<string, string>(
- CurrentTestTopic, "somekey", new List<string> { originalMessage });
+ CurrentTestTopic, new List<string> { originalMessage });
producer.Send(producerData);
+ }
+
+ Thread.Sleep(waitSingle);
+
+ while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
+ {
+ totalWaitTimeInMiliseconds += waitSingle;
+ Thread.Sleep(waitSingle);
+ if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
+ {
+ Assert.Fail("None of the brokers changed their offset after sending a message");
+ }
+ }
+
+ totalWaitTimeInMiliseconds = 0;
+
+ var consumerConfig = new ConsumerConfiguration(
+ multipleBrokersHelper.BrokerThatHasChanged.Host,
+ multipleBrokersHelper.BrokerThatHasChanged.Port);
+ IConsumer consumer = new Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange);
+
+ BufferedMessageSet response;
+ while (true)
+ {
Thread.Sleep(waitSingle);
+ response = consumer.Fetch(request);
+ if (response != null && response.Messages.Count() > 0)
+ {
+ break;
+ }
- while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ totalWaitTimeInMiliseconds += waitSingle;
+ if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
{
- totalWaitTimeInMiliseconds += waitSingle;
- Thread.Sleep(waitSingle);
- if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
- {
- Assert.Fail("None of the brokers changed their offset after sending a message");
- }
- }
-
- totalWaitTimeInMiliseconds = 0;
-
- var consumerConfig = new ConsumerConfig(clientConfig)
- {
- Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
- Port = multipleBrokersHelper.BrokerThatHasChanged.Port
- };
- IConsumer consumer = new Consumers.Consumer(consumerConfig);
- var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
-
- BufferedMessageSet response;
- while (true)
- {
- Thread.Sleep(waitSingle);
- response = consumer.Fetch(request);
- if (response != null && response.Messages.Count() > 0)
- {
- break;
- }
-
- totalWaitTimeInMiliseconds += waitSingle;
- if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
- {
- break;
- }
- }
-
- Assert.NotNull(response);
- Assert.AreEqual(1, response.Messages.Count());
- Assert.AreEqual(originalMessage, Encoding.UTF8.GetString(response.Messages.First().Payload));
+ break;
+ }
}
+
+ Assert.NotNull(response);
+ Assert.AreEqual(1, response.Messages.Count());
+ Assert.AreEqual(originalMessage, Encoding.UTF8.GetString(response.Messages.First().Payload));
}
}
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs Tue Oct 18 17:52:13 2011
@@ -19,33 +19,30 @@ namespace Kafka.Client.IntegrationTests
{
using System;
using System.Collections.Generic;
+ using System.Linq;
using Kafka.Client.Cfg;
using Kafka.Client.Consumers;
using Kafka.Client.Requests;
public static class TestHelper
{
- public static long GetCurrentKafkaOffset(string topic, KafkaClientConfiguration clientConfig)
+ public static long GetCurrentKafkaOffset(string topic, ConsumerConfiguration clientConfig)
{
- return GetCurrentKafkaOffset(topic, clientConfig.KafkaServer.Address, clientConfig.KafkaServer.Port);
+ return GetCurrentKafkaOffset(topic, clientConfig.Broker.Host, clientConfig.Broker.Port);
}
public static long GetCurrentKafkaOffset(string topic, string address, int port)
{
- OffsetRequest request = new OffsetRequest(topic, 0, DateTime.Now.AddDays(-5).Ticks, 10);
- ConsumerConfig consumerConfig = new ConsumerConfig();
- consumerConfig.Host = address;
- consumerConfig.Port = port;
- IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ return GetCurrentKafkaOffset(topic, address, port, 0);
+ }
+
+ public static long GetCurrentKafkaOffset(string topic, string address, int port, int partition)
+ {
+ var request = new OffsetRequest(topic, partition, DateTime.Now.AddDays(-5).Ticks, 10);
+ var consumerConfig = new ConsumerConfiguration(address, port);
+ IConsumer consumer = new Consumer(consumerConfig, address, port);
IList<long> list = consumer.GetOffsetsBefore(request);
- if (list.Count > 0)
- {
- return list[0];
- }
- else
- {
- return 0;
- }
+ return list.Sum();
}
}
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs Tue Oct 18 17:52:13 2011
@@ -22,55 +22,52 @@ namespace Kafka.Client.IntegrationTests
public class TestMultipleBrokersHelper
{
- private BrokerPartitionInfoCollection configBrokers =
- KafkaClientConfiguration.GetConfiguration().BrokerPartitionInfos;
+ private readonly Dictionary<int, Dictionary<int, long>> offsets = new Dictionary<int, Dictionary<int, long>>();
- private Dictionary<int, long> offsets = new Dictionary<int, long>();
-
- private BrokerPartitionInfo changedBroker;
-
- private string topic;
+ private readonly string topic;
public TestMultipleBrokersHelper(string topic)
{
this.topic = topic;
}
- public BrokerPartitionInfo BrokerThatHasChanged
- {
- get { return changedBroker; }
- }
+ public SyncProducerConfiguration BrokerThatHasChanged { get; private set; }
+
+ public int PartitionThatHasChanged { get; private set; }
public long OffsetFromBeforeTheChange
{
get
{
- if (changedBroker != null)
- {
- return offsets[changedBroker.Id];
- }
- else
- {
- return 0;
- }
+ return this.BrokerThatHasChanged != null ? this.offsets[this.BrokerThatHasChanged.BrokerId][this.PartitionThatHasChanged] : 0;
}
}
- public void GetCurrentOffsets()
+ public void GetCurrentOffsets(IEnumerable<SyncProducerConfiguration> brokers)
{
- foreach (BrokerPartitionInfo broker in configBrokers)
+ foreach (var broker in brokers)
{
- offsets.Add(broker.Id, TestHelper.GetCurrentKafkaOffset(topic, broker.Address, broker.Port));
+ offsets.Add(broker.BrokerId, new Dictionary<int, long>());
+ offsets[broker.BrokerId].Add(0, TestHelper.GetCurrentKafkaOffset(topic, broker.Host, broker.Port, 0));
+ offsets[broker.BrokerId].Add(1, TestHelper.GetCurrentKafkaOffset(topic, broker.Host, broker.Port, 1));
}
}
- public bool CheckIfAnyBrokerHasChanged()
+ public bool CheckIfAnyBrokerHasChanged(IEnumerable<SyncProducerConfiguration> brokers)
{
- foreach (BrokerPartitionInfo broker in configBrokers)
+ foreach (var broker in brokers)
{
- if (TestHelper.GetCurrentKafkaOffset(topic, broker.Address, broker.Port) != offsets[broker.Id])
+ if (TestHelper.GetCurrentKafkaOffset(topic, broker.Host, broker.Port, 0) != offsets[broker.BrokerId][0])
+ {
+ this.BrokerThatHasChanged = broker;
+ this.PartitionThatHasChanged = 0;
+ return true;
+ }
+
+ if (TestHelper.GetCurrentKafkaOffset(topic, broker.Host, broker.Port, 1) != offsets[broker.BrokerId][1])
{
- changedBroker = broker;
+ this.BrokerThatHasChanged = broker;
+ this.PartitionThatHasChanged = 1;
return true;
}
}