You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC
svn commit: r1173797 [6/10] - in /incubator/kafka/trunk/clients/csharp: ./
lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/
src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/
src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperSessionCreatedEventArgs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperSessionCreatedEventArgs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperSessionCreatedEventArgs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperSessionCreatedEventArgs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+ /// <summary>
+ /// Event data describing the creation of a new ZooKeeper session
+ /// </summary>
+ internal class ZooKeeperSessionCreatedEventArgs : ZooKeeperEventArgs
+ {
+     /// <summary>
+     /// Shared, pre-built instance of this event data. The <c>new</c> modifier indicates that it
+     /// hides an inherited member of the same name.
+     /// </summary>
+     public new static readonly ZooKeeperSessionCreatedEventArgs Empty = new ZooKeeperSessionCreatedEventArgs();
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ZooKeeperSessionCreatedEventArgs"/> class
+     /// with the fixed description "New session created".
+     /// </summary>
+     protected ZooKeeperSessionCreatedEventArgs()
+         : base("New session created")
+     {
+     }
+
+     /// <summary>
+     /// Gets the event type, which is always <see cref="ZooKeeperEventTypes.SessionCreated"/>.
+     /// </summary>
+     public override ZooKeeperEventTypes Type
+     {
+         get { return ZooKeeperEventTypes.SessionCreated; }
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperStateChangedEventArgs.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperStateChangedEventArgs.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperStateChangedEventArgs.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Events/ZooKeeperStateChangedEventArgs.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Events
+{
+ using ZooKeeperNet;
+
+ /// <summary>
+ /// Event data describing a change of the ZooKeeper session state
+ /// </summary>
+ internal class ZooKeeperStateChangedEventArgs : ZooKeeperEventArgs
+ {
+     /// <summary>
+     /// The state captured when the event data was created.
+     /// </summary>
+     private readonly KeeperState state;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ZooKeeperStateChangedEventArgs"/> class.
+     /// The description passed to the base class is "State changed to " followed by the state.
+     /// </summary>
+     /// <param name="state">
+     /// The current ZooKeeper state.
+     /// </param>
+     public ZooKeeperStateChangedEventArgs(KeeperState state)
+         : base("State changed to " + state)
+     {
+         this.state = state;
+     }
+
+     /// <summary>
+     /// Gets the ZooKeeper state supplied at construction
+     /// </summary>
+     public KeeperState State
+     {
+         get { return this.state; }
+     }
+
+     /// <summary>
+     /// Gets the event type, which is always <see cref="ZooKeeperEventTypes.StateChanged"/>.
+     /// </summary>
+     public override ZooKeeperEventTypes Type
+     {
+         get { return ZooKeeperEventTypes.StateChanged; }
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,468 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+ using System;
+ using System.Collections.Generic;
+ using Kafka.Client.ZooKeeperIntegration.Listeners;
+ using Org.Apache.Zookeeper.Data;
+ using ZooKeeperNet;
+
+ /// <summary>
+ /// Abstracts the interaction with zookeeper
+ /// </summary>
+ internal interface IZooKeeperClient : IWatcher, IDisposable
+ {
+ /// <summary>
+ /// Gets the time (in milliseconds) of event thread idleness
+ /// </summary>
+ /// <remarks>
+ /// Used for testing purposes
+ /// </remarks>
+ int IdleTime { get; }
+
+ /// <summary>
+ /// Connects to ZooKeeper server within given time period and installs watcher in ZooKeeper
+ /// </summary>
+ void Connect();
+
+ /// <summary>
+ /// Closes current connection to ZooKeeper
+ /// </summary>
+ void Disconnect();
+
+ /// <summary>
+ /// Re-connect to ZooKeeper server when session expired
+ /// </summary>
+ /// <param name="servers">
+ /// The servers.
+ /// </param>
+ /// <param name="connectionTimeout">
+ /// The connection timeout.
+ /// </param>
+ void Reconnect(string servers, int connectionTimeout);
+
+ /// <summary>
+ /// Waits until the ZooKeeper connection is established
+ /// </summary>
+ /// <param name="connectionTimeout">
+ /// The connection timeout.
+ /// </param>
+ /// <returns>
+ /// Status (presumably <c>true</c> when the connection was established within the timeout -
+ /// TODO confirm against the implementation)
+ /// </returns>
+ bool WaitUntilConnected(int connectionTimeout);
+
+ /// <summary>
+ /// Retries the given delegate until the connection is established
+ /// </summary>
+ /// <param name="callback">
+ /// The delegate to invoke.
+ /// </param>
+ /// <typeparam name="T">
+ /// Type of data returned by the delegate
+ /// </typeparam>
+ /// <returns>
+ /// Data returned by the delegate
+ /// </returns>
+ T RetryUntilConnected<T>(Func<T> callback);
+
+ /// <summary>
+ /// Subscribes listeners on ZooKeeper state changes events
+ /// </summary>
+ /// <param name="listener">
+ /// The listener.
+ /// </param>
+ void Subscribe(IZooKeeperStateListener listener);
+
+ /// <summary>
+ /// Un-subscribes listeners on ZooKeeper state changes events
+ /// </summary>
+ /// <param name="listener">
+ /// The listener.
+ /// </param>
+ void Unsubscribe(IZooKeeperStateListener listener);
+
+ /// <summary>
+ /// Subscribes listeners on ZooKeeper child changes under given path
+ /// </summary>
+ /// <param name="path">
+ /// The parent path.
+ /// </param>
+ /// <param name="listener">
+ /// The listener.
+ /// </param>
+ void Subscribe(string path, IZooKeeperChildListener listener);
+
+ /// <summary>
+ /// Un-subscribes listeners on ZooKeeper child changes under given path
+ /// </summary>
+ /// <param name="path">
+ /// The parent path.
+ /// </param>
+ /// <param name="listener">
+ /// The listener.
+ /// </param>
+ void Unsubscribe(string path, IZooKeeperChildListener listener);
+
+ /// <summary>
+ /// Subscribes listeners on ZooKeeper data changes under given path
+ /// </summary>
+ /// <param name="path">
+ /// The parent path.
+ /// </param>
+ /// <param name="listener">
+ /// The listener.
+ /// </param>
+ void Subscribe(string path, IZooKeeperDataListener listener);
+
+ /// <summary>
+ /// Un-subscribes listeners on ZooKeeper data changes under given path
+ /// </summary>
+ /// <param name="path">
+ /// The parent path.
+ /// </param>
+ /// <param name="listener">
+ /// The listener.
+ /// </param>
+ void Unsubscribe(string path, IZooKeeperDataListener listener);
+
+ /// <summary>
+ /// Un-subscribes all listeners
+ /// </summary>
+ void UnsubscribeAll();
+
+ /// <summary>
+ /// Installs a child watch for the given path.
+ /// </summary>
+ /// <param name="path">
+ /// The parent path.
+ /// </param>
+ /// <returns>
+ /// the current children of the path or null if the znode with the given path doesn't exist
+ /// </returns>
+ IList<string> WatchForChilds(string path);
+
+ /// <summary>
+ /// Installs a data watch for the given path.
+ /// </summary>
+ /// <param name="path">
+ /// The parent path.
+ /// </param>
+ void WatchForData(string path);
+
+ /// <summary>
+ /// Checks whether znode for a given path exists
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <returns>
+ /// Result of check
+ /// </returns>
+ bool Exists(string path);
+
+ /// <summary>
+ /// Checks whether znode for a given path exists.
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether should reinstall watcher in ZooKeeper.
+ /// </param>
+ /// <returns>
+ /// Result of check
+ /// </returns>
+ bool Exists(string path, bool watch);
+
+ /// <summary>
+ /// Gets all children for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <returns>
+ /// Children
+ /// </returns>
+ IList<string> GetChildren(string path);
+
+ /// <summary>
+ /// Gets all children for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether should reinstall watcher in ZooKeeper.
+ /// </param>
+ /// <returns>
+ /// Children
+ /// </returns>
+ IList<string> GetChildren(string path, bool watch);
+
+ /// <summary>
+ /// Counts number of children for a given path.
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <returns>
+ /// Number of children
+ /// </returns>
+ int CountChildren(string path);
+
+ /// <summary>
+ /// Fetches data from a given path in ZooKeeper
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="stats">
+ /// The statistics.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether should reinstall watcher in ZooKeeper.
+ /// </param>
+ /// <typeparam name="T">
+ /// Expected type of data
+ /// </typeparam>
+ /// <returns>
+ /// Data
+ /// </returns>
+ T ReadData<T>(string path, Stat stats, bool watch)
+ where T : class;
+
+ /// <summary>
+ /// Fetches data from a given path in ZooKeeper
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="stats">
+ /// The statistics.
+ /// </param>
+ /// <typeparam name="T">
+ /// Expected type of data
+ /// </typeparam>
+ /// <returns>
+ /// Data
+ /// </returns>
+ T ReadData<T>(string path, Stat stats)
+ where T : class;
+
+ /// <summary>
+ /// Fetches data from a given path in ZooKeeper
+ /// </summary>
+ /// <typeparam name="T">
+ /// Expected type of data
+ /// </typeparam>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <returns>
+ /// Data or null, if znode does not exist
+ /// </returns>
+ T ReadData<T>(string path)
+ where T : class;
+
+ /// <summary>
+ /// Fetches data for given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="returnNullIfPathNotExists">
+ /// Indicates, whether should return null or throw exception when
+ /// znode doesn't exist
+ /// </param>
+ /// <typeparam name="T">
+ /// Expected type of data
+ /// </typeparam>
+ /// <returns>
+ /// Data
+ /// </returns>
+ T ReadData<T>(string path, bool returnNullIfPathNotExists)
+ where T : class;
+
+ /// <summary>
+ /// Writes data for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ void WriteData(string path, object data);
+
+ /// <summary>
+ /// Writes data for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <param name="expectedVersion">
+ /// Expected version of data
+ /// </param>
+ void WriteData(string path, object data, int expectedVersion);
+
+ /// <summary>
+ /// Deletes znode for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <returns>
+ /// Status
+ /// </returns>
+ bool Delete(string path);
+
+ /// <summary>
+ /// Deletes the znode and its children for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <returns>
+ /// Status
+ /// </returns>
+ bool DeleteRecursive(string path);
+
+ /// <summary>
+ /// Creates persistent znode and all intermediate znodes (if do not exist) for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ void MakeSurePersistentPathExists(string path);
+
+ /// <summary>
+ /// Fetches children for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The path.
+ /// </param>
+ /// <returns>
+ /// Children or null, if znode does not exist
+ /// </returns>
+ IList<string> GetChildrenParentMayNotExist(string path);
+
+ /// <summary>
+ /// Creates a persistent znode for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="createParents">
+ /// Indicates whether all intermediate znodes should be created
+ /// </param>
+ /// <remarks>
+ /// Persistent znodes won't disappear after session close.
+ /// NOTE(review): the original remark "Doesn't re-create missing intermediate znodes" appears to
+ /// conflict with <paramref name="createParents"/>; presumably it only applies when that flag is
+ /// <c>false</c> - confirm against the implementation.
+ /// </remarks>
+ void CreatePersistent(string path, bool createParents);
+
+ /// <summary>
+ /// Creates a persistent znode for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <remarks>
+ /// Persistent znodes won't disappear after session close
+ /// Doesn't re-create missing intermediate znodes
+ /// </remarks>
+ void CreatePersistent(string path);
+
+ /// <summary>
+ /// Creates a persistent znode for a given path and writes data into it
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <remarks>
+ /// Persistent znodes won't disappear after session close
+ /// </remarks>
+ void CreatePersistent(string path, object data);
+
+ /// <summary>
+ /// Creates a sequential, persistent znode for a given path and writes data into it
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <remarks>
+ /// Persistent znodes won't disappear after session close
+ /// </remarks>
+ /// <returns>
+ /// The created znode's path
+ /// </returns>
+ string CreatePersistentSequential(string path, object data);
+
+ /// <summary>
+ /// Creates an ephemeral znode for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <remarks>
+ /// Ephemeral znodes will disappear after session close
+ /// </remarks>
+ void CreateEphemeral(string path);
+
+ /// <summary>
+ /// Creates an ephemeral znode for a given path and writes data into it
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <remarks>
+ /// Ephemeral znodes will disappear after session close
+ /// </remarks>
+ void CreateEphemeral(string path, object data);
+
+ /// <summary>
+ /// Creates an ephemeral, sequential znode for a given path and writes data into it
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <remarks>
+ /// Ephemeral znodes will disappear after session close
+ /// </remarks>
+ /// <returns>
+ /// The created znode's path
+ /// </returns>
+ string CreateEphemeralSequential(string path, object data);
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperConnection.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperConnection.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperConnection.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperConnection.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+ using System;
+ using System.Collections.Generic;
+ using Org.Apache.Zookeeper.Data;
+ using ZooKeeperNet;
+
+ /// <summary>
+ /// Abstracts connection with ZooKeeper server
+ /// </summary>
+ internal interface IZooKeeperConnection : IDisposable
+ {
+ /// <summary>
+ /// Gets the ZooKeeper client state
+ /// </summary>
+ ZooKeeper.States ClientState { get; }
+
+ /// <summary>
+ /// Gets the list of ZooKeeper servers.
+ /// </summary>
+ string Servers { get; }
+
+ /// <summary>
+ /// Gets the ZooKeeper session timeout
+ /// </summary>
+ int SessionTimeout { get; }
+
+ /// <summary>
+ /// Gets ZooKeeper client.
+ /// </summary>
+ ZooKeeper Client { get; }
+
+ /// <summary>
+ /// Connects to ZooKeeper server
+ /// </summary>
+ /// <param name="watcher">
+ /// The watcher to be installed in ZooKeeper.
+ /// </param>
+ void Connect(IWatcher watcher);
+
+ /// <summary>
+ /// Creates znode using given create mode for given path and writes given data to it
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <param name="mode">
+ /// The create mode.
+ /// </param>
+ /// <returns>
+ /// The created znode's path
+ /// </returns>
+ string Create(string path, byte[] data, CreateMode mode);
+
+ /// <summary>
+ /// Deletes znode for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ void Delete(string path);
+
+ /// <summary>
+ /// Checks whether znode for a given path exists.
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether should reinstall watcher in ZooKeeper.
+ /// </param>
+ /// <returns>
+ /// Result of check
+ /// </returns>
+ bool Exists(string path, bool watch);
+
+ /// <summary>
+ /// Gets all children for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether should reinstall watcher in ZooKeeper.
+ /// </param>
+ /// <returns>
+ /// Children
+ /// </returns>
+ IList<string> GetChildren(string path, bool watch);
+
+ /// <summary>
+ /// Fetches data from a given path in ZooKeeper
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="stats">
+ /// The statistics.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether should reinstall watcher in ZooKeeper.
+ /// </param>
+ /// <returns>
+ /// Data
+ /// </returns>
+ byte[] ReadData(string path, Stat stats, bool watch);
+
+ /// <summary>
+ /// Writes data for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ void WriteData(string path, byte[] data);
+
+ /// <summary>
+ /// Writes data for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <param name="version">
+ /// Expected version of data
+ /// </param>
+ void WriteData(string path, byte[] data, int version);
+
+ /// <summary>
+ /// Gets the creation time associated with the given path
+ /// </summary>
+ /// <remarks>
+ /// NOTE(review): the original summary read "time when connection was created", but the method
+ /// takes a path; presumably it returns the creation time of the znode at that path - confirm
+ /// against the implementation.
+ /// </remarks>
+ /// <param name="path">
+ /// The path.
+ /// </param>
+ /// <returns>
+ /// Creation time (unit and epoch are not specified here - TODO confirm)
+ /// </returns>
+ long GetCreateTime(string path);
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperSerializer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperSerializer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperSerializer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperSerializer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+ /// <summary>
+ /// ZooKeeper stores data in the form of byte arrays. This interface is a bridge between that byte-array format
+ /// and higher level objects.
+ /// </summary>
+ internal interface IZooKeeperSerializer
+ {
+ /// <summary>
+ /// Serializes data into the byte-array form
+ /// </summary>
+ /// <param name="obj">
+ /// The data to serialize
+ /// </param>
+ /// <returns>
+ /// Serialized data
+ /// </returns>
+ byte[] Serialize(object obj);
+
+ /// <summary>
+ /// Deserializes data from the byte-array form
+ /// </summary>
+ /// <param name="bytes">
+ /// The serialized data
+ /// </param>
+ /// <returns>
+ /// The deserialized data
+ /// </returns>
+ object Deserialize(byte[] bytes);
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Linq;
+ using System.Reflection;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Utils;
+ using Kafka.Client.ZooKeeperIntegration.Events;
+ using log4net;
+
+ /// <summary>
+ /// Listens to new broker registrations under a particular topic, in zookeeper and
+ /// keeps the related data structures updated
+ /// </summary>
+ /// <remarks>
+ /// The listener mutates the "actual" maps handed to its constructor and keeps private
+ /// snapshots of them (the "old" maps). Newly registered topics, new brokers and dead brokers
+ /// are detected by comparing the children reported by ZooKeeper with those snapshots, so the
+ /// snapshots must be copies of the actual maps rather than references to them.
+ /// </remarks>
+ internal class BrokerTopicsListener : IZooKeeperChildListener
+ {
+     private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+     private readonly IDictionary<int, Broker> actualBrokerIdMap;
+     private readonly Action<int, string, int> callback;
+     private readonly IDictionary<string, SortedSet<Partition>> actualBrokerTopicsPartitionsMap;
+     private readonly IZooKeeperClient zkclient;
+     private readonly object syncLock = new object();
+     private IDictionary<int, Broker> oldBrokerIdMap;
+     private IDictionary<string, SortedSet<Partition>> oldBrokerTopicsPartitionsMap;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="BrokerTopicsListener"/> class.
+     /// </summary>
+     /// <param name="zkclient">The wrapper on ZooKeeper client.</param>
+     /// <param name="actualBrokerTopicsPartitionsMap">The actual broker topics partitions map.</param>
+     /// <param name="actualBrokerIdMap">The actual broker id map.</param>
+     /// <param name="callback">The callback invoked after new broker is added (may be null).</param>
+     public BrokerTopicsListener(
+         IZooKeeperClient zkclient,
+         IDictionary<string, SortedSet<Partition>> actualBrokerTopicsPartitionsMap,
+         IDictionary<int, Broker> actualBrokerIdMap,
+         Action<int, string, int> callback)
+     {
+         this.zkclient = zkclient;
+         this.actualBrokerTopicsPartitionsMap = actualBrokerTopicsPartitionsMap;
+         this.actualBrokerIdMap = actualBrokerIdMap;
+         this.callback = callback;
+         this.oldBrokerIdMap = new Dictionary<int, Broker>(this.actualBrokerIdMap);
+         this.oldBrokerTopicsPartitionsMap = new Dictionary<string, SortedSet<Partition>>(this.actualBrokerTopicsPartitionsMap);
+         Logger.Debug("Creating broker topics listener to watch the following paths - \n"
+             + "/broker/topics, /broker/topics/topic, /broker/ids");
+         Logger.Debug("Initialized this broker topics listener with initial mapping of broker id to "
+             + "partition id per topic with " + this.oldBrokerTopicsPartitionsMap.ToMultiString(
+                 x => x.Key + " --> " + x.Value.ToMultiString(y => y.ToString(), ","), "; "));
+     }
+
+     /// <summary>
+     /// Called when the children of the given path changed
+     /// </summary>
+     /// <param name="e">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperChildChangedEventArgs"/> instance containing the event data
+     /// as parent path and children (null if parent was deleted).
+     /// </param>
+     /// <remarks>
+     /// Dispatches on the event path: the topics root, the broker ids root, or a single topic
+     /// path of the form <c>/x/topics/topic</c>. Exceptions raised while processing are logged
+     /// and not rethrown. The snapshots are refreshed only after successful processing.
+     /// NOTE(review): the guard below rejects a null children list although the event contract
+     /// states that children are null when the parent was deleted - confirm intended behavior.
+     /// </remarks>
+     public void HandleChildChange(ZooKeeperChildChangedEventArgs e)
+     {
+         Guard.Assert<ArgumentNullException>(() => e != null);
+         Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(e.Path));
+         Guard.Assert<ArgumentNullException>(() => e.Children != null);
+
+         lock (this.syncLock)
+         {
+             try
+             {
+                 string path = e.Path;
+                 IList<string> childs = e.Children;
+                 Logger.Debug("Watcher fired for path: " + path);
+                 switch (path)
+                 {
+                     case ZooKeeperClient.DefaultBrokerTopicsPath:
+                         List<string> oldTopics = this.oldBrokerTopicsPartitionsMap.Keys.ToList();
+                         List<string> newTopics = childs.Except(oldTopics).ToList();
+                         Logger.Debug("List of topics was changed at " + e.Path);
+                         Logger.Debug("Current topics -> " + e.Children.ToMultiString(","));
+                         Logger.Debug("Old list of topics -> " + oldTopics.ToMultiString(","));
+                         Logger.Debug("List of newly registered topics -> " + newTopics.ToMultiString(","));
+                         foreach (var newTopic in newTopics)
+                         {
+                             string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + newTopic;
+                             IList<string> brokerList = this.zkclient.GetChildrenParentMayNotExist(brokerTopicPath);
+                             this.ProcessNewBrokerInExistingTopic(newTopic, brokerList);
+                             this.zkclient.Subscribe(brokerTopicPath, this);
+                         }
+
+                         break;
+                     case ZooKeeperClient.DefaultBrokerIdsPath:
+                         Logger.Debug("List of brokers changed in the Kafka cluster " + e.Path);
+                         Logger.Debug("Currently registered list of brokers -> " + e.Children.ToMultiString(","));
+                         this.ProcessBrokerChange(path, childs);
+                         break;
+                     default:
+                         string[] parts = path.Split('/');
+                         string topic = parts.Last();
+                         if (parts.Length == 4 && parts[2] == "topics" && childs != null)
+                         {
+                             Logger.Debug("List of brokers changed at " + path);
+                             Logger.Debug(
+                                 "Currently registered list of brokers for topic " + topic + " -> " +
+                                 childs.ToMultiString(","));
+                             this.ProcessNewBrokerInExistingTopic(topic, childs);
+                         }
+
+                         break;
+                 }
+
+                 // Take fresh copies: assigning the actual maps by reference would make the
+                 // snapshot alias the live data, so later additions could never be detected as new.
+                 this.oldBrokerTopicsPartitionsMap = new Dictionary<string, SortedSet<Partition>>(this.actualBrokerTopicsPartitionsMap);
+                 this.oldBrokerIdMap = new Dictionary<int, Broker>(this.actualBrokerIdMap);
+             }
+             catch (Exception exc)
+             {
+                 // The exception is deliberately not rethrown, but it must be visible in the logs.
+                 Logger.Error("Error while handling " + e, exc);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Resets the state of listener.
+     /// </summary>
+     /// <remarks>
+     /// Replaces both snapshots with fresh copies of the actual maps.
+     /// NOTE(review): unlike <see cref="HandleChildChange"/>, this method does not take the
+     /// internal lock - confirm that callers serialize access.
+     /// </remarks>
+     public void ResetState()
+     {
+         Logger.Debug("Before reseting broker topic partitions state -> "
+             + this.oldBrokerTopicsPartitionsMap.ToMultiString(
+                 x => x.Key + " --> " + x.Value.ToMultiString(y => y.ToString(), ","), "; "));
+         this.oldBrokerTopicsPartitionsMap = new Dictionary<string, SortedSet<Partition>>(this.actualBrokerTopicsPartitionsMap);
+         Logger.Debug("After reseting broker topic partitions state -> "
+             + this.oldBrokerTopicsPartitionsMap.ToMultiString(
+                 x => x.Key + " --> " + x.Value.ToMultiString(y => y.ToString(), ","), "; "));
+         Logger.Debug("Before reseting broker id map state -> "
+             + this.oldBrokerIdMap.ToMultiString(", "));
+         this.oldBrokerIdMap = new Dictionary<int, Broker>(this.actualBrokerIdMap);
+         Logger.Debug("After reseting broker id map state -> "
+             + this.oldBrokerIdMap.ToMultiString(", "));
+     }
+
+     /// <summary>
+     /// Generate the updated mapping of (brokerId, numPartitions) for the new list of brokers
+     /// registered under some topic.
+     /// </summary>
+     /// <param name="topic">The name of the topic under which the brokers have changed.</param>
+     /// <param name="childs">The list of changed brokers; null is treated as an empty list.</param>
+     /// <remarks>
+     /// The partitions read from ZooKeeper are merged with those already known for the topic,
+     /// and only partitions whose broker is present in the actual broker id map are kept.
+     /// </remarks>
+     private void ProcessNewBrokerInExistingTopic(string topic, IEnumerable<string> childs)
+     {
+         // The children list is null when the topic znode does not exist.
+         List<string> brokerIds = (childs ?? Enumerable.Empty<string>()).ToList();
+
+         // Look the topic up in the snapshot itself: it may lack a topic the actual map already has.
+         SortedSet<Partition> previousParts;
+         if (this.oldBrokerTopicsPartitionsMap.TryGetValue(topic, out previousParts) && previousParts != null)
+         {
+             Logger.Debug("Old list of brokers -> " + previousParts.ToMultiString(x => x.BrokerId.ToString(), ","));
+         }
+
+         var updatedBrokers = new SortedSet<int>(brokerIds.Select(x => int.Parse(x, CultureInfo.InvariantCulture)));
+         string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
+         var mergedBrokerParts = new SortedSet<Partition>();
+         foreach (int bid in updatedBrokers)
+         {
+             var num = this.zkclient.ReadData<string>(brokerTopicPath + "/" + bid);
+             if (num == null)
+             {
+                 // The znode vanished between listing the children and reading it.
+                 Logger.Debug("No partitions data for broker " + bid + " in topic " + topic + ", skipping");
+                 continue;
+             }
+
+             int partitionsCount = int.Parse(num, CultureInfo.InvariantCulture);
+             for (int i = 0; i < partitionsCount; i++)
+             {
+                 mergedBrokerParts.Add(new Partition(bid, i));
+             }
+         }
+
+         Logger.Debug("Currently registered list of brokers for topic " + topic + " -> " + brokerIds.ToMultiString(", "));
+         SortedSet<Partition> currentParts;
+         if (this.actualBrokerTopicsPartitionsMap.TryGetValue(topic, out currentParts) && currentParts != null)
+         {
+             Logger.Debug(
+                 "Unregistered list of brokers for topic " + topic + " -> " + currentParts.ToMultiString(", "));
+             mergedBrokerParts.UnionWith(currentParts);
+         }
+
+         // The indexer both adds a new topic and replaces an existing entry.
+         this.actualBrokerTopicsPartitionsMap[topic] = new SortedSet<Partition>(mergedBrokerParts.Where(x => this.actualBrokerIdMap.ContainsKey(x.BrokerId)));
+     }
+
+     /// <summary>
+     /// Processes change in the broker lists.
+     /// </summary>
+     /// <param name="path">The parent path of brokers list.</param>
+     /// <param name="childs">The current brokers.</param>
+     /// <remarks>
+     /// Brokers absent from the snapshot are read from ZooKeeper, registered in the actual
+     /// broker id map and reported through the callback. Brokers present in the snapshot but
+     /// no longer listed are removed from the broker id map and from every topic's partitions.
+     /// The broker data is split on ':' and must have at least three fields; field 1 is used
+     /// as the host and field 2 as the port.
+     /// </remarks>
+     private void ProcessBrokerChange(string path, IEnumerable<string> childs)
+     {
+         if (path != ZooKeeperClient.DefaultBrokerIdsPath)
+         {
+             return;
+         }
+
+         List<int> updatedBrokers = childs.Select(x => int.Parse(x, CultureInfo.InvariantCulture)).ToList();
+         List<int> oldBrokers = this.oldBrokerIdMap.Select(x => x.Key).ToList();
+         List<int> newBrokers = updatedBrokers.Except(oldBrokers).ToList();
+         Logger.Debug("List of newly registered brokers -> " + newBrokers.ToMultiString(","));
+         foreach (int bid in newBrokers)
+         {
+             string brokerInfo = this.zkclient.ReadData<string>(ZooKeeperClient.DefaultBrokerIdsPath + "/" + bid);
+             if (brokerInfo == null)
+             {
+                 // The broker znode vanished between listing the children and reading it.
+                 Logger.Debug("No registration data for broker " + bid + ", skipping");
+                 continue;
+             }
+
+             string[] brokerHost = brokerInfo.Split(':');
+             var port = int.Parse(brokerHost[2], CultureInfo.InvariantCulture);
+
+             // Indexer instead of Add: the id may already be present if the snapshot is stale.
+             this.actualBrokerIdMap[bid] = new Broker(bid, brokerHost[1], brokerHost[1], port);
+             if (this.callback != null)
+             {
+                 Logger.Debug("Invoking the callback for broker: " + bid);
+                 this.callback(bid, brokerHost[1], port);
+             }
+         }
+
+         List<int> deadBrokers = oldBrokers.Except(updatedBrokers).ToList();
+         Logger.Debug("Deleting broker ids for dead brokers -> " + deadBrokers.ToMultiString(","));
+         foreach (int bid in deadBrokers)
+         {
+             Logger.Debug("Deleting dead broker: " + bid);
+             this.actualBrokerIdMap.Remove(bid);
+             foreach (var topicMap in this.actualBrokerTopicsPartitionsMap)
+             {
+                 if (topicMap.Value == null)
+                 {
+                     continue;
+                 }
+
+                 int affected = topicMap.Value.RemoveWhere(x => x.BrokerId == bid);
+                 if (affected > 0)
+                 {
+                     Logger.Debug("Removing dead broker " + bid + " for topic: " + topicMap.Key);
+                     Logger.Debug("Actual list of mapped brokers is -> " + topicMap.Value.ToMultiString(x => x.ToString(), ","));
+                 }
+             }
+         }
+     }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperChildListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperChildListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperChildListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperChildListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+ using Kafka.Client.ZooKeeperIntegration.Events;
+
+ /// <summary>
+ /// Listener that can be registered to be notified when the children of a ZooKeeper znode
+ /// at a given path change.
+ /// </summary>
+ internal interface IZooKeeperChildListener
+ {
+ /// <summary>
+ /// Called when the children of the given path changed.
+ /// </summary>
+ /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperChildChangedEventArgs"/> instance containing the event data:
+ /// the parent path and its current children (null if the parent was deleted).
+ /// </param>
+ /// <remarks>
+ /// http://zookeeper.wiki.sourceforge.net/ZooKeeperWatches
+ /// </remarks>
+ void HandleChildChange(ZooKeeperChildChangedEventArgs args);
+
+ /// <summary>
+ /// Resets the state of the listener. What is discarded is up to the implementation.
+ /// </summary>
+ void ResetState();
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperDataListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperDataListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperDataListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperDataListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+ using Kafka.Client.ZooKeeperIntegration.Events;
+
+ /// <summary>
+ /// Listener that can be registered to be notified when the data stored in a ZooKeeper znode
+ /// at a given path changes or is deleted.
+ /// </summary>
+ internal interface IZooKeeperDataListener
+ {
+ /// <summary>
+ /// Called when the data of the given path changed.
+ /// </summary>
+ /// <param name="args">The <see cref="ZooKeeperDataChangedEventArgs"/> instance containing the event data:
+ /// the path and the data.
+ /// </param>
+ /// <remarks>
+ /// http://zookeeper.wiki.sourceforge.net/ZooKeeperWatches
+ /// </remarks>
+ void HandleDataChange(ZooKeeperDataChangedEventArgs args);
+
+ /// <summary>
+ /// Called when the data of the given path was deleted.
+ /// </summary>
+ /// <param name="args">The <see cref="ZooKeeperDataChangedEventArgs"/> instance containing the event data:
+ /// the path whose data was deleted.
+ /// </param>
+ /// <remarks>
+ /// http://zookeeper.wiki.sourceforge.net/ZooKeeperWatches
+ /// </remarks>
+ void HandleDataDelete(ZooKeeperDataChangedEventArgs args);
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperStateListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperStateListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperStateListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/IZooKeeperStateListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+ using Kafka.Client.ZooKeeperIntegration.Events;
+
+ /// <summary>
+ /// Listener notified when the ZooKeeper connection state changes and when a new session
+ /// has been created after the previous one expired.
+ /// </summary>
+ internal interface IZooKeeperStateListener
+ {
+ /// <summary>
+ /// Called when the ZooKeeper connection state has changed.
+ /// </summary>
+ /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperStateChangedEventArgs"/> instance containing the event data.</param>
+ void HandleStateChanged(ZooKeeperStateChangedEventArgs args);
+
+ /// <summary>
+ /// Called after the ZooKeeper session has expired and a new session has been created.
+ /// </summary>
+ /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperSessionCreatedEventArgs"/> instance containing the event data.</param>
+ /// <remarks>
+ /// Implementations have to re-create any ephemeral nodes here.
+ /// </remarks>
+ void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args);
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,367 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Linq;
+ using System.Reflection;
+ using System.Threading;
+ using Kafka.Client.Cfg;
+ using Kafka.Client.Cluster;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Exceptions;
+ using Kafka.Client.Utils;
+ using Kafka.Client.ZooKeeperIntegration.Events;
+ using log4net;
+ using ZooKeeperNet;
+
+ internal class ZKRebalancerListener : IZooKeeperChildListener
+ {
+ private IDictionary<string, IList<string>> oldPartitionsPerTopicMap = new Dictionary<string, IList<string>>();
+
+ private IDictionary<string, IList<string>> oldConsumersPerTopicMap = new Dictionary<string, IList<string>>();
+
+ private readonly IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry;
+
+ private readonly IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues;
+
+ private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+ private readonly string consumerIdString;
+
+ private readonly object syncLock;
+
+ private readonly ConsumerConfig config;
+
+ private readonly IZooKeeperClient zkClient;
+
+ private readonly ZKGroupDirs dirs;
+
+ private readonly Fetcher fetcher;
+
+ private readonly ZookeeperConsumerConnector zkConsumerConnector;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZKRebalancerListener"/> class.
+ /// </summary>
+ /// <param name="config">Consumer configuration; its group id is used to build the group directories.</param>
+ /// <param name="consumerIdString">Identifier of the consumer this listener rebalances.</param>
+ /// <param name="topicRegistry">Registry of owned partitions per topic, updated on every rebalance.</param>
+ /// <param name="zkClient">Client used to talk to ZooKeeper.</param>
+ /// <param name="zkConsumerConnector">Connector used to commit offsets before partitions are released.</param>
+ /// <param name="queues">Fetched-data queues keyed by (topic, consumer thread id).</param>
+ /// <param name="fetcher">Fetcher whose connections are re-initialized after a rebalance; may be null.</param>
+ /// <param name="syncLock">Lock object held for the duration of a synced rebalance.</param>
+ internal ZKRebalancerListener(
+     ConsumerConfig config,
+     string consumerIdString,
+     IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry,
+     IZooKeeperClient zkClient,
+     ZookeeperConsumerConnector zkConsumerConnector,
+     IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues,
+     Fetcher fetcher,
+     object syncLock)
+ {
+     // Identity and configuration.
+     this.config = config;
+     this.consumerIdString = consumerIdString;
+     this.dirs = new ZKGroupDirs(config.GroupId);
+
+     // Collaborators.
+     this.zkClient = zkClient;
+     this.zkConsumerConnector = zkConsumerConnector;
+     this.fetcher = fetcher;
+
+     // Shared state.
+     this.topicRegistry = topicRegistry;
+     this.queues = queues;
+     this.syncLock = syncLock;
+ }
+
+ /// <summary>
+ /// Rebalances this consumer while holding the shared lock, retrying up to
+ /// <c>ZookeeperConsumerConnector.MaxNRetries</c> times.
+ /// </summary>
+ /// <remarks>
+ /// After every unsuccessful attempt all owned partitions are released and the cached
+ /// state is reset, so that the next attempt starts from scratch.
+ /// </remarks>
+ /// <exception cref="ZKRebalancerException">Thrown when no attempt succeeded.</exception>
+ public void SyncedRebalance()
+ {
+     lock (this.syncLock)
+     {
+         int maxAttempts = ZookeeperConsumerConnector.MaxNRetries;
+         for (int attempt = 0; attempt < maxAttempts; attempt++)
+         {
+             Logger.InfoFormat(CultureInfo.CurrentCulture, "begin rebalancing consumer {0} try #{1}", this.consumerIdString, attempt);
+             bool done = false;
+             try
+             {
+                 done = this.Rebalance();
+             }
+             catch (Exception ex)
+             {
+                 // Deliberately swallowed so that the retry loop continues, but reported
+                 // as a warning (with stack trace) so failures are visible in normal logs.
+                 Logger.Warn("exception during rebalance", ex);
+             }
+
+             Logger.InfoFormat(CultureInfo.CurrentCulture, "end rebalancing consumer {0} try #{1}", this.consumerIdString, attempt);
+             if (done)
+             {
+                 return;
+             }
+
+             //// release all partitions, reset state and retry
+             this.ReleasePartitionOwnership();
+             this.ResetState();
+
+             // Back off only when another attempt follows; sleeping after the final
+             // attempt would merely delay the failure while still holding the lock.
+             if (attempt < maxAttempts - 1)
+             {
+                 Thread.Sleep(this.config.ZkSyncTimeMs);
+             }
+         }
+     }
+
+     throw new ZKRebalancerException(string.Format(CultureInfo.CurrentCulture, "{0} can't rebalance after {1} retries", this.consumerIdString, ZookeeperConsumerConnector.MaxNRetries));
+ }
+
+ /// <summary>
+ /// Reacts to a change of the children of the watched path by triggering a synced rebalance.
+ /// </summary>
+ /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperChildChangedEventArgs"/> instance containing the event data:
+ /// the parent path and its children. The instance, its path and its children must all be set.
+ /// </param>
+ /// <remarks>
+ /// http://zookeeper.wiki.sourceforge.net/ZooKeeperWatches
+ /// </remarks>
+ public void HandleChildChange(ZooKeeperChildChangedEventArgs args)
+ {
+     // Validate the event data before doing any work.
+     Guard.Assert<ArgumentNullException>(() => args != null);
+     Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(args.Path));
+     Guard.Assert<ArgumentNullException>(() => args.Children != null);
+
+     this.SyncedRebalance();
+ }
+
+ /// <summary>
+ /// Forgets everything remembered from previous rebalances: the cached partition and
+ /// consumer maps as well as the registry of owned partitions.
+ /// </summary>
+ public void ResetState()
+ {
+     this.oldPartitionsPerTopicMap.Clear();
+     this.oldConsumersPerTopicMap.Clear();
+     this.topicRegistry.Clear();
+ }
+
+ /// <summary>
+ /// Performs a single rebalance attempt: determines the topics whose partitions or consumers
+ /// changed, commits offsets, releases the partitions currently owned and then claims this
+ /// consumer's share of the partitions of every affected topic.
+ /// </summary>
+ /// <returns>
+ /// <c>true</c> when no topic needed rebalancing or every partition was claimed;
+ /// <c>false</c> when claiming a partition failed because its owner node still exists.
+ /// </returns>
+ private bool Rebalance()
+ {
+     var myTopicThreadIdsMap = this.GetTopicCount(this.consumerIdString).GetConsumerThreadIdsPerTopic();
+     var cluster = new Cluster(this.zkClient);
+     var consumersPerTopicMap = this.GetConsumersPerTopic(this.config.GroupId);
+     var partitionsPerTopicMap = ZkUtils.GetPartitionsForTopics(this.zkClient, myTopicThreadIdsMap.Keys);
+     var relevantTopicThreadIdsMap = GetRelevantTopicMap(
+         myTopicThreadIdsMap,
+         partitionsPerTopicMap,
+         this.oldPartitionsPerTopicMap,
+         consumersPerTopicMap,
+         this.oldConsumersPerTopicMap);
+     if (relevantTopicThreadIdsMap.Count <= 0)
+     {
+         Logger.InfoFormat(CultureInfo.CurrentCulture, "Consumer {0} with {1} doesn't need to rebalance.", this.consumerIdString, consumersPerTopicMap);
+         return true;
+     }
+
+     Logger.Info("Committing all offsets");
+     this.zkConsumerConnector.CommitOffsets();
+
+     Logger.Info("Releasing partition ownership");
+     this.ReleasePartitionOwnership();
+
+     var queuesToBeCleared = new List<BlockingCollection<FetchedDataChunk>>();
+     foreach (var item in relevantTopicThreadIdsMap)
+     {
+         // Start from an empty registry entry for every topic that is being rebalanced.
+         this.topicRegistry.Remove(item.Key);
+         this.topicRegistry.Add(item.Key, new Dictionary<Partition, PartitionTopicInfo>());
+
+         var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, item.Key);
+         var curConsumers = consumersPerTopicMap[item.Key];
+         var curPartitions = new List<string>(partitionsPerTopicMap[item.Key]);
+
+         // Partitions are split into contiguous ranges; the first
+         // (partitions % consumers) consumer threads get one extra partition each.
+         var numberOfPartsPerConsumer = curPartitions.Count / curConsumers.Count;
+         var numberOfConsumersWithExtraPart = curPartitions.Count % curConsumers.Count;
+
+         Logger.InfoFormat(
+             CultureInfo.CurrentCulture,
+             "Consumer {0} rebalancing the following partitions: {1} for topic {2} with consumers: {3}",
+             this.consumerIdString,
+             string.Join(",", curPartitions),
+             item.Key,
+             string.Join(",", curConsumers));
+
+         foreach (string consumerThreadId in item.Value)
+         {
+             // The position of the thread in the consumer list selects its partition range.
+             var myConsumerPosition = curConsumers.IndexOf(consumerThreadId);
+             if (myConsumerPosition < 0)
+             {
+                 continue;
+             }
+
+             var startPart = (numberOfPartsPerConsumer * myConsumerPosition) +
+                             Math.Min(myConsumerPosition, numberOfConsumersWithExtraPart);
+             var numberOfParts = numberOfPartsPerConsumer + (myConsumerPosition + 1 > numberOfConsumersWithExtraPart ? 0 : 1);
+
+             if (numberOfParts <= 0)
+             {
+                 Logger.WarnFormat(CultureInfo.CurrentCulture, "No broker partitions consumed by consumer thread {0} for topic {1}", consumerThreadId, item.Key);
+             }
+             else
+             {
+                 for (int i = startPart; i < startPart + numberOfParts; i++)
+                 {
+                     var partition = curPartitions[i];
+                     Logger.InfoFormat(CultureInfo.CurrentCulture, "{0} attempting to claim partition {1}", consumerThreadId, partition);
+                     if (!this.ProcessPartition(topicDirs, partition, item.Key, consumerThreadId))
+                     {
+                         // Give up this attempt; the caller releases what was claimed and retries.
+                         return false;
+                     }
+                 }
+
+                 queuesToBeCleared.Add(this.queues[new Tuple<string, string>(item.Key, consumerThreadId)]);
+             }
+         }
+     }
+
+     this.UpdateFetcher(cluster, queuesToBeCleared);
+
+     // Remember what this rebalance was based on for the next change detection.
+     this.oldPartitionsPerTopicMap = partitionsPerTopicMap;
+     this.oldConsumersPerTopicMap = consumersPerTopicMap;
+     return true;
+ }
+
+ /// <summary>
+ /// Collects every partition currently held in the topic registry, logs the selection and,
+ /// when a fetcher is available, re-initializes its connections for those partitions.
+ /// </summary>
+ /// <param name="cluster">Cluster information handed to the fetcher.</param>
+ /// <param name="queuesToBeCleared">Queues handed to the fetcher together with the partitions.</param>
+ private void UpdateFetcher(Cluster cluster, IEnumerable<BlockingCollection<FetchedDataChunk>> queuesToBeCleared)
+ {
+     List<PartitionTopicInfo> allPartitionInfos = this.topicRegistry.Values
+         .SelectMany(partitionsOfTopic => partitionsOfTopic.Values)
+         .ToList();
+
+     IEnumerable<string> selectedNames = allPartitionInfos
+         .OrderBy(info => info.Partition.Name)
+         .Select(info => info.Partition.Name);
+     Logger.InfoFormat(
+         CultureInfo.CurrentCulture,
+         "Consumer {0} selected partitions: {1}",
+         this.consumerIdString,
+         string.Join(",", selectedNames));
+
+     if (this.fetcher == null)
+     {
+         return;
+     }
+
+     this.fetcher.InitConnections(allPartitionInfos, cluster, queuesToBeCleared);
+ }
+
+ /// <summary>
+ /// Tries to claim a partition by creating its ephemeral owner node and, on success,
+ /// registers the partition in the topic registry.
+ /// </summary>
+ /// <param name="topicDirs">ZooKeeper directories of the topic within the consumer group.</param>
+ /// <param name="partition">Name of the partition to claim.</param>
+ /// <param name="topic">Topic the partition belongs to.</param>
+ /// <param name="consumerThreadId">Consumer thread id written into the owner node.</param>
+ /// <returns><c>true</c> when the partition was claimed; <c>false</c> when the owner node already exists.</returns>
+ private bool ProcessPartition(ZKGroupTopicDirs topicDirs, string partition, string topic, string consumerThreadId)
+ {
+     string ownerPath = topicDirs.ConsumerOwnerDir + "/" + partition;
+     bool claimed;
+     try
+     {
+         ZkUtils.CreateEphemeralPathExpectConflict(this.zkClient, ownerPath, consumerThreadId);
+         claimed = true;
+     }
+     catch (KeeperException.NodeExistsException)
+     {
+         //// The node hasn't been deleted by the original owner. So wait a bit and retry.
+         Logger.InfoFormat(CultureInfo.CurrentCulture, "waiting for the partition ownership to be deleted: {0}", partition);
+         claimed = false;
+     }
+
+     if (claimed)
+     {
+         this.AddPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId);
+     }
+
+     return claimed;
+ }
+
+ /// <summary>
+ /// Adds a claimed partition to the topic registry, starting at the offset stored in
+ /// ZooKeeper for it, or at 0 when no offset is stored.
+ /// </summary>
+ /// <param name="topicDirs">ZooKeeper directories of the topic within the consumer group.</param>
+ /// <param name="partitionString">Textual form of the partition, parsed with <c>Partition.ParseFrom</c>.</param>
+ /// <param name="topic">Topic the partition belongs to; must already have a registry entry.</param>
+ /// <param name="consumerThreadId">Consumer thread id used to look up the target queue.</param>
+ private void AddPartitionTopicInfo(ZKGroupTopicDirs topicDirs, string partitionString, string topic, string consumerThreadId)
+ {
+     Partition parsedPartition = Partition.ParseFrom(partitionString);
+     IDictionary<Partition, PartitionTopicInfo> registryForTopic = this.topicRegistry[topic];
+
+     // Read the last stored offset for this partition; an empty value means "start at 0".
+     string offsetPath = topicDirs.ConsumerOffsetDir + "/" + parsedPartition.Name;
+     string storedOffset = this.zkClient.ReadData<string>(offsetPath, true);
+     long offset = 0;
+     if (!string.IsNullOrEmpty(storedOffset))
+     {
+         offset = long.Parse(storedOffset, CultureInfo.InvariantCulture);
+     }
+
+     var queueKey = new Tuple<string, string>(topic, consumerThreadId);
+     var targetQueue = this.queues[queueKey];
+     var info = new PartitionTopicInfo(
+         topic,
+         parsedPartition.BrokerId,
+         parsedPartition,
+         targetQueue,
+         offset,
+         offset,
+         this.config.FetchSize);
+     registryForTopic.Add(parsedPartition, info);
+
+     if (!Logger.IsDebugEnabled)
+     {
+         return;
+     }
+
+     Logger.DebugFormat(CultureInfo.CurrentCulture, "{0} selected new offset {1}", info, offset);
+ }
+
+ /// <summary>
+ /// Deletes the owner node of every partition currently held in the topic registry.
+ /// </summary>
+ private void ReleasePartitionOwnership()
+ {
+     foreach (var topicEntry in this.topicRegistry)
+     {
+         string ownerDir = new ZKGroupTopicDirs(this.config.GroupId, topicEntry.Key).ConsumerOwnerDir;
+         foreach (Partition ownedPartition in topicEntry.Value.Keys)
+         {
+             string ownerNode = ownerDir + "/" + ownedPartition.Name;
+             ZkUtils.DeletePath(this.zkClient, ownerNode);
+             if (!Logger.IsDebugEnabled)
+             {
+                 continue;
+             }
+
+             Logger.DebugFormat(CultureInfo.CurrentCulture, "Consumer {0} releasing {1}", this.consumerIdString, ownerNode);
+         }
+     }
+ }
+
+ /// <summary>
+ /// Reads the registration data of a consumer from ZooKeeper and builds its topic count from it.
+ /// </summary>
+ /// <param name="consumerId">Identifier of the consumer whose registration is read.</param>
+ /// <returns>The topic count constructed from the stored registration data.</returns>
+ private TopicCount GetTopicCount(string consumerId)
+ {
+     string registrationPath = this.dirs.ConsumerRegistryDir + "/" + consumerId;
+     string registrationData = this.zkClient.ReadData<string>(registrationPath);
+     return TopicCount.ConstructTopicCount(consumerId, registrationData);
+ }
+
+ /// <summary>
+ /// Builds, for every topic, the sorted list of consumer thread ids of all consumers
+ /// registered under this group's consumer registry directory.
+ /// </summary>
+ /// <param name="group">Consumer group name. Not used by the lookup, which relies on the
+ /// directories created for the configured group; kept for interface compatibility.</param>
+ /// <returns>Map from topic to its sorted consumer thread ids.</returns>
+ /// <remarks>
+ /// The lists must be sorted in place: the rebalance derives each thread's partition range
+ /// from its index in this list, so every consumer has to see the same order. The previous
+ /// code sorted a temporary copy (<c>ToList().Sort()</c>) and returned the lists unsorted.
+ /// </remarks>
+ private IDictionary<string, IList<string>> GetConsumersPerTopic(string group)
+ {
+     var consumers = this.zkClient.GetChildrenParentMayNotExist(this.dirs.ConsumerRegistryDir);
+
+     // Kept as List<string> so that the lists can be sorted in place afterwards.
+     var threadIdsPerTopic = new Dictionary<string, List<string>>();
+     foreach (var consumer in consumers)
+     {
+         TopicCount topicCount = this.GetTopicCount(consumer);
+         foreach (KeyValuePair<string, IList<string>> consumerThread in topicCount.GetConsumerThreadIdsPerTopic())
+         {
+             foreach (string consumerThreadId in consumerThread.Value)
+             {
+                 List<string> threadIds;
+                 if (!threadIdsPerTopic.TryGetValue(consumerThread.Key, out threadIds))
+                 {
+                     threadIds = new List<string>();
+                     threadIdsPerTopic.Add(consumerThread.Key, threadIds);
+                 }
+
+                 threadIds.Add(consumerThreadId);
+             }
+         }
+     }
+
+     var consumersPerTopicMap = new Dictionary<string, IList<string>>();
+     foreach (KeyValuePair<string, List<string>> item in threadIdsPerTopic)
+     {
+         // Ordinal ordering is culture-independent, so all consumers agree on the order.
+         item.Value.Sort(StringComparer.Ordinal);
+         consumersPerTopicMap.Add(item.Key, item.Value);
+     }
+
+     return consumersPerTopicMap;
+ }
+
+ /// <summary>
+ /// Selects the topics of this consumer whose partition list or consumer list differs
+ /// from the one remembered from the previous rebalance.
+ /// </summary>
+ /// <param name="myTopicThreadIdsMap">Topics of this consumer with their consumer thread ids.</param>
+ /// <param name="newPartMap">Current partitions per topic.</param>
+ /// <param name="oldPartMap">Partitions per topic remembered from the previous rebalance.</param>
+ /// <param name="newConsumerMap">Current consumer thread ids per topic.</param>
+ /// <param name="oldConsumerMap">Consumer thread ids per topic remembered from the previous rebalance.</param>
+ /// <returns>The entries of <paramref name="myTopicThreadIdsMap"/> whose topic changed.</returns>
+ /// <remarks>
+ /// Lists are compared by content and order. Comparing the list references (as the
+ /// previous code did) reports a change on every call, because the lists are rebuilt
+ /// from ZooKeeper each time.
+ /// </remarks>
+ private static IDictionary<string, IList<string>> GetRelevantTopicMap(
+     IDictionary<string, IList<string>> myTopicThreadIdsMap,
+     IDictionary<string, IList<string>> newPartMap,
+     IDictionary<string, IList<string>> oldPartMap,
+     IDictionary<string, IList<string>> newConsumerMap,
+     IDictionary<string, IList<string>> oldConsumerMap)
+ {
+     var relevantTopicThreadIdsMap = new Dictionary<string, IList<string>>();
+     foreach (var myMap in myTopicThreadIdsMap)
+     {
+         bool partitionsChanged = !HasSameItems(
+             GetListOrNull(oldPartMap, myMap.Key),
+             GetListOrNull(newPartMap, myMap.Key));
+         bool consumersChanged = !HasSameItems(
+             GetListOrNull(oldConsumerMap, myMap.Key),
+             GetListOrNull(newConsumerMap, myMap.Key));
+         if (partitionsChanged || consumersChanged)
+         {
+             relevantTopicThreadIdsMap.Add(myMap.Key, myMap.Value);
+         }
+     }
+
+     return relevantTopicThreadIdsMap;
+ }
+
+ /// <summary>
+ /// Returns the list stored for the key, or null when the map has no entry for it.
+ /// </summary>
+ private static IList<string> GetListOrNull(IDictionary<string, IList<string>> map, string key)
+ {
+     IList<string> value;
+     return map.TryGetValue(key, out value) ? value : null;
+ }
+
+ /// <summary>
+ /// Determines whether two lists hold the same items in the same order; two nulls are equal.
+ /// </summary>
+ private static bool HasSameItems(IList<string> left, IList<string> right)
+ {
+     if (ReferenceEquals(left, right))
+     {
+         return true;
+     }
+
+     if (left == null || right == null)
+     {
+         return false;
+     }
+
+     return left.SequenceEqual(right);
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration.Listeners
+{
+ using System;
+ using System.Globalization;
+ using System.Reflection;
+ using Kafka.Client.Consumers;
+ using Kafka.Client.Utils;
+ using Kafka.Client.ZooKeeperIntegration.Events;
+ using log4net;
+ using ZooKeeperNet;
+
+ /// <summary>
+ /// State listener that re-registers a consumer and triggers a rebalance once a new
+ /// ZooKeeper session has been created after the previous one expired.
+ /// </summary>
+ internal class ZKSessionExpireListener : IZooKeeperStateListener
+ {
+     private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+     private readonly ZKGroupDirs dirs;
+
+     private readonly string consumerIdString;
+
+     private readonly TopicCount topicCount;
+
+     private readonly ZKRebalancerListener loadBalancerListener;
+
+     private readonly ZookeeperConsumerConnector zkConsumerConnector;
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="ZKSessionExpireListener"/> class.
+     /// </summary>
+     /// <param name="dirs">ZooKeeper directories of the consumer group.</param>
+     /// <param name="consumerIdString">Identifier of the consumer to re-register.</param>
+     /// <param name="topicCount">Topic count passed along when the consumer is re-registered.</param>
+     /// <param name="loadBalancerListener">Rebalancer that is reset and triggered on a new session.</param>
+     /// <param name="zkConsumerConnector">Connector used to register the consumer in ZooKeeper.</param>
+     public ZKSessionExpireListener(ZKGroupDirs dirs, string consumerIdString, TopicCount topicCount, ZKRebalancerListener loadBalancerListener, ZookeeperConsumerConnector zkConsumerConnector)
+     {
+         this.dirs = dirs;
+         this.consumerIdString = consumerIdString;
+         this.topicCount = topicCount;
+         this.loadBalancerListener = loadBalancerListener;
+         this.zkConsumerConnector = zkConsumerConnector;
+     }
+
+     /// <summary>
+     /// Called when the ZooKeeper connection state has changed.
+     /// </summary>
+     /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperStateChangedEventArgs"/> instance containing the event data.</param>
+     /// <remarks>
+     /// Only validates the event data; nothing else is done, since zkclient will do reconnect for us.
+     /// </remarks>
+     public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
+     {
+         Guard.Assert<ArgumentNullException>(() => args != null);
+         Guard.Assert<ArgumentException>(() => args.State != KeeperState.Unknown);
+     }
+
+     /// <summary>
+     /// Called after the ZooKeeper session has expired and a new session has been created.
+     /// </summary>
+     /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperSessionCreatedEventArgs"/> instance containing the event data.</param>
+     /// <remarks>
+     /// Resets the rebalancer state, registers the consumer in ZooKeeper again and then
+     /// explicitly triggers load balancing for this consumer.
+     /// </remarks>
+     public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
+     {
+         Guard.Assert<ArgumentNullException>(() => args != null);
+
+         const string Message = "ZK expired; release old broker partition ownership; re-register consumer {0}";
+         Logger.InfoFormat(CultureInfo.CurrentCulture, Message, this.consumerIdString);
+
+         // Order matters: forget the old state first, re-register, then rebalance.
+         this.loadBalancerListener.ResetState();
+         this.zkConsumerConnector.RegisterConsumerInZk(this.dirs, this.consumerIdString, this.topicCount);
+         this.loadBalancerListener.SyncedRebalance();
+     }
+ }
+}