You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/18 19:52:16 UTC
svn commit: r1185772 [4/4] - in
/incubator/kafka/trunk/clients/csharp/src/Kafka: Kafka.Client/
Kafka.Client/Cfg/ Kafka.Client/Consumers/ Kafka.Client/Exceptions/
Kafka.Client/Messages/ Kafka.Client/Producers/
Kafka.Client/Producers/Async/ Kafka.Client/...
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs Tue Oct 18 17:52:13 2011
@@ -19,7 +19,6 @@ namespace Kafka.Client.IntegrationTests
{
using System.Collections.Generic;
using System.Linq;
- using Kafka.Client.Cfg;
using Kafka.Client.Cluster;
using Kafka.Client.Producers.Partitioning;
using Kafka.Client.Utils;
@@ -29,60 +28,53 @@ namespace Kafka.Client.IntegrationTests
using ZooKeeperNet;
[TestFixture]
- public class ZKBrokerPartitionInfoTests : IntegrationFixtureBase
+ public class ZkBrokerPartitionInfoTests : IntegrationFixtureBase
{
- private KafkaClientConfiguration clientConfig;
- private ZKConfig zkConfig;
-
- [TestFixtureSetUp]
- public void SetUp()
- {
- clientConfig = KafkaClientConfiguration.GetConfiguration();
- zkConfig = new ProducerConfig(clientConfig);
- }
-
[Test]
- public void ZKBrokerPartitionInfoGetsAllBrokerInfo()
+ public void ZkBrokerPartitionInfoGetsAllBrokerInfo()
{
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+ var prodConfigNotZk = this.ConfigBasedSyncProdConfig;
+
IDictionary<int, Broker> allBrokerInfo;
- using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(prodConfig, null))
{
allBrokerInfo = brokerPartitionInfo.GetAllBrokerInfo();
}
- Assert.AreEqual(clientConfig.BrokerPartitionInfos.Count, allBrokerInfo.Count);
- foreach (BrokerPartitionInfo cfgBrPartInfo in clientConfig.BrokerPartitionInfos)
- {
- Assert.IsTrue(allBrokerInfo.ContainsKey(cfgBrPartInfo.Id));
- Assert.AreEqual(cfgBrPartInfo.Address, allBrokerInfo[cfgBrPartInfo.Id].Host);
- Assert.AreEqual(cfgBrPartInfo.Port, allBrokerInfo[cfgBrPartInfo.Id].Port);
- }
+ Assert.AreEqual(prodConfigNotZk.Brokers.Count, allBrokerInfo.Count);
+ Assert.IsTrue(allBrokerInfo.Values.All(x => prodConfigNotZk.Brokers.Any( // assert the match: a bare All(...) result is discarded and verifies nothing
+ y => x.Id == y.BrokerId
+ && x.Host == y.Host
+ && x.Port == y.Port)));
}
[Test]
- public void ZKBrokerPartitionInfoGetsBrokerPartitionInfo()
+ public void ZkBrokerPartitionInfoGetsBrokerPartitionInfo()
{
+ var prodconfig = this.ZooKeeperBasedSyncProdConfig;
SortedSet<Partition> partitions;
- using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(prodconfig, null))
{
partitions = brokerPartitionInfo.GetBrokerPartitionInfo("test");
}
Assert.NotNull(partitions);
Assert.GreaterOrEqual(partitions.Count, 2);
- var partition = partitions.ToList()[0];
- Assert.AreEqual(0, partition.BrokerId);
}
[Test]
public void ZkBrokerPartitionInfoGetsBrokerInfo()
{
- using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(zkConfig, null))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+ var prodConfigNotZk = this.ConfigBasedSyncProdConfig;
+
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(prodConfig, null))
{
- var testBroker = clientConfig.BrokerPartitionInfos[0];
- Broker broker = brokerPartitionInfo.GetBrokerInfo(testBroker.Id);
+ var testBroker = prodConfigNotZk.Brokers[0];
+ Broker broker = brokerPartitionInfo.GetBrokerInfo(testBroker.BrokerId);
Assert.NotNull(broker);
- Assert.AreEqual(testBroker.Address, broker.Host);
+ Assert.AreEqual(testBroker.Host, broker.Host);
Assert.AreEqual(testBroker.Port, broker.Port);
}
}
@@ -90,14 +82,15 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenNewTopicIsAddedBrokerTopicsListenerCreatesNewMapping()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -114,8 +107,8 @@ namespace Kafka.Client.IntegrationTests
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
client.Connect();
@@ -125,6 +118,7 @@ namespace Kafka.Client.IntegrationTests
client.CreatePersistent(topicPath, true);
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
+ WaitUntillIdle(client, 500);
client.DeleteRecursive(topicPath);
}
@@ -134,13 +128,14 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenNewBrokerIsAddedBrokerTopicsListenerUpdatesBrokersList()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -157,18 +152,20 @@ namespace Kafka.Client.IntegrationTests
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
client.Connect();
WaitUntillIdle(client, 500);
var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
+ WaitUntillIdle(client, 500);
client.CreatePersistent(brokerPath, true);
client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
+ WaitUntillIdle(client, 500);
client.DeleteRecursive(brokerPath);
}
@@ -181,13 +178,14 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenBrokerIsRemovedBrokerTopicsListenerUpdatesBrokersList()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -204,8 +202,8 @@ namespace Kafka.Client.IntegrationTests
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
client.Connect();
@@ -225,15 +223,16 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenNewBrokerInTopicIsAddedBrokerTopicsListenerUpdatesMappings()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
string topicBrokerPath = topicPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -250,8 +249,8 @@ namespace Kafka.Client.IntegrationTests
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
client.Connect();
@@ -269,6 +268,7 @@ namespace Kafka.Client.IntegrationTests
client.WriteData(topicBrokerPath, 5);
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
+ WaitUntillIdle(client, 500);
client.DeleteRecursive(brokerPath);
client.DeleteRecursive(topicPath);
}
@@ -281,55 +281,56 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenSessionIsExpiredListenerRecreatesEphemeralNodes()
{
- {
- var producerConfig = new ProducerConfig(clientConfig);
- IDictionary<string, SortedSet<Partition>> mappings;
- IDictionary<int, Broker> brokers;
- IDictionary<string, SortedSet<Partition>> mappings2;
- IDictionary<int, Broker> brokers2;
- using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ IDictionary<string, SortedSet<Partition>> mappings;
+ IDictionary<int, Broker> brokers;
+ IDictionary<string, SortedSet<Partition>> mappings2;
+ IDictionary<int, Broker> brokers2;
+ using (
+ IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
+ {
+ using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
- using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
- {
- brokers = brokerPartitionInfo.GetAllBrokerInfo();
- mappings =
- ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
- "topicBrokerPartitions", brokerPartitionInfo);
- Assert.NotNull(brokers);
- Assert.Greater(brokers.Count, 0);
- Assert.NotNull(mappings);
- Assert.Greater(mappings.Count, 0);
- client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
- WaitUntillIdle(client, 3000);
- brokers2 = brokerPartitionInfo.GetAllBrokerInfo();
- mappings2 =
- ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
- "topicBrokerPartitions", brokerPartitionInfo);
- }
+ brokers = brokerPartitionInfo.GetAllBrokerInfo();
+ mappings =
+ ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+ "topicBrokerPartitions", brokerPartitionInfo);
+ Assert.NotNull(brokers);
+ Assert.Greater(brokers.Count, 0);
+ Assert.NotNull(mappings);
+ Assert.Greater(mappings.Count, 0);
+ client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
+ WaitUntillIdle(client, 3000);
+ brokers2 = brokerPartitionInfo.GetAllBrokerInfo();
+ mappings2 =
+ ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
+ "topicBrokerPartitions", brokerPartitionInfo);
}
-
- Assert.NotNull(brokers2);
- Assert.Greater(brokers2.Count, 0);
- Assert.NotNull(mappings2);
- Assert.Greater(mappings2.Count, 0);
- Assert.AreEqual(brokers.Count, brokers2.Count);
- Assert.AreEqual(mappings.Count, mappings2.Count);
}
+
+ Assert.NotNull(brokers2);
+ Assert.Greater(brokers2.Count, 0);
+ Assert.NotNull(mappings2);
+ Assert.Greater(mappings2.Count, 0);
+ Assert.AreEqual(brokers.Count, brokers2.Count);
+ Assert.AreEqual(mappings.Count, mappings2.Count);
}
[Test]
- public void WhenNewTopicIsAddedZKBrokerPartitionInfoUpdatesMappings()
+ public void WhenNewTopicIsAddedZkBrokerPartitionInfoUpdatesMappings()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
IDictionary<string, SortedSet<Partition>> mappings;
string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -340,6 +341,7 @@ namespace Kafka.Client.IntegrationTests
client.CreatePersistent(topicPath, true);
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
+ WaitUntillIdle(client, 500);
client.DeleteRecursive(topicPath);
}
}
@@ -350,14 +352,15 @@ namespace Kafka.Client.IntegrationTests
}
[Test]
- public void WhenNewBrokerIsAddedZKBrokerPartitionInfoUpdatesBrokersList()
+ public void WhenNewBrokerIsAddedZkBrokerPartitionInfoUpdatesBrokersList()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -367,6 +370,7 @@ namespace Kafka.Client.IntegrationTests
client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
+ WaitUntillIdle(client, 500);
client.DeleteRecursive(brokerPath);
}
}
@@ -380,18 +384,20 @@ namespace Kafka.Client.IntegrationTests
}
[Test]
- public void WhenBrokerIsRemovedZKBrokerPartitionInfoUpdatesBrokersList()
+ public void WhenBrokerIsRemovedZkBrokerPartitionInfoUpdatesBrokersList()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
+ WaitUntillIdle(client, 500);
brokers = brokerPartitionInfo.GetAllBrokerInfo();
client.CreatePersistent(brokerPath, true);
client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
@@ -410,17 +416,18 @@ namespace Kafka.Client.IntegrationTests
}
[Test]
- public void WhenNewBrokerInTopicIsAddedZKBrokerPartitionInfoUpdatesMappings()
+ public void WhenNewBrokerInTopicIsAddedZkBrokerPartitionInfoUpdatesMappings()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
string topicBrokerPath = topicPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
@@ -439,6 +446,7 @@ namespace Kafka.Client.IntegrationTests
client.WriteData(topicBrokerPath, 5);
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
+ WaitUntillIdle(client, 500);
client.DeleteRecursive(brokerPath);
client.DeleteRecursive(topicPath);
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs Tue Oct 18 17:52:13 2011
@@ -33,44 +33,34 @@ namespace Kafka.Client.IntegrationTests
public class ZooKeeperAwareProducerTests : IntegrationFixtureBase
{
/// <summary>
- /// Kafka Client configuration
- /// </summary>
- private KafkaClientConfiguration clientConfig;
-
- /// <summary>
/// Maximum amount of time to wait while trying to get a specific test message from the Kafka server (in milliseconds)
/// </summary>
- private readonly int MaxTestWaitTimeInMiliseconds = 5000;
-
- [TestFixtureSetUp]
- public void SetUp()
- {
- clientConfig = KafkaClientConfiguration.GetConfiguration();
- }
+ private readonly int maxTestWaitTimeInMiliseconds = 5000;
[Test]
- public void ZKAwareProducerSends1Message()
+ public void ZkAwareProducerSends1Message()
{
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
int totalWaitTimeInMiliseconds = 0;
int waitSingle = 100;
var originalMessage = new Message(Encoding.UTF8.GetBytes("TestData"));
var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
- multipleBrokersHelper.GetCurrentOffsets();
+ multipleBrokersHelper.GetCurrentOffsets(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
- var producerConfig = new ProducerConfig(clientConfig);
var mockPartitioner = new MockAlwaysZeroPartitioner();
- using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder()))
+ using (var producer = new Producer<string, Message>(prodConfig, mockPartitioner, new DefaultEncoder()))
{
var producerData = new ProducerData<string, Message>(
- CurrentTestTopic, "somekey", new List<Message>() { originalMessage });
+ CurrentTestTopic, "somekey", new List<Message> { originalMessage });
producer.Send(producerData);
- while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
{
totalWaitTimeInMiliseconds += waitSingle;
Thread.Sleep(waitSingle);
- if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+ if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
{
Assert.Fail("None of the brokers changed their offset after sending a message");
}
@@ -78,13 +68,11 @@ namespace Kafka.Client.IntegrationTests
totalWaitTimeInMiliseconds = 0;
- var consumerConfig = new ConsumerConfig(clientConfig)
- {
- Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
- Port = multipleBrokersHelper.BrokerThatHasChanged.Port
- };
- IConsumer consumer = new Consumers.Consumer(consumerConfig);
- var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
+ var consumerConfig = new ConsumerConfiguration(
+ multipleBrokersHelper.BrokerThatHasChanged.Host,
+ multipleBrokersHelper.BrokerThatHasChanged.Port);
+ IConsumer consumer = new Consumer(consumerConfig);
+ var request = new FetchRequest(CurrentTestTopic, multipleBrokersHelper.PartitionThatHasChanged, multipleBrokersHelper.OffsetFromBeforeTheChange);
BufferedMessageSet response;
@@ -98,7 +86,7 @@ namespace Kafka.Client.IntegrationTests
}
totalWaitTimeInMiliseconds += waitSingle;
- if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
{
break;
}
@@ -113,6 +101,7 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZkAwareProducerSends3Messages()
{
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
int totalWaitTimeInMiliseconds = 0;
int waitSingle = 100;
var originalMessage1 = new Message(Encoding.UTF8.GetBytes("TestData1"));
@@ -121,20 +110,19 @@ namespace Kafka.Client.IntegrationTests
var originalMessageList = new List<Message> { originalMessage1, originalMessage2, originalMessage3 };
var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
- multipleBrokersHelper.GetCurrentOffsets();
+ multipleBrokersHelper.GetCurrentOffsets(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
- var producerConfig = new ProducerConfig(clientConfig);
var mockPartitioner = new MockAlwaysZeroPartitioner();
- using (var producer = new Producer<string, Message>(producerConfig, mockPartitioner, new DefaultEncoder()))
+ using (var producer = new Producer<string, Message>(prodConfig, mockPartitioner, new DefaultEncoder()))
{
var producerData = new ProducerData<string, Message>(CurrentTestTopic, "somekey", originalMessageList);
producer.Send(producerData);
- while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
{
totalWaitTimeInMiliseconds += waitSingle;
Thread.Sleep(waitSingle);
- if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+ if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
{
Assert.Fail("None of the brokers changed their offset after sending a message");
}
@@ -142,12 +130,10 @@ namespace Kafka.Client.IntegrationTests
totalWaitTimeInMiliseconds = 0;
- var consumerConfig = new ConsumerConfig(clientConfig)
- {
- Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
- Port = multipleBrokersHelper.BrokerThatHasChanged.Port
- };
- IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ var consumerConfig = new ConsumerConfiguration(
+ multipleBrokersHelper.BrokerThatHasChanged.Host,
+ multipleBrokersHelper.BrokerThatHasChanged.Port);
+ IConsumer consumer = new Consumer(consumerConfig);
var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
BufferedMessageSet response;
@@ -161,7 +147,7 @@ namespace Kafka.Client.IntegrationTests
}
totalWaitTimeInMiliseconds += waitSingle;
- if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
{
break;
}
@@ -178,26 +164,27 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZkAwareProducerSends1MessageUsingNotDefaultEncoder()
{
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
int totalWaitTimeInMiliseconds = 0;
int waitSingle = 100;
string originalMessage = "TestData";
var multipleBrokersHelper = new TestMultipleBrokersHelper(CurrentTestTopic);
- multipleBrokersHelper.GetCurrentOffsets();
+ multipleBrokersHelper.GetCurrentOffsets(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 });
- var producerConfig = new ProducerConfig(clientConfig);
var mockPartitioner = new MockAlwaysZeroPartitioner();
- using (var producer = new Producer<string, string>(producerConfig, mockPartitioner, new StringEncoder(), null))
+ using (var producer = new Producer<string, string>(prodConfig, mockPartitioner, new StringEncoder(), null))
{
var producerData = new ProducerData<string, string>(
CurrentTestTopic, "somekey", new List<string> { originalMessage });
producer.Send(producerData);
- while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged())
+ while (!multipleBrokersHelper.CheckIfAnyBrokerHasChanged(new[] { this.SyncProducerConfig1, this.SyncProducerConfig2, this.SyncProducerConfig3 }))
{
totalWaitTimeInMiliseconds += waitSingle;
Thread.Sleep(waitSingle);
- if (totalWaitTimeInMiliseconds > MaxTestWaitTimeInMiliseconds)
+ if (totalWaitTimeInMiliseconds > this.maxTestWaitTimeInMiliseconds)
{
Assert.Fail("None of the brokers changed their offset after sending a message");
}
@@ -205,12 +192,10 @@ namespace Kafka.Client.IntegrationTests
totalWaitTimeInMiliseconds = 0;
- var consumerConfig = new ConsumerConfig(clientConfig)
- {
- Host = multipleBrokersHelper.BrokerThatHasChanged.Address,
- Port = multipleBrokersHelper.BrokerThatHasChanged.Port
- };
- IConsumer consumer = new Consumers.Consumer(consumerConfig);
+ var consumerConfig = new ConsumerConfiguration(
+ multipleBrokersHelper.BrokerThatHasChanged.Host,
+ multipleBrokersHelper.BrokerThatHasChanged.Port);
+ IConsumer consumer = new Consumer(consumerConfig);
var request = new FetchRequest(CurrentTestTopic, 0, multipleBrokersHelper.OffsetFromBeforeTheChange);
BufferedMessageSet response;
@@ -224,7 +209,7 @@ namespace Kafka.Client.IntegrationTests
}
totalWaitTimeInMiliseconds += waitSingle;
- if (totalWaitTimeInMiliseconds >= MaxTestWaitTimeInMiliseconds)
+ if (totalWaitTimeInMiliseconds >= this.maxTestWaitTimeInMiliseconds)
{
break;
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs Tue Oct 18 17:52:13 2011
@@ -35,16 +35,8 @@ namespace Kafka.Client.IntegrationTests
internal class ZooKeeperClientTests : IntegrationFixtureBase, IZooKeeperDataListener, IZooKeeperStateListener, IZooKeeperChildListener
{
private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-
- private KafkaClientConfiguration clientConfig;
private readonly IList<ZooKeeperEventArgs> events = new List<ZooKeeperEventArgs>();
- [TestFixtureSetUp]
- public void SetUp()
- {
- clientConfig = KafkaClientConfiguration.GetConfiguration();
- }
-
[SetUp]
public void TestSetup()
{
@@ -54,8 +46,12 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZooKeeperClientCreateWorkerThreadsOnBeingCreated()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Connect();
var eventWorker = ReflectionHelper.GetInstanceField<Thread>("eventWorker", client);
@@ -68,8 +64,12 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZooKeeperClientFailsWhenCreatedWithWrongConnectionInfo()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperClient client = new ZooKeeperClient("random text", producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ "random text",
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
Assert.Throws<FormatException>(client.Connect);
}
@@ -78,8 +78,12 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenStateChangedToConnectedStateListenerFires()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Subscribe(this);
client.Connect();
@@ -96,8 +100,12 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenStateChangedToDisconnectedStateListenerFires()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Subscribe(this);
client.Connect();
@@ -116,8 +124,12 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenStateChangedToExpiredStateAndSessionListenersFire()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Subscribe(this);
client.Connect();
@@ -143,10 +155,14 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenSessionExpiredClientReconnects()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
IZooKeeperConnection conn1;
IZooKeeperConnection conn2;
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Connect();
conn1 = ReflectionHelper.GetInstanceField<ZooKeeperConnection>("connection", client);
@@ -161,8 +177,12 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZooKeeperClientChecksIfPathExists()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Connect();
Assert.IsTrue(client.Exists(ZooKeeperClient.DefaultBrokerTopicsPath, false));
@@ -172,8 +192,12 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZooKeeperClientCreatesANewPathAndDeletesIt()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Connect();
string myPath = "/" + Guid.NewGuid();
@@ -187,9 +211,13 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenChildIsCreatedChilListenerOnParentFires()
{
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
string myPath = "/" + Guid.NewGuid();
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Connect();
WaitUntillIdle(client, 500);
@@ -212,9 +240,13 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenChildIsDeletedChildListenerOnParentFires()
{
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
string myPath = "/" + Guid.NewGuid();
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Connect();
client.CreatePersistent(myPath, true);
@@ -236,9 +268,13 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenZNodeIsDeletedChildAndDataDeletedListenersFire()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
string myPath = "/" + Guid.NewGuid();
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Connect();
client.CreatePersistent(myPath, true);
@@ -265,8 +301,12 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZooKeeperClientCreatesAChildAndGetsChildren()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Connect();
string child = Guid.NewGuid().ToString();
@@ -284,11 +324,15 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void WhenDataChangedDataListenerFires()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
string myPath = "/" + Guid.NewGuid();
string sourceData = "my test data";
string resultData;
- using (IZooKeeperClient client = new ZooKeeperClient(producerConfig.ZkConnect, producerConfig.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer))
+ using (IZooKeeperClient client = new ZooKeeperClient(
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
+ ZooKeeperStringSerializer.Serializer))
{
client.Connect();
client.CreatePersistent(myPath, true);
@@ -317,11 +361,12 @@ namespace Kafka.Client.IntegrationTests
[ExpectedException(typeof(ZooKeeperException))]
public void WhenClientWillNotConnectWithinGivenTimeThrows()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
using (IZooKeeperClient client =
new ZooKeeperClient(
- producerConfig.ZkConnect,
- producerConfig.ZkSessionTimeoutMs,
+ prodConfig.ZooKeeper.ZkConnect,
+ prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer,
1))
{
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs Tue Oct 18 17:52:13 2011
@@ -25,21 +25,14 @@ namespace Kafka.Client.IntegrationTests
using ZooKeeperNet;
[TestFixture]
- public class ZooKeeperConnectionTests
+ public class ZooKeeperConnectionTests : IntegrationFixtureBase
{
- private KafkaClientConfiguration clientConfig;
-
- [TestFixtureSetUp]
- public void SetUp()
- {
- clientConfig = KafkaClientConfiguration.GetConfiguration();
- }
-
[Test]
public void ZooKeeperConnectionCreatesAndDeletesPath()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperConnection connection = new ZooKeeperConnection(prodConfig.ZooKeeper.ZkConnect))
{
connection.Connect(null);
string pathName = "/" + Guid.NewGuid();
@@ -53,9 +46,10 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZooKeeperConnectionConnectsAndDisposes()
{
- var producerConfig = new ProducerConfig(clientConfig);
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
IZooKeeperConnection connection;
- using (connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+ using (connection = new ZooKeeperConnection(prodConfig.ZooKeeper.ZkConnect))
{
Assert.IsNull(connection.ClientState);
connection.Connect(null);
@@ -69,8 +63,9 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZooKeeperConnectionCreatesAndGetsCreateTime()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperConnection connection = new ZooKeeperConnection(prodConfig.ZooKeeper.ZkConnect))
{
connection.Connect(null);
string pathName = "/" + Guid.NewGuid();
@@ -84,8 +79,9 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZooKeeperConnectionCreatesAndGetsChildren()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperConnection connection = new ZooKeeperConnection(prodConfig.ZooKeeper.ZkConnect))
{
connection.Connect(null);
string child = Guid.NewGuid().ToString();
@@ -101,14 +97,15 @@ namespace Kafka.Client.IntegrationTests
[Test]
public void ZooKeeperConnectionWritesAndReadsData()
{
- var producerConfig = new ProducerConfig(clientConfig);
- using (IZooKeeperConnection connection = new ZooKeeperConnection(producerConfig.ZkConnect))
+ var prodConfig = this.ZooKeeperBasedSyncProdConfig;
+
+ using (IZooKeeperConnection connection = new ZooKeeperConnection(prodConfig.ZooKeeper.ZkConnect))
{
connection.Connect(null);
string child = Guid.NewGuid().ToString();
string pathName = "/" + child;
connection.Create(pathName, null, CreateMode.Persistent);
- var sourceData = new byte[2] { 1, 2 };
+ var sourceData = new byte[] { 1, 2 };
connection.WriteData(pathName, sourceData);
byte[] resultData = connection.ReadData(pathName, null, false);
Assert.IsNotNull(resultData);
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj Tue Oct 18 17:52:13 2011
@@ -79,10 +79,12 @@
<HintPath>..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll</HintPath>
</Reference>
<Reference Include="System" />
+ <Reference Include="System.configuration" />
<Reference Include="System.Core" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="CompressionTests.cs" />
<Compile Include="MessageSetTests.cs" />
<Compile Include="MessageTests.cs" />
<Compile Include="Producers\PartitioningTests.cs" />
@@ -103,6 +105,14 @@
<ItemGroup>
<Folder Include="ZooKeeper\" />
</ItemGroup>
+ <ItemGroup>
+ <None Include="..\..\..\..\Settings.StyleCop">
+ <Link>Settings.StyleCop</Link>
+ </None>
+ <None Include="App.config">
+ <SubType>Designer</SubType>
+ </None>
+ </ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Import Project="..\..\..\..\lib\StyleCop\Microsoft.StyleCop.Targets" />
-</Project>
\ No newline at end of file
+</Project>
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs Tue Oct 18 17:52:13 2011
@@ -29,12 +29,14 @@ namespace Kafka.Client.Tests
{
private const int MessageLengthPartLength = 4;
private const int MagicNumberPartLength = 1;
+ private const int AttributesPartLength = 1;
private const int ChecksumPartLength = 4;
-
+
private const int MessageLengthPartOffset = 0;
private const int MagicNumberPartOffset = 4;
- private const int ChecksumPartOffset = 5;
- private const int DataPartOffset = 9;
+ private const int AttributesPartOffset = 5;
+ private const int ChecksumPartOffset = 6;
+ private const int DataPartOffset = 10;
[Test]
public void BufferedMessageSetWriteToValidSequence()
@@ -55,9 +57,9 @@ namespace Kafka.Client.Tests
Array.Reverse(messageLength);
}
- Assert.AreEqual(MagicNumberPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
+ Assert.AreEqual(MagicNumberPartLength + AttributesPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
- Assert.AreEqual(0, ms.ToArray()[MagicNumberPartOffset]); // default magic number should be 0
+ Assert.AreEqual(1, ms.ToArray()[MagicNumberPartOffset]); // default magic number should be 1
byte[] checksumPart = new byte[ChecksumPartLength];
Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
@@ -68,7 +70,7 @@ namespace Kafka.Client.Tests
Assert.AreEqual(messageBytes, dataPart);
////second message
- int secondMessageOffset = MessageLengthPartLength + MagicNumberPartLength + ChecksumPartLength +
+ int secondMessageOffset = MessageLengthPartLength + MagicNumberPartLength + AttributesPartLength + ChecksumPartLength +
messageBytes.Length;
messageLength = new byte[MessageLengthPartLength];
@@ -78,9 +80,9 @@ namespace Kafka.Client.Tests
Array.Reverse(messageLength);
}
- Assert.AreEqual(MagicNumberPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
+ Assert.AreEqual(MagicNumberPartLength + AttributesPartLength + ChecksumPartLength + messageBytes.Length, BitConverter.ToInt32(messageLength, 0));
- Assert.AreEqual(0, ms.ToArray()[secondMessageOffset + MagicNumberPartOffset]); // default magic number should be 0
+ Assert.AreEqual(1, ms.ToArray()[secondMessageOffset + MagicNumberPartOffset]); // default magic number should be 1
checksumPart = new byte[ChecksumPartLength];
Array.Copy(ms.ToArray(), secondMessageOffset + ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
@@ -99,7 +101,7 @@ namespace Kafka.Client.Tests
Message msg2 = new Message(messageBytes);
MessageSet messageSet = new BufferedMessageSet(new List<Message>() { msg1, msg2 });
Assert.AreEqual(
- 2 * (MessageLengthPartLength + MagicNumberPartLength + ChecksumPartLength + messageBytes.Length),
+ 2 * (MessageLengthPartLength + MagicNumberPartLength + AttributesPartLength + ChecksumPartLength + messageBytes.Length),
messageSet.SetSize);
}
}
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs Tue Oct 18 17:52:13 2011
@@ -34,8 +34,8 @@ namespace Kafka.Client.Tests
private readonly int ChecksumPartLength = 4;
private readonly int MagicNumberPartOffset = 0;
- private readonly int ChecksumPartOffset = 1;
- private readonly int DataPartOffset = 5;
+ private readonly int ChecksumPartOffset = 2;
+ private readonly int DataPartOffset = 6;
/// <summary>
/// Demonstrates a properly parsed message.
@@ -46,16 +46,18 @@ namespace Kafka.Client.Tests
Crc32Hasher crc32 = new Crc32Hasher();
string payload = "kafka";
- byte magic = 0;
+ byte magic = 1;
+ byte attributes = 0;
byte[] payloadData = Encoding.UTF8.GetBytes(payload);
byte[] payloadSize = BitConverter.GetBytes(payloadData.Length);
byte[] checksum = crc32.ComputeHash(payloadData);
- byte[] messageData = new byte[payloadData.Length + 1 + payloadSize.Length + checksum.Length];
+ byte[] messageData = new byte[payloadData.Length + 2 + payloadSize.Length + checksum.Length];
Buffer.BlockCopy(payloadSize, 0, messageData, 0, payloadSize.Length);
messageData[4] = magic;
- Buffer.BlockCopy(checksum, 0, messageData, payloadSize.Length + 1, checksum.Length);
- Buffer.BlockCopy(payloadData, 0, messageData, payloadSize.Length + 1 + checksum.Length, payloadData.Length);
+ messageData[5] = attributes;
+ Buffer.BlockCopy(checksum, 0, messageData, payloadSize.Length + 2, checksum.Length);
+ Buffer.BlockCopy(payloadData, 0, messageData, payloadSize.Length + 2 + checksum.Length, payloadData.Length);
Message message = Message.ParseFrom(messageData);
@@ -71,22 +73,25 @@ namespace Kafka.Client.Tests
[Test]
public void GetBytesValidSequence()
{
- Message message = new Message(new byte[10], (byte)245);
+ Message message = new Message(new byte[10], CompressionCodecs.NoCompressionCodec);
MemoryStream ms = new MemoryStream();
message.WriteTo(ms);
// len(payload) + 1 (magic) + 1 (attributes) + 4 (checksum)
- Assert.AreEqual(15, ms.Length);
+ Assert.AreEqual(16, ms.Length);
// first byte = the magic number
- Assert.AreEqual((byte)245, ms.ToArray()[0]);
+ Assert.AreEqual((byte)1, ms.ToArray()[0]);
+
+ // attributes
+ Assert.AreEqual((byte)0, ms.ToArray()[1]);
// next 4 bytes = the checksum
- Assert.IsTrue(message.Checksum.SequenceEqual(ms.ToArray().Skip(1).Take(4).ToArray<byte>()));
+ Assert.IsTrue(message.Checksum.SequenceEqual(ms.ToArray().Skip(2).Take(4).ToArray<byte>()));
// remaining bytes = the payload
- Assert.AreEqual(10, ms.ToArray().Skip(5).ToArray<byte>().Length);
+ Assert.AreEqual(10, ms.ToArray().Skip(6).ToArray<byte>().Length);
}
[Test]
@@ -97,12 +102,14 @@ namespace Kafka.Client.Tests
MemoryStream ms = new MemoryStream();
message.WriteTo(ms);
- Assert.AreEqual(0, ms.ToArray()[MagicNumberPartOffset]); // default magic number should be 0
+ Assert.AreEqual(1, ms.ToArray()[MagicNumberPartOffset]); // default magic number should be 1
byte[] checksumPart = new byte[ChecksumPartLength];
Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
Assert.AreEqual(Crc32Hasher.Compute(messageBytes), checksumPart);
+ message.ToString();
+
byte[] dataPart = new byte[messageBytes.Length];
Array.Copy(ms.ToArray(), DataPartOffset, dataPart, 0, messageBytes.Length);
Assert.AreEqual(messageBytes, dataPart);
@@ -113,11 +120,11 @@ namespace Kafka.Client.Tests
{
byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
byte[] customChecksum = new byte[] { 3, 4, 5, 6 };
- Message message = new Message(messageBytes, (byte)33, customChecksum);
+ Message message = new Message(messageBytes, customChecksum);
MemoryStream ms = new MemoryStream();
message.WriteTo(ms);
- Assert.AreEqual((byte)33, ms.ToArray()[MagicNumberPartOffset]);
+ Assert.AreEqual((byte)1, ms.ToArray()[MagicNumberPartOffset]);
byte[] checksumPart = new byte[ChecksumPartLength];
Array.Copy(ms.ToArray(), ChecksumPartOffset, checksumPart, 0, ChecksumPartLength);
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/PartitioningTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/PartitioningTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/PartitioningTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/PartitioningTests.cs Tue Oct 18 17:52:13 2011
@@ -21,17 +21,20 @@ namespace Kafka.Client.Tests.Producers
using Kafka.Client.Cluster;
using Kafka.Client.Producers.Partitioning;
using NUnit.Framework;
+ using System.Collections.Generic;
[TestFixture]
public class PartitioningTests
{
- private ProducerConfig config;
+ private ProducerConfiguration config;
[TestFixtureSetUp]
public void SetUp()
{
- config = new ProducerConfig();
- config.BrokerPartitionInfo = "1:192.168.0.1:1234,2:192.168.0.2:3456";
+ var brokers = new List<BrokerConfiguration>();
+ brokers.Add(new BrokerConfiguration { BrokerId = 1, Host = "192.168.0.1", Port = 1234 });
+ brokers.Add(new BrokerConfiguration { BrokerId = 2, Host = "192.168.0.2", Port = 3456 });
+ config = new ProducerConfiguration(brokers);
}
[Test]
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs Tue Oct 18 17:52:13 2011
@@ -54,10 +54,10 @@ namespace Kafka.Client.Tests.Request
request.WriteTo(ms);
byte[] bytes = ms.ToArray();
Assert.IsNotNull(bytes);
- Assert.AreEqual(152, bytes.Length);
+ Assert.AreEqual(156, bytes.Length);
// first 4 bytes = the length of the request
- Assert.AreEqual(148, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray<byte>()), 0));
+ Assert.AreEqual(152, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray<byte>()), 0));
// next 2 bytes = the RequestType which in this case should be MultiProduce
Assert.AreEqual((short)RequestTypes.MultiProduce, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray<byte>()), 0));
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs Tue Oct 18 17:52:13 2011
@@ -50,10 +50,10 @@ namespace Kafka.Client.Tests.Request
byte[] bytes = ms.ToArray();
Assert.IsNotNull(bytes);
- Assert.AreEqual(40, bytes.Length);
+ Assert.AreEqual(41, bytes.Length);
// first 4 bytes = the length of the request
- Assert.AreEqual(36, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray<byte>()), 0));
+ Assert.AreEqual(37, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray<byte>()), 0));
// next 2 bytes = the RequestType which in this case should be Produce
Assert.AreEqual((short)RequestTypes.Produce, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray<byte>()), 0));
@@ -68,10 +68,10 @@ namespace Kafka.Client.Tests.Request
Assert.AreEqual(0, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(13).Take(4).ToArray<byte>()), 0));
// next 4 bytes = the length of the individual messages in the pack
- Assert.AreEqual(19, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(17).Take(4).ToArray<byte>()), 0));
+ Assert.AreEqual(20, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(17).Take(4).ToArray<byte>()), 0));
// final bytes = the individual messages in the pack
- Assert.AreEqual(19, bytes.Skip(21).ToArray<byte>().Length);
+ Assert.AreEqual(20, bytes.Skip(21).ToArray<byte>().Length);
}
}
}