You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/09/21 21:17:25 UTC
svn commit: r1173797 [7/10] - in /incubator/kafka/trunk/clients/csharp: ./
lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/
src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/
src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.Watcher.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.Watcher.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.Watcher.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.Watcher.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,680 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+ using System;
+ using System.Collections.Concurrent;
+ using System.Collections.Generic;
+ using System.Threading;
+ using Kafka.Client.Utils;
+ using Kafka.Client.ZooKeeperIntegration.Events;
+ using Kafka.Client.ZooKeeperIntegration.Listeners;
+ using ZooKeeperNet;
+
+ internal partial class ZooKeeperClient
+ {
+ /// <summary>
+ /// Represents the method that will handle a ZooKeeper event.
+ /// </summary>
+ /// <param name="args">
+ /// The event data passed to the handler.
+ /// </param>
+ /// <typeparam name="T">
+ /// Type of event data; must derive from <see cref="ZooKeeperEventArgs"/>.
+ /// </typeparam>
+ public delegate void ZooKeeperEventHandler<T>(T args)
+ where T : ZooKeeperEventArgs;
+
+ /// <summary>
+ /// Occurs when the ZooKeeper connection state changes.
+ /// </summary>
+ /// <remarks>
+ /// Both accessors first verify that the client is not disposed and then
+ /// modify the handler list while holding <c>eventLock</c>.
+ /// </remarks>
+ public event ZooKeeperEventHandler<ZooKeeperStateChangedEventArgs> StateChanged
+ {
+ add
+ {
+ this.EnsuresNotDisposed();
+ lock (this.eventLock)
+ {
+ // Remove before adding so that a handler subscribed once is not registered a second time.
+ this.stateChangedHandlers -= value;
+ this.stateChangedHandlers += value;
+ }
+ }
+
+ remove
+ {
+ this.EnsuresNotDisposed();
+ lock (this.eventLock)
+ {
+ this.stateChangedHandlers -= value;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Occurs when the ZooKeeper session has been re-created.
+ /// </summary>
+ /// <remarks>
+ /// Both accessors first verify that the client is not disposed and then
+ /// modify the handler list while holding <c>eventLock</c>.
+ /// </remarks>
+ public event ZooKeeperEventHandler<ZooKeeperSessionCreatedEventArgs> SessionCreated
+ {
+ add
+ {
+ this.EnsuresNotDisposed();
+ lock (this.eventLock)
+ {
+ // Remove before adding so that a handler subscribed once is not registered a second time.
+ this.sessionCreatedHandlers -= value;
+ this.sessionCreatedHandlers += value;
+ }
+ }
+
+ remove
+ {
+ this.EnsuresNotDisposed();
+ lock (this.eventLock)
+ {
+ this.sessionCreatedHandlers -= value;
+ }
+ }
+ }
+
+ // Events produced by Process and waiting to be dispatched by the event worker (see Enqueue / Dequeue).
+ private readonly ConcurrentQueue<ZooKeeperEventArgs> eventsQueue = new ConcurrentQueue<ZooKeeperEventArgs>();
+ // Guards modifications of stateChangedHandlers and sessionCreatedHandlers.
+ private readonly object eventLock = new object();
+ private ZooKeeperEventHandler<ZooKeeperStateChangedEventArgs> stateChangedHandlers;
+ private ZooKeeperEventHandler<ZooKeeperSessionCreatedEventArgs> sessionCreatedHandlers;
+ // Thread that runs RunEventWorker and invokes the subscribed handlers.
+ private Thread eventWorker;
+ // Thread on which Process was last called; used to reject blocking calls made from that thread.
+ private Thread zooKeeperEventWorker;
+ // Child-change and data-change listeners, keyed by znode path.
+ private readonly ConcurrentDictionary<string, ChildChangedEventItem> childChangedHandlers = new ConcurrentDictionary<string, ChildChangedEventItem>();
+ private readonly ConcurrentDictionary<string, DataChangedEventItem> dataChangedHandlers = new ConcurrentDictionary<string, DataChangedEventItem>();
+ // Moment at which the event worker started waiting for events; null while it is not waiting.
+ private DateTime? idleTime;
+
+ /// <summary>
+ /// Gets the time (in milliseconds) for which the event thread has been idle
+ /// </summary>
+ /// <remarks>
+ /// Used for testing purposes. Returns 0 when the event thread is not currently waiting for events.
+ /// </remarks>
+ public int IdleTime
+ {
+ get
+ {
+ // Take a single snapshot of the field: the event thread resets it to null when it wakes up,
+ // so testing HasValue and then reading Value from the field itself could throw.
+ DateTime? idleSince = this.idleTime;
+ return idleSince.HasValue ? Convert.ToInt32((DateTime.Now - idleSince.Value).TotalMilliseconds) : 0;
+ }
+ }
+
+ /// <summary>
+ /// Processes an event received from ZooKeeper: translates it into queued client events
+ /// and wakes up the threads waiting on the corresponding monitors.
+ /// </summary>
+ /// <param name="e">
+ /// The event data.
+ /// </param>
+ /// <remarks>
+ /// Requires installed watcher. The calling thread is remembered as the ZooKeeper event thread.
+ /// Events received after shutdown has been triggered are logged and ignored.
+ /// </remarks>
+ public void Process(WatchedEvent e)
+ {
+ this.EnsuresNotDisposed();
+ Logger.Debug("Received event: " + e);
+ // Remember the caller so that blocking operations can refuse to run on this thread.
+ this.zooKeeperEventWorker = Thread.CurrentThread;
+ if (this.shutdownTriggered)
+ {
+ Logger.Debug("ignoring event '{" + e.Type + " | " + e.Path + "}' since shutdown triggered");
+ return;
+ }
+
+ // An event without a path is treated as a connection state change; one with a path concerns a znode.
+ bool stateChanged = e.Path == null;
+ bool znodeChanged = e.Path != null;
+ // True for every node-level event type, child changes included.
+ bool dataChanged =
+ e.Type == EventType.NodeDataChanged
+ || e.Type == EventType.NodeDeleted
+ || e.Type == EventType.NodeCreated
+ || e.Type == EventType.NodeChildrenChanged;
+
+ // All enqueueing happens while holding somethingChanged, the monitor the event worker waits on.
+ lock (this.somethingChanged)
+ {
+ try
+ {
+ if (stateChanged)
+ {
+ this.ProcessStateChange(e);
+ }
+
+ if (dataChanged)
+ {
+ this.ProcessDataOrChildChange(e);
+ }
+ }
+ finally
+ {
+ // Waiters are notified even if the processing above threw.
+ if (stateChanged)
+ {
+ lock (this.stateChangedLock)
+ {
+ Monitor.PulseAll(this.stateChangedLock);
+ }
+
+ if (e.State == KeeperState.Expired)
+ {
+ lock (this.znodeChangedLock)
+ {
+ Monitor.PulseAll(this.znodeChangedLock);
+ }
+
+ // On session expiry an event is queued for every registered path,
+ // so each listener gets invoked again.
+ foreach (string path in this.childChangedHandlers.Keys)
+ {
+ this.Enqueue(new ZooKeeperChildChangedEventArgs(path));
+ }
+
+ foreach (string path in this.dataChangedHandlers.Keys)
+ {
+ this.Enqueue(new ZooKeeperDataChangedEventArgs(path));
+ }
+ }
+ }
+
+ if (znodeChanged)
+ {
+ lock (this.znodeChangedLock)
+ {
+ Monitor.PulseAll(this.znodeChangedLock);
+ }
+ }
+ }
+
+ // Wake the event worker so it drains the queue.
+ Monitor.PulseAll(this.somethingChanged);
+ }
+ }
+
+ /// <summary>
+ /// Registers the listener for ZooKeeper state change and session creation events
+ /// </summary>
+ /// <param name="listener">
+ /// The listener; must not be null.
+ /// </param>
+ public void Subscribe(IZooKeeperStateListener listener)
+ {
+ Guard.Assert<ArgumentNullException>(() => listener != null);
+ this.EnsuresNotDisposed();
+
+ string listenerName = listener.GetType().Name;
+ this.StateChanged += listener.HandleStateChanged;
+ this.SessionCreated += listener.HandleSessionCreated;
+
+ Logger.Debug("Subscribed state changes handler " + listenerName);
+ }
+
+ /// <summary>
+ /// Removes the listener from ZooKeeper state change and session creation events
+ /// </summary>
+ /// <param name="listener">
+ /// The listener; must not be null.
+ /// </param>
+ public void Unsubscribe(IZooKeeperStateListener listener)
+ {
+ Guard.Assert<ArgumentNullException>(() => listener != null);
+ this.EnsuresNotDisposed();
+
+ string listenerName = listener.GetType().Name;
+ this.StateChanged -= listener.HandleStateChanged;
+ this.SessionCreated -= listener.HandleSessionCreated;
+
+ Logger.Debug("Unsubscribed state changes handler " + listenerName);
+ }
+
+ /// <summary>
+ /// Registers the listener for child changes under the given path and installs a child watch on it
+ /// </summary>
+ /// <param name="path">
+ /// The parent path; must not be null or empty.
+ /// </param>
+ /// <param name="listener">
+ /// The listener; must not be null.
+ /// </param>
+ public void Subscribe(string path, IZooKeeperChildListener listener)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.Assert<ArgumentNullException>(() => listener != null);
+
+ this.EnsuresNotDisposed();
+
+ // Applied when the path already has an item: the listener joins the existing one.
+ Func<string, ChildChangedEventItem, ChildChangedEventItem> attachToExisting = (key, existing) =>
+ {
+ existing.ChildChanged += listener.HandleChildChange;
+ return existing;
+ };
+
+ // Stored when the path has no item yet.
+ var freshItem = new ChildChangedEventItem(Logger, listener.HandleChildChange);
+ this.childChangedHandlers.AddOrUpdate(path, freshItem, attachToExisting);
+
+ this.WatchForChilds(path);
+ Logger.Debug("Subscribed child changes handler " + listener.GetType().Name + " for path: " + path);
+ }
+
+ /// <summary>
+ /// Un-subscribes the listener from ZooKeeper child changes under the given path
+ /// </summary>
+ /// <param name="path">
+ /// The parent path; must not be null or empty.
+ /// </param>
+ /// <param name="listener">
+ /// The listener; must not be null.
+ /// </param>
+ /// <remarks>
+ /// Does nothing (apart from logging) when no listeners are registered for the path.
+ /// </remarks>
+ public void Unsubscribe(string path, IZooKeeperChildListener listener)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.Assert<ArgumentNullException>(() => listener != null);
+
+ this.EnsuresNotDisposed();
+
+ // Only touch an existing registration. Inserting an empty placeholder for an unknown path
+ // would leave a dead entry behind that still gets an event queued on every session expiry.
+ ChildChangedEventItem registered;
+ if (this.childChangedHandlers.TryGetValue(path, out registered) && registered != null)
+ {
+ registered.ChildChanged -= listener.HandleChildChange;
+ }
+
+ Logger.Debug("Unsubscribed child changes handler " + listener.GetType().Name + " for path: " + path);
+ }
+
+ /// <summary>
+ /// Registers the listener for data changes and deletions at the given path and installs a data watch on it
+ /// </summary>
+ /// <param name="path">
+ /// The znode path; must not be null or empty.
+ /// </param>
+ /// <param name="listener">
+ /// The listener; must not be null.
+ /// </param>
+ public void Subscribe(string path, IZooKeeperDataListener listener)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.Assert<ArgumentNullException>(() => listener != null);
+
+ this.EnsuresNotDisposed();
+
+ // Applied when the path already has an item: the listener joins the existing one.
+ Func<string, DataChangedEventItem, DataChangedEventItem> attachToExisting = (key, existing) =>
+ {
+ existing.DataChanged += listener.HandleDataChange;
+ existing.DataDeleted += listener.HandleDataDelete;
+ return existing;
+ };
+
+ // Stored when the path has no item yet.
+ var freshItem = new DataChangedEventItem(Logger, listener.HandleDataChange, listener.HandleDataDelete);
+ this.dataChangedHandlers.AddOrUpdate(path, freshItem, attachToExisting);
+
+ this.WatchForData(path);
+ Logger.Debug("Subscribed data changes handler " + listener.GetType().Name + " for path: " + path);
+ }
+
+ /// <summary>
+ /// Un-subscribes the listener from ZooKeeper data changes at the given path
+ /// </summary>
+ /// <param name="path">
+ /// The znode path; must not be null or empty.
+ /// </param>
+ /// <param name="listener">
+ /// The listener; must not be null.
+ /// </param>
+ /// <remarks>
+ /// Does nothing (apart from logging) when no listeners are registered for the path.
+ /// </remarks>
+ public void Unsubscribe(string path, IZooKeeperDataListener listener)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ Guard.Assert<ArgumentNullException>(() => listener != null);
+
+ this.EnsuresNotDisposed();
+
+ // Only touch an existing registration. Inserting an empty placeholder for an unknown path
+ // would leave a dead entry behind that still gets an event queued on every session expiry.
+ DataChangedEventItem registered;
+ if (this.dataChangedHandlers.TryGetValue(path, out registered) && registered != null)
+ {
+ registered.DataChanged -= listener.HandleDataChange;
+ registered.DataDeleted -= listener.HandleDataDelete;
+ }
+
+ Logger.Debug("Unsubscribed data changes handler " + listener.GetType().Name + " for path: " + path);
+ }
+
+ /// <summary>
+ /// Un-subscribes all listeners: state, session, child and data listeners alike
+ /// </summary>
+ public void UnsubscribeAll()
+ {
+ this.EnsuresNotDisposed();
+ // Same lock as the event accessors, so no handler can be added while the lists are reset.
+ lock (this.eventLock)
+ {
+ this.stateChangedHandlers = null;
+ this.sessionCreatedHandlers = null;
+ this.childChangedHandlers.Clear();
+ this.dataChangedHandlers.Clear();
+ }
+
+ Logger.Debug("Unsubscribed all handlers");
+ }
+
+ /// <summary>
+ /// Installs a child watch for the given path.
+ /// </summary>
+ /// <param name="path">
+ /// The parent path; must not be null or empty.
+ /// </param>
+ /// <returns>
+ /// The current children of the path, or null if the znode with the given path doesn't exist
+ /// </returns>
+ /// <remarks>
+ /// Throws <see cref="InvalidOperationException"/> when called on the ZooKeeper event thread.
+ /// </remarks>
+ public IList<string> WatchForChilds(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+ this.EnsuresNotDisposed();
+ if (this.zooKeeperEventWorker != null && this.zooKeeperEventWorker == Thread.CurrentThread)
+ {
+ throw new InvalidOperationException("Must not be done in the zookeeper event thread.");
+ }
+
+ return this.RetryUntilConnected(
+ () =>
+ {
+ // The result of the existence check is not needed here; the call is made for its side effects.
+ this.Exists(path);
+
+ IList<string> children;
+ try
+ {
+ children = this.GetChildren(path);
+ }
+ catch (KeeperException.NoNodeException)
+ {
+ children = null;
+ }
+
+ return children;
+ });
+ }
+
+ /// <summary>
+ /// Installs a data watch for the given path.
+ /// </summary>
+ /// <param name="path">
+ /// The znode path; must not be null or empty.
+ /// </param>
+ public void WatchForData(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ // An existence check with watch = true is what installs the watch.
+ this.RetryUntilConnected(
+ () =>
+ {
+ return this.Exists(path, true);
+ });
+ }
+
+ /// <summary>
+ /// Checks whether any child or data listeners are registered for the path
+ /// </summary>
+ /// <param name="path">
+ /// The path.
+ /// </param>
+ /// <returns>
+ /// True when at least one child listener or data listener is registered for the path
+ /// </returns>
+ private bool HasListeners(string path)
+ {
+ ChildChangedEventItem childItem;
+ if (this.childChangedHandlers.TryGetValue(path, out childItem)
+ && childItem != null
+ && childItem.Count > 0)
+ {
+ return true;
+ }
+
+ DataChangedEventItem dataItem;
+ return this.dataChangedHandlers.TryGetValue(path, out dataItem)
+ && dataItem != null
+ && dataItem.TotalCount > 0;
+ }
+
+ /// <summary>
+ /// Entry point of the event thread: drains the events queue until the thread is interrupted
+ /// </summary>
+ private void RunEventWorker()
+ {
+ Logger.Debug("Starting ZooKeeper watcher event thread");
+ try
+ {
+ this.PoolEventsQueue();
+ }
+ catch (ThreadInterruptedException)
+ {
+ // Interruption is the expected way to stop this thread (see Disconnect).
+ Logger.Debug("Terminate ZooKeeper watcher event thread");
+ }
+ }
+
+ /// <summary>
+ /// Polls ZooKeeper events from the events queue and dispatches them
+ /// </summary>
+ /// <remarks>
+ /// The thread sleeps on the <c>somethingChanged</c> monitor while the queue is empty.
+ /// The loop never returns normally; it ends when the wait is interrupted.
+ /// </remarks>
+ private void PoolEventsQueue()
+ {
+ while (true)
+ {
+ while (!this.eventsQueue.IsEmpty)
+ {
+ this.Dequeue();
+ }
+
+ lock (this.somethingChanged)
+ {
+ // Events are enqueued and signalled while holding this lock. Re-check the queue here:
+ // an event that arrived after the drain loop but before the lock was taken has already
+ // been pulsed, and waiting now would leave it unprocessed until the next event.
+ if (!this.eventsQueue.IsEmpty)
+ {
+ continue;
+ }
+
+ Logger.Debug("Awaiting events ...");
+ this.idleTime = DateTime.Now;
+ Monitor.Wait(this.somethingChanged);
+ this.idleTime = null;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Logs the event and appends it to the events queue
+ /// </summary>
+ /// <param name="e">
+ /// The event to queue.
+ /// </param>
+ private void Enqueue(ZooKeeperEventArgs e)
+ {
+ Logger.Debug("New event queued: " + e);
+ this.eventsQueue.Enqueue(e);
+ }
+
+ /// <summary>
+ /// Takes one event from the events queue and invokes the handlers subscribed to its type
+ /// </summary>
+ /// <remarks>
+ /// Any exception raised while handling the event is logged as a warning and not propagated.
+ /// </remarks>
+ private void Dequeue()
+ {
+ try
+ {
+ ZooKeeperEventArgs next;
+ if (!this.eventsQueue.TryDequeue(out next) || next == null)
+ {
+ return;
+ }
+
+ Logger.Debug("Event dequeued: " + next);
+ var eventType = next.Type;
+ if (eventType == ZooKeeperEventTypes.StateChanged)
+ {
+ this.OnStateChanged((ZooKeeperStateChangedEventArgs)next);
+ }
+ else if (eventType == ZooKeeperEventTypes.SessionCreated)
+ {
+ this.OnSessionCreated((ZooKeeperSessionCreatedEventArgs)next);
+ }
+ else if (eventType == ZooKeeperEventTypes.ChildChanged)
+ {
+ this.OnChildChanged((ZooKeeperChildChangedEventArgs)next);
+ }
+ else if (eventType == ZooKeeperEventTypes.DataChanged)
+ {
+ this.OnDataChanged((ZooKeeperDataChangedEventArgs)next);
+ }
+ else
+ {
+ throw new InvalidOperationException("Not supported event type");
+ }
+ }
+ catch (Exception exc)
+ {
+ Logger.Warn("Error handling event ", exc);
+ }
+ }
+
+ /// <summary>
+ /// Processes a ZooKeeper state change event: records the new state, queues a state-changed event
+ /// and, when the session has expired, reconnects and queues a session-created event
+ /// </summary>
+ /// <param name="e">
+ /// The event data.
+ /// </param>
+ private void ProcessStateChange(WatchedEvent e)
+ {
+ Logger.Info("zookeeper state changed (" + e.State + ")");
+ // currentState is read under the same lock by WaitUntilConnected.
+ lock (this.stateChangedLock)
+ {
+ this.currentState = e.State;
+ }
+
+ // The state is still recorded during shutdown, but no events are queued.
+ if (this.shutdownTriggered)
+ {
+ return;
+ }
+
+ this.Enqueue(new ZooKeeperStateChangedEventArgs(e.State));
+ if (e.State == KeeperState.Expired)
+ {
+ // Reconnect with the same servers and session timeout as the expired connection.
+ this.Reconnect(this.connection.Servers, this.connection.SessionTimeout);
+ this.Enqueue(ZooKeeperSessionCreatedEventArgs.Empty);
+ }
+ }
+
+ /// <summary>
+ /// Translates a ZooKeeper node event into queued child-changed and/or data-changed events
+ /// </summary>
+ /// <param name="e">
+ /// The event data.
+ /// </param>
+ /// <remarks>
+ /// Node creation and deletion produce both a child-changed and a data-changed event.
+ /// Nothing is queued once shutdown has been triggered.
+ /// </remarks>
+ private void ProcessDataOrChildChange(WatchedEvent e)
+ {
+ if (this.shutdownTriggered)
+ {
+ return;
+ }
+
+ EventType kind = e.Type;
+ bool createdOrDeleted = kind == EventType.NodeCreated || kind == EventType.NodeDeleted;
+
+ if (kind == EventType.NodeChildrenChanged || createdOrDeleted)
+ {
+ this.Enqueue(new ZooKeeperChildChangedEventArgs(e.Path));
+ }
+
+ if (kind == EventType.NodeDataChanged || createdOrDeleted)
+ {
+ this.Enqueue(new ZooKeeperDataChangedEventArgs(e.Path));
+ }
+ }
+
+ /// <summary>
+ /// Invokes the handlers subscribed to the ZooKeeper state changed event
+ /// </summary>
+ /// <param name="e">
+ /// The event data.
+ /// </param>
+ /// <remarks>
+ /// Exceptions thrown by handlers are logged as errors and not propagated.
+ /// </remarks>
+ private void OnStateChanged(ZooKeeperStateChangedEventArgs e)
+ {
+ try
+ {
+ // Work on a local copy so that a concurrent unsubscribe cannot null the delegate mid-call.
+ var subscribers = this.stateChangedHandlers;
+ if (subscribers != null)
+ {
+ foreach (var subscriber in subscribers.GetInvocationList())
+ {
+ Logger.Debug(e + " sent to " + subscriber.Target);
+ }
+
+ subscribers(e);
+ }
+ }
+ catch (Exception exc)
+ {
+ Logger.Error("Failed to handle state changed event.", exc);
+ }
+ }
+
+ /// <summary>
+ /// Invokes the handlers subscribed to the ZooKeeper session created event
+ /// </summary>
+ /// <param name="e">
+ /// The event data.
+ /// </param>
+ private void OnSessionCreated(ZooKeeperSessionCreatedEventArgs e)
+ {
+ // Work on a local copy so that a concurrent unsubscribe cannot null the delegate mid-call.
+ var subscribers = this.sessionCreatedHandlers;
+ if (subscribers != null)
+ {
+ foreach (var subscriber in subscribers.GetInvocationList())
+ {
+ Logger.Debug(e + " sent to " + subscriber.Target);
+ }
+
+ subscribers(e);
+ }
+ }
+
+ /// <summary>
+ /// Invokes the handlers subscribed to child changes of the event's path
+ /// </summary>
+ /// <param name="e">
+ /// The event data; its children list is filled in before the handlers are called.
+ /// </param>
+ private void OnChildChanged(ZooKeeperChildChangedEventArgs e)
+ {
+ ChildChangedEventItem subscribers;
+ bool found = this.childChangedHandlers.TryGetValue(e.Path, out subscribers);
+ if (!found || subscribers == null || subscribers.Count == 0)
+ {
+ return;
+ }
+
+ this.Exists(e.Path);
+ try
+ {
+ e.Children = this.GetChildren(e.Path);
+ }
+ catch (KeeperException.NoNodeException)
+ {
+ // The node does not exist: the handlers are still called, with the children left unset.
+ }
+
+ subscribers.OnChildChanged(e);
+ }
+
+ /// <summary>
+ /// Invokes the handlers subscribed to data changes of the event's path
+ /// </summary>
+ /// <param name="e">
+ /// The event data; its data is filled in before the data-changed handlers are called.
+ /// </param>
+ /// <remarks>
+ /// When the node no longer exists, the data-deleted handlers are invoked instead.
+ /// </remarks>
+ private void OnDataChanged(ZooKeeperDataChangedEventArgs e)
+ {
+ DataChangedEventItem subscribers;
+ bool found = this.dataChangedHandlers.TryGetValue(e.Path, out subscribers);
+ if (!found || subscribers == null || subscribers.TotalCount == 0)
+ {
+ return;
+ }
+
+ try
+ {
+ this.Exists(e.Path, true);
+ e.Data = this.ReadData<string>(e.Path, null, true);
+ subscribers.OnDataChanged(e);
+ }
+ catch (KeeperException.NoNodeException)
+ {
+ subscribers.OnDataDeleted(e);
+ }
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,891 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Reflection;
+ using System.Threading;
+ using Kafka.Client.Exceptions;
+ using Kafka.Client.Utils;
+ using log4net;
+ using Org.Apache.Zookeeper.Data;
+ using ZooKeeperNet;
+
+ /// <summary>
+ /// Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper
+ /// </summary>
+ internal partial class ZooKeeperClient : IZooKeeperClient
+ {
+ // Default for the connectionTimeout constructor parameter.
+ private const int DefaultConnectionTimeout = int.MaxValue;
+ public const string DefaultConsumersPath = "/consumers";
+ public const string DefaultBrokerIdsPath = "/brokers/ids";
+ public const string DefaultBrokerTopicsPath = "/brokers/topics";
+ private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+ // Replaced by Reconnect and set to null by Disconnect.
+ private IZooKeeperConnection connection;
+ // Set to false by Connect and to true by Disconnect; while true, incoming ZooKeeper events are ignored.
+ private bool shutdownTriggered;
+ // Last state reported by ZooKeeper; written and awaited under stateChangedLock.
+ private KeeperState currentState;
+ private readonly IZooKeeperSerializer serializer;
+ // Pulsed on every connection state change.
+ private readonly object stateChangedLock = new object();
+ // Pulsed on every znode event and on session expiry.
+ private readonly object znodeChangedLock = new object();
+ // Monitor the event worker waits on; pulsed after every processed ZooKeeper event.
+ private readonly object somethingChanged = new object();
+ // NOTE(review): shuttingDownLock and disposed are not used in the visible part of the file;
+ // presumably they belong to the dispose logic - confirm.
+ private readonly object shuttingDownLock = new object();
+ private volatile bool disposed;
+ // Maximum time (in milliseconds) Connect waits for the connection to be established.
+ private readonly int connectionTimeout;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZooKeeperClient"/> class.
+ /// </summary>
+ /// <param name="connection">
+ /// The connection to ZooKeeper.
+ /// </param>
+ /// <param name="serializer">
+ /// The given serializer.
+ /// </param>
+ /// <param name="connectionTimeout">
+ /// The connection timeout (in milliseconds). Defaults to <see cref="int.MaxValue"/>, i.e. practically infinite.
+ /// </param>
+ /// <remarks>
+ /// The arguments are stored as given; this constructor neither validates them nor connects.
+ /// NOTE(review): the original remark said the default serializer is a string UTF-8 serializer,
+ /// but no default is substituted here - confirm against callers.
+ /// </remarks>
+ public ZooKeeperClient(
+ IZooKeeperConnection connection,
+ IZooKeeperSerializer serializer,
+ int connectionTimeout = DefaultConnectionTimeout)
+ {
+ this.serializer = serializer;
+ this.connection = connection;
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZooKeeperClient"/> class
+ /// with a new <see cref="ZooKeeperConnection"/> and the default connection timeout.
+ /// </summary>
+ /// <param name="servers">
+ /// The list of ZooKeeper servers.
+ /// </param>
+ /// <param name="sessionTimeout">
+ /// The session timeout (in milliseconds).
+ /// </param>
+ /// <param name="serializer">
+ /// The given serializer.
+ /// </param>
+ /// <remarks>
+ /// It is recommended to use quite large session timeouts for ZooKeeper.
+ /// </remarks>
+ public ZooKeeperClient(string servers, int sessionTimeout, IZooKeeperSerializer serializer)
+ : this(new ZooKeeperConnection(servers, sessionTimeout), serializer)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZooKeeperClient"/> class
+ /// with a new <see cref="ZooKeeperConnection"/> and an explicit connection timeout.
+ /// </summary>
+ /// <param name="servers">
+ /// The list of ZooKeeper servers.
+ /// </param>
+ /// <param name="sessionTimeout">
+ /// The session timeout (in milliseconds).
+ /// </param>
+ /// <param name="serializer">
+ /// The given serializer.
+ /// </param>
+ /// <param name="connectionTimeout">
+ /// The connection timeout (in milliseconds).
+ /// </param>
+ /// <remarks>
+ /// It is recommended to use quite large session timeouts for ZooKeeper.
+ /// </remarks>
+ public ZooKeeperClient(
+ string servers,
+ int sessionTimeout,
+ IZooKeeperSerializer serializer,
+ int connectionTimeout)
+ : this(new ZooKeeperConnection(servers, sessionTimeout), serializer, connectionTimeout)
+ {
+ }
+
+ /// <summary>
+ /// Connects to the ZooKeeper server within the configured connection timeout and installs this client as watcher
+ /// </summary>
+ /// <remarks>
+ /// Also starts the background thread for event handling.
+ /// If the connection cannot be established the client is disconnected again before the exception propagates.
+ /// Throws <see cref="ZooKeeperException"/> when the timeout elapses and
+ /// <see cref="InvalidOperationException"/> when the waiting thread is interrupted.
+ /// </remarks>
+ public void Connect()
+ {
+ this.EnsuresNotDisposed();
+ bool started = false;
+ try
+ {
+ this.shutdownTriggered = false;
+ this.eventWorker = new Thread(this.RunEventWorker) { IsBackground = true };
+ this.eventWorker.Name = "ZooKeeperkWatcher-EventThread-" + this.eventWorker.ManagedThreadId + "-" + this.connection.Servers;
+ this.eventWorker.Start();
+ this.connection.Connect(this);
+ Logger.Debug("Awaiting connection to Zookeeper server");
+ if (!this.WaitUntilConnected(this.connectionTimeout))
+ {
+ // Report the timeout that was actually waited for (the connection timeout, not the session timeout).
+ throw new ZooKeeperException(
+ "Unable to connect to zookeeper server within timeout: " + this.connectionTimeout);
+ }
+
+ started = true;
+ Logger.Debug("Connection to Zookeeper server established");
+ }
+ catch (ThreadInterruptedException exc)
+ {
+ // Keep the interruption as inner exception so the cause is not lost.
+ throw new InvalidOperationException(
+ "Not connected with zookeeper server yet. Current state is " + this.connection.ClientState,
+ exc);
+ }
+ finally
+ {
+ if (!started)
+ {
+ this.Disconnect();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Closes the current connection to ZooKeeper
+ /// </summary>
+ /// <remarks>
+ /// Also stops the background event thread (waiting up to 2 seconds for it to finish).
+ /// Safe to call more than once, and before <c>Connect</c> has created the event thread.
+ /// </remarks>
+ public void Disconnect()
+ {
+ Logger.Debug("Closing ZooKeeperClient...");
+ this.shutdownTriggered = true;
+
+ // The worker does not exist until Connect has run.
+ Thread worker = this.eventWorker;
+ if (worker != null)
+ {
+ worker.Interrupt();
+ worker.Join(2000);
+ }
+
+ // The connection is null after a previous Disconnect (Connect calls it on failure).
+ IZooKeeperConnection current = this.connection;
+ if (current != null)
+ {
+ current.Dispose();
+ this.connection = null;
+ }
+ }
+
+ /// <summary>
+ /// Re-connects to the ZooKeeper server when the session has expired: disposes the current connection
+ /// and replaces it with a new one that uses this client as watcher
+ /// </summary>
+ /// <param name="servers">
+ /// The servers.
+ /// </param>
+ /// <param name="connectionTimeout">
+ /// The timeout handed to the new connection.
+ /// NOTE(review): it is passed as the second argument of <see cref="ZooKeeperConnection"/>, which the
+ /// constructors of this class use for the session timeout - the parameter name looks misleading; confirm.
+ /// </param>
+ public void Reconnect(string servers, int connectionTimeout)
+ {
+ this.EnsuresNotDisposed();
+ Logger.Debug("Reconnecting");
+ this.connection.Dispose();
+ this.connection = new ZooKeeperConnection(servers, connectionTimeout);
+ this.connection.Connect(this);
+ Logger.Debug("Reconnected");
+ }
+
+ /// <summary>
+ /// Waits until the ZooKeeper connection is established
+ /// </summary>
+ /// <param name="connectionTimeout">
+ /// The maximum total time to wait (in milliseconds); must be greater than zero.
+ /// </param>
+ /// <returns>
+ /// True when the state is <c>SyncConnected</c>; false when the timeout elapsed first
+ /// </returns>
+ /// <remarks>
+ /// Throws <see cref="InvalidOperationException"/> when called on the event thread.
+ /// </remarks>
+ public bool WaitUntilConnected(int connectionTimeout)
+ {
+ Guard.Assert<ArgumentOutOfRangeException>(() => connectionTimeout > 0);
+ this.EnsuresNotDisposed();
+ if (this.eventWorker != null && this.eventWorker == Thread.CurrentThread)
+ {
+ throw new InvalidOperationException("Must not be done in the ZooKeeper event thread.");
+ }
+
+ Logger.Debug("Waiting for keeper state: " + KeeperState.SyncConnected);
+
+ // Track an absolute deadline: every state change pulses the monitor, and restarting the
+ // full timeout after each pulse would allow the total wait to grow without bound.
+ DateTime deadline = DateTime.UtcNow.AddMilliseconds(connectionTimeout);
+ lock (this.stateChangedLock)
+ {
+ while (this.currentState != KeeperState.SyncConnected)
+ {
+ double remaining = (deadline - DateTime.UtcNow).TotalMilliseconds;
+ if (remaining <= 0)
+ {
+ return false;
+ }
+
+ // Clamp to the requested timeout so a backwards clock adjustment cannot overflow the cast.
+ int waitTime = (int)Math.Ceiling(Math.Min(remaining, connectionTimeout));
+ Monitor.Wait(this.stateChangedLock, waitTime);
+ }
+
+ Logger.Debug("State is " + this.currentState);
+ }
+
+ return true;
+ }
+
+ /// <summary>
+ /// Invokes the given delegate, retrying it for as long as it fails with a connection-loss
+ /// or session-expired exception
+ /// </summary>
+ /// <param name="callback">
+ /// The delegate to invoke; must not be null.
+ /// </param>
+ /// <typeparam name="T">
+ /// Type of data returned by delegate
+ /// </typeparam>
+ /// <returns>
+ /// data returned by delegate
+ /// </returns>
+ /// <remarks>
+ /// Throws <see cref="InvalidOperationException"/> when called on the ZooKeeper event thread.
+ /// Any other exception thrown by the delegate propagates to the caller.
+ /// There is no upper bound on the number of retries.
+ /// </remarks>
+ public T RetryUntilConnected<T>(Func<T> callback)
+ {
+ Guard.Assert<ArgumentNullException>(() => callback != null);
+ this.EnsuresNotDisposed();
+ if (this.zooKeeperEventWorker != null && this.zooKeeperEventWorker == Thread.CurrentThread)
+ {
+ throw new InvalidOperationException("Must not be done in the zookeeper event thread");
+ }
+
+ while (true)
+ {
+ try
+ {
+ return callback();
+ }
+ catch (KeeperException.ConnectionLossException)
+ {
+ // Wait for the connection to come back (at most one session timeout), then retry.
+ // The result of the wait is ignored: the callback is retried either way.
+ Thread.Yield();
+ this.WaitUntilConnected(this.connection.SessionTimeout);
+ }
+ catch (KeeperException.SessionExpiredException)
+ {
+ // Same recovery as for a lost connection.
+ Thread.Yield();
+ this.WaitUntilConnected(this.connection.SessionTimeout);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Checks whether a znode exists at the given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path; must not be null or empty.
+ /// </param>
+ /// <returns>
+ /// Result of check
+ /// </returns>
+ /// <remarks>
+ /// A watch is requested only when a listener is registered for the path
+ /// </remarks>
+ public bool Exists(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ return this.Exists(path, this.HasListeners(path));
+ }
+
+ /// <summary>
+ /// Checks whether a znode exists at the given path.
+ /// </summary>
+ /// <param name="path">
+ /// The given path; must not be null or empty.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether a watch should be (re)installed in ZooKeeper.
+ /// </param>
+ /// <returns>
+ /// Result of check
+ /// </returns>
+ public bool Exists(string path, bool watch)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ var found = this.RetryUntilConnected(
+ () =>
+ {
+ return this.connection.Exists(path, watch);
+ });
+ return found;
+ }
+
+ /// <summary>
+ /// Gets all children of the given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path; must not be null or empty.
+ /// </param>
+ /// <returns>
+ /// Children
+ /// </returns>
+ /// <remarks>
+ /// A watch is requested only when a listener is registered for the path
+ /// </remarks>
+ public IList<string> GetChildren(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ return this.GetChildren(path, this.HasListeners(path));
+ }
+
+ /// <summary>
+ /// Gets all children of the given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path; must not be null or empty.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether a watch should be (re)installed in ZooKeeper.
+ /// </param>
+ /// <returns>
+ /// Children
+ /// </returns>
+ public IList<string> GetChildren(string path, bool watch)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ var children = this.RetryUntilConnected(
+ () =>
+ {
+ return this.connection.GetChildren(path, watch);
+ });
+ return children;
+ }
+
+ /// <summary>
+ /// Counts the children of the given path.
+ /// </summary>
+ /// <param name="path">
+ /// The given path; must not be null or empty.
+ /// </param>
+ /// <returns>
+ /// Number of children
+ /// </returns>
+ /// <remarks>
+ /// A watch is requested only when a listener is registered for the path.
+ /// Returns 0 if the path does not exist
+ /// </remarks>
+ public int CountChildren(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ try
+ {
+ IList<string> children = this.GetChildren(path);
+ return children.Count;
+ }
+ catch (KeeperException.NoNodeException)
+ {
+ return 0;
+ }
+ }
+
+ /// <summary>
+ /// Fetches the data stored at the given path in ZooKeeper
+ /// </summary>
+ /// <param name="path">
+ /// The given path; must not be null or empty.
+ /// </param>
+ /// <param name="stats">
+ /// The statistics; may be null.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether a watch should be (re)installed in ZooKeeper.
+ /// </param>
+ /// <typeparam name="T">
+ /// Expected type of data
+ /// </typeparam>
+ /// <returns>
+ /// The deserialized data, or null when it is not of type <typeparamref name="T"/>
+ /// </returns>
+ /// <remarks>
+ /// Uses the serializer given at construction to deserialize the data
+ /// </remarks>
+ public T ReadData<T>(string path, Stat stats, bool watch)
+ where T : class
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ byte[] raw = this.RetryUntilConnected(
+ () =>
+ {
+ return this.connection.ReadData(path, stats, watch);
+ });
+ var decoded = this.serializer.Deserialize(raw);
+ return decoded as T;
+ }
+
+ /// <summary>
+ /// Fetches the data stored at the given path in ZooKeeper
+ /// </summary>
+ /// <param name="path">
+ /// The given path; must not be null or empty.
+ /// </param>
+ /// <param name="stats">
+ /// The statistics; may be null.
+ /// </param>
+ /// <typeparam name="T">
+ /// Expected type of data
+ /// </typeparam>
+ /// <returns>
+ /// Data
+ /// </returns>
+ /// <remarks>
+ /// Uses the serializer given at construction to deserialize the data.
+ /// A watch is requested only when a listener is registered for the path.
+ /// </remarks>
+ public T ReadData<T>(string path, Stat stats) where T : class
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+ this.EnsuresNotDisposed();
+ bool hasListeners = this.HasListeners(path);
+
+ // Forward the caller's stats object; it used to be dropped and replaced by null.
+ return this.ReadData<T>(path, stats, hasListeners);
+ }
+
+ /// <summary>
+ /// Writes data to the given path without checking the version of the existing data
+ /// </summary>
+ /// <param name="path">
+ /// The given path; must not be null or empty.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ public void WriteData(string path, object data)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+ this.EnsuresNotDisposed();
+ // -1 is the expected-version value documented on the three-argument overload.
+ this.WriteData(path, data, -1);
+ }
+
+ /// <summary>
+ /// Writes data for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <param name="expectedVersion">
+ /// Expected version of data
+ /// </param>
+ /// <remarks>
+ /// Use -1 for expected version
+ /// </remarks>
+ public void WriteData(string path, object data, int expectedVersion)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ // Serialize once, before entering the delegate handed to RetryUntilConnected.
+ byte[] payload = this.serializer.Serialize(data);
+
+ // The delegate must return a value, so a typed null is returned as a placeholder.
+ this.RetryUntilConnected(
+ () =>
+ {
+ this.connection.WriteData(path, payload, expectedVersion);
+ return null as object;
+ });
+ }
+
+ /// <summary>
+ /// Deletes znode for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <returns>
+ /// Status
+ /// </returns>
+ public bool Delete(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ return this.RetryUntilConnected(
+ () =>
+ {
+ bool deleted;
+ try
+ {
+ this.connection.Delete(path);
+ deleted = true;
+ }
+ catch (KeeperException.NoNodeException)
+ {
+ // The znode was not there, so nothing was removed.
+ deleted = false;
+ }
+
+ return deleted;
+ });
+ }
+
+ /// <summary>
+ /// Deletes the znode and its children for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <returns>
+ /// Status
+ /// </returns>
+ public bool DeleteRecursive(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ IList<string> childNames;
+ try
+ {
+ childNames = this.GetChildren(path, false);
+ }
+ catch (KeeperException.NoNodeException)
+ {
+ // A missing znode counts as already deleted.
+ return true;
+ }
+
+ // Remove every child subtree first; stop at the first one that reports failure.
+ foreach (string childName in childNames)
+ {
+ string childPath = path + "/" + childName;
+ bool childRemoved = this.DeleteRecursive(childPath);
+ if (!childRemoved)
+ {
+ return false;
+ }
+ }
+
+ // Finally remove the znode itself.
+ return this.Delete(path);
+ }
+
+ /// <summary>
+ /// Creates a persistent znode and all intermediate znodes (if they do not exist) for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ public void MakeSurePersistentPathExists(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ // Nothing to do when the znode is already present.
+ if (this.Exists(path))
+ {
+ return;
+ }
+
+ // Create the znode, allowing missing parents to be created as well.
+ this.CreatePersistent(path, true);
+ }
+
+ /// <summary>
+ /// Fetches children for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The path.
+ /// </param>
+ /// <returns>
+ /// Children or null, if znode does not exist
+ /// </returns>
+ public IList<string> GetChildrenParentMayNotExist(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ IList<string> children;
+ try
+ {
+ children = this.GetChildren(path);
+ }
+ catch (KeeperException.NoNodeException)
+ {
+ // The znode does not exist: signal that with null instead of an exception.
+ children = null;
+ }
+
+ return children;
+ }
+
+ /// <summary>
+ /// Fetches data from a given path in ZooKeeper
+ /// </summary>
+ /// <typeparam name="T">
+ /// Expected type of data
+ /// </typeparam>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <returns>
+ /// Data; a KeeperException.NoNodeException is thrown if the znode does not exist
+ /// </returns>
+ public T ReadData<T>(string path)
+ where T : class
+ {
+ // false: a missing znode is reported by exception rather than by a null result.
+ const bool returnNullIfPathNotExists = false;
+
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ return this.ReadData<T>(path, returnNullIfPathNotExists);
+ }
+
+ /// <summary>
+ /// Closes connection to ZooKeeper
+ /// </summary>
+ public void Dispose()
+ {
+ // Cheap exit before taking the lock.
+ if (this.disposed)
+ {
+ return;
+ }
+
+ // Read and set the flag under the lock so only one caller goes on to disconnect.
+ bool alreadyDisposed;
+ lock (this.shuttingDownLock)
+ {
+ alreadyDisposed = this.disposed;
+ this.disposed = true;
+ }
+
+ if (alreadyDisposed)
+ {
+ return;
+ }
+
+ try
+ {
+ this.Disconnect();
+ }
+ catch (ThreadInterruptedException)
+ {
+ // Deliberately ignored while shutting down.
+ }
+ catch (Exception exc)
+ {
+ Logger.Debug("Ignoring unexpected errors on closing ZooKeeperClient", exc);
+ }
+
+ Logger.Debug("Closing ZooKeeperClient... done");
+ }
+
+ /// <summary>
+ /// Creates a persistent znode for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="createParents">
+ /// Indicates, whether should create missing intermediate znodes
+ /// </param>
+ /// <remarks>
+ /// Persistent znodes won't disappear after session close
+ /// </remarks>
+ public void CreatePersistent(string path, bool createParents)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ try
+ {
+ this.Create(path, null, CreateMode.Persistent);
+ }
+ catch (KeeperException.NodeExistsException)
+ {
+ // An already existing znode is tolerated only when parents may be created.
+ if (!createParents)
+ {
+ throw;
+ }
+ }
+ catch (KeeperException.NoNodeException)
+ {
+ if (!createParents)
+ {
+ throw;
+ }
+
+ // Create the parent (everything before the last '/') first, then retry this path.
+ int lastSeparator = path.LastIndexOf('/');
+ string parentPath = path.Substring(0, lastSeparator);
+ this.CreatePersistent(parentPath, true);
+ this.CreatePersistent(path, true);
+ }
+ }
+
+ /// <summary>
+ /// Creates a persistent znode for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <remarks>
+ /// Persistent znodes won't disappear after session close
+ /// Doesn't re-create missing intermediate znodes
+ /// </remarks>
+ public void CreatePersistent(string path)
+ {
+ // Missing intermediate znodes are not created by this overload.
+ const bool createParents = false;
+
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.CreatePersistent(path, createParents);
+ }
+
+ /// <summary>
+ /// Creates a persistent znode for a given path and writes data into it
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <remarks>
+ /// Persistent znodes won't disappear after session close
+ /// Doesn't re-create missing intermediate znodes
+ /// </remarks>
+ public void CreatePersistent(string path, object data)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+ // The path created by the helper is not needed by this overload.
+ CreateMode mode = CreateMode.Persistent;
+ this.Create(path, data, mode);
+ }
+
+ /// <summary>
+ /// Creates a sequential, persistent znode for a given path and writes data into it
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <remarks>
+ /// Persistent znodes won't disappear after session close
+ /// Doesn't re-create missing intermediate znodes
+ /// </remarks>
+ /// <returns>
+ /// The created znode's path
+ /// </returns>
+ public string CreatePersistentSequential(string path, object data)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+ // Hand back the path returned by the create helper.
+ string createdPath = this.Create(path, data, CreateMode.PersistentSequential);
+ return createdPath;
+ }
+
+ /// <summary>
+ /// Helper method to create znode
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <param name="mode">
+ /// The create mode.
+ /// </param>
+ /// <returns>
+ /// The created znode's path
+ /// </returns>
+ private string Create(string path, object data, CreateMode mode)
+ {
+ if (path == null)
+ {
+ // ArgumentNullException(string) interprets its argument as the parameter
+ // name, so the name and the message are passed separately.
+ throw new ArgumentNullException("path", "Path must not be null");
+ }
+
+ // A null payload is forwarded as null instead of being serialized.
+ byte[] bytes = data == null ? null : this.serializer.Serialize(data);
+
+ // Returns whatever path the connection reports for the created znode.
+ return this.RetryUntilConnected(() =>
+ this.connection.Create(path, bytes, mode));
+ }
+
+ /// <summary>
+ /// Creates an ephemeral znode for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <remarks>
+ /// Ephemeral znodes will disappear after session close
+ /// </remarks>
+ public void CreateEphemeral(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+ // No payload is written for this overload.
+ object noData = null;
+ this.Create(path, noData, CreateMode.Ephemeral);
+ }
+
+ /// <summary>
+ /// Creates an ephemeral znode for a given path and writes data into it
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <remarks>
+ /// Ephemeral znodes will disappear after session close
+ /// </remarks>
+ public void CreateEphemeral(string path, object data)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+ // The path created by the helper is not needed by this overload.
+ CreateMode mode = CreateMode.Ephemeral;
+ this.Create(path, data, mode);
+ }
+
+ /// <summary>
+ /// Creates an ephemeral, sequential znode for a given path and writes data into it
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <remarks>
+ /// Ephemeral znodes will disappear after session close
+ /// </remarks>
+ /// <returns>
+ /// Created znode's path
+ /// </returns>
+ public string CreateEphemeralSequential(string path, object data)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+ // Hand back the path returned by the create helper.
+ string createdPath = this.Create(path, data, CreateMode.EphemeralSequential);
+ return createdPath;
+ }
+
+ /// <summary>
+ /// Fetches data for given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="returnNullIfPathNotExists">
+ /// Indicates, whether should return null or throw exception when
+ /// znode doesn't exist
+ /// </param>
+ /// <typeparam name="T">
+ /// Expected type of data
+ /// </typeparam>
+ /// <returns>
+ /// Data
+ /// </returns>
+ public T ReadData<T>(string path, bool returnNullIfPathNotExists)
+ where T : class
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+ T result;
+ try
+ {
+ // The null argument selects the (path, Stat) overload.
+ result = this.ReadData<T>(path, null);
+ }
+ catch (KeeperException.NoNodeException)
+ {
+ // Missing znode: rethrow unless the caller asked for a null result.
+ if (!returnNullIfPathNotExists)
+ {
+ throw;
+ }
+
+ result = null;
+ }
+
+ return result;
+ }
+
+ /// <summary>
+ /// Ensures that object wasn't disposed
+ /// </summary>
+ private void EnsuresNotDisposed()
+ {
+ if (!this.disposed)
+ {
+ return;
+ }
+
+ // Report the runtime type name as the disposed object's name.
+ string typeName = this.GetType().Name;
+ throw new ObjectDisposedException(typeName);
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+ using System;
+ using System.Collections.Generic;
+ using System.IO;
+ using System.Reflection;
+ using Kafka.Client.Exceptions;
+ using Kafka.Client.Utils;
+ using log4net;
+ using Org.Apache.Zookeeper.Data;
+ using ZooKeeperNet;
+
+ /// <summary>
+ /// Abstracts connection with ZooKeeper server
+ /// </summary>
+ internal class ZooKeeperConnection : IZooKeeperConnection
+ {
+ // log4net logger obtained for the type that declares this field.
+ private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
+ // Default session timeout; Connect passes SessionTimeout to TimeSpan as milliseconds.
+ public const int DefaultSessionTimeout = 30000;
+
+ // Held by Connect while the ZooKeeper client is being created.
+ private readonly object syncLock = new object();
+
+ // Held by Dispose while the disposed flag is checked and set.
+ private readonly object shuttingDownLock = new object();
+
+ // Set to true by Dispose; read by EnsuresNotDisposed.
+ private volatile bool disposed;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZooKeeperConnection"/> class.
+ /// </summary>
+ /// <param name="servers">
+ /// The list of ZooKeeper servers.
+ /// </param>
+ public ZooKeeperConnection(string servers)
+ : this(servers, DefaultSessionTimeout)
+ {
+ // Delegates to the two-argument constructor using DefaultSessionTimeout.
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ZooKeeperConnection"/> class.
+ /// </summary>
+ /// <param name="servers">
+ /// The list of ZooKeeper servers.
+ /// </param>
+ /// <param name="sessionTimeout">
+ /// The session timeout.
+ /// </param>
+ public ZooKeeperConnection(string servers, int sessionTimeout)
+ {
+ // Only the settings are stored here; the ZooKeeper client is created in Connect.
+ this.SessionTimeout = sessionTimeout;
+ this.Servers = servers;
+ }
+
+ /// <summary>
+ /// Gets the list of ZooKeeper servers.
+ /// </summary>
+ public string Servers { get; private set; }
+
+ /// <summary>
+ /// Gets the ZooKeeper session timeout
+ /// </summary>
+ public int SessionTimeout { get; private set; }
+
+ /// <summary>
+ /// Gets ZooKeeper client.
+ /// </summary>
+ public ZooKeeper Client { get; private set; }
+
+ /// <summary>
+ /// Gets the ZooKeeper client state
+ /// </summary>
+ public ZooKeeper.States ClientState
+ {
+ get
+ {
+ // No client has been created (or it was disposed): there is no state to report.
+ if (this.Client == null)
+ {
+ return null;
+ }
+
+ return this.Client.State;
+ }
+ }
+
+ /// <summary>
+ /// Connects to ZooKeeper server
+ /// </summary>
+ /// <param name="watcher">
+ /// The watcher to be installed in ZooKeeper.
+ /// </param>
+ public void Connect(IWatcher watcher)
+ {
+ this.EnsuresNotDisposed();
+
+ lock (this.syncLock)
+ {
+ // Only one client may be created per connection object.
+ if (this.Client != null)
+ {
+ throw new InvalidOperationException("ZooKeeper client has already been started");
+ }
+
+ try
+ {
+ Logger.Debug("Starting ZK client");
+
+ // SessionTimeout is supplied as the milliseconds component of the TimeSpan.
+ TimeSpan timeout = new TimeSpan(0, 0, 0, 0, this.SessionTimeout);
+ this.Client = new ZooKeeper(this.Servers, timeout, watcher);
+ }
+ catch (IOException exc)
+ {
+ // Wrap I/O failures in the project's ZooKeeperException, keeping the cause.
+ string message = "Unable to connect to " + this.Servers;
+ throw new ZooKeeperException(message, exc);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Deletes znode for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ public void Delete(string path)
+ {
+ // Version argument handed to the underlying client's Delete call.
+ const int version = -1;
+
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ this.Client.Delete(path, version);
+ }
+
+ /// <summary>
+ /// Checks whether znode for a given path exists.
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether should reinstall watcher in ZooKeeper.
+ /// </param>
+ /// <returns>
+ /// Result of check
+ /// </returns>
+ public bool Exists(string path, bool watch)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+
+ this.EnsuresNotDisposed();
+
+ // Honour the caller's watch flag. Previously true was hard-coded here,
+ // so the watch parameter was ignored.
+ // The znode exists when the client returns a non-null result.
+ return this.Client.Exists(path, watch) != null;
+ }
+
+ /// <summary>
+ /// Creates znode using given create mode for given path and writes given data to it
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <param name="mode">
+ /// The create mode.
+ /// </param>
+ /// <returns>
+ /// The created znode's path
+ /// </returns>
+ public string Create(string path, byte[] data, CreateMode mode)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ // The znode is created with the Ids.OPEN_ACL_UNSAFE ACL list.
+ string createdPath = this.Client.Create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
+ return createdPath;
+ }
+
+ /// <summary>
+ /// Gets all children for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether should reinstall watcher in ZooKeeper.
+ /// </param>
+ /// <returns>
+ /// Children
+ /// </returns>
+ public IList<string> GetChildren(string path, bool watch)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ // Straight pass-through to the underlying ZooKeeper client.
+ IList<string> children = this.Client.GetChildren(path, watch);
+ return children;
+ }
+
+ /// <summary>
+ /// Fetches data from a given path in ZooKeeper
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="stats">
+ /// The statistics.
+ /// </param>
+ /// <param name="watch">
+ /// Indicates whether should reinstall watcher in ZooKeeper.
+ /// </param>
+ /// <returns>
+ /// Data
+ /// </returns>
+ public byte[] ReadData(string path, Stat stats, bool watch)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ // Note the argument order of the client call: watch comes before stats.
+ byte[] payload = this.Client.GetData(path, watch, stats);
+ return payload;
+ }
+
+ /// <summary>
+ /// Writes data for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ public void WriteData(string path, byte[] data)
+ {
+ // Version value forwarded to the versioned overload.
+ const int version = -1;
+
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ this.WriteData(path, data, version);
+ }
+
+ /// <summary>
+ /// Writes data for a given path
+ /// </summary>
+ /// <param name="path">
+ /// The given path.
+ /// </param>
+ /// <param name="data">
+ /// The data to write.
+ /// </param>
+ /// <param name="version">
+ /// Expected version of data
+ /// </param>
+ public void WriteData(string path, byte[] data, int version)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ // Whatever SetData returns is intentionally not used.
+ ZooKeeper client = this.Client;
+ client.SetData(path, data, version);
+ }
+
+ /// <summary>
+ /// Gets the creation time of the znode at a given path
+ /// </summary>
+ /// <param name="path">
+ /// The path.
+ /// </param>
+ /// <returns>
+ /// The znode's creation time (Ctime), or -1 if the znode does not exist
+ /// </returns>
+ public long GetCreateTime(string path)
+ {
+ Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
+ this.EnsuresNotDisposed();
+
+ // Look the znode up without installing a watch.
+ Stat stats = this.Client.Exists(path, false);
+ if (stats == null)
+ {
+ // No znode at this path.
+ return -1;
+ }
+
+ return stats.Ctime;
+ }
+
+ /// <summary>
+ /// Closes underlying ZooKeeper client
+ /// </summary>
+ public void Dispose()
+ {
+ // Cheap exit before taking the lock.
+ if (this.disposed)
+ {
+ return;
+ }
+
+ // Read and set the flag under the lock so only one caller closes the client.
+ bool alreadyDisposed;
+ lock (this.shuttingDownLock)
+ {
+ alreadyDisposed = this.disposed;
+ this.disposed = true;
+ }
+
+ if (alreadyDisposed)
+ {
+ return;
+ }
+
+ try
+ {
+ // Nothing to close when Connect was never called.
+ if (this.Client == null)
+ {
+ return;
+ }
+
+ Logger.Debug("Closing ZooKeeper client connected to " + this.Servers);
+ this.Client.Dispose();
+ this.Client = null;
+ Logger.Debug("ZooKeeper client connection closed");
+ }
+ catch (Exception exc)
+ {
+ // Errors while closing are logged and not propagated.
+ Logger.Warn("Ignoring unexpected errors on closing", exc);
+ }
+ }
+
+ /// <summary>
+ /// Ensures object wasn't disposed
+ /// </summary>
+ private void EnsuresNotDisposed()
+ {
+ if (!this.disposed)
+ {
+ return;
+ }
+
+ // Report the runtime type name as the disposed object's name.
+ string typeName = this.GetType().Name;
+ throw new ObjectDisposedException(typeName);
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.ZooKeeperIntegration
+{
+ using System;
+ using System.Linq;
+ using System.Text;
+ using Kafka.Client.Utils;
+
+ /// <summary>
+ /// ZooKeeper is able to store data in the form of byte arrays. This class is a bridge between that byte-array format
+ /// and higher level objects.
+ /// </summary>
+ internal class ZooKeeperStringSerializer : IZooKeeperSerializer
+ {
+ /// <summary>
+ /// The single shared instance; the constructor is private, so this is the only way to obtain one.
+ /// </summary>
+ public static readonly ZooKeeperStringSerializer Serializer = new ZooKeeperStringSerializer();
+
+ /// <summary>
+ /// Prevents a default instance of the <see cref="ZooKeeperStringSerializer"/> class from being created.
+ /// </summary>
+ private ZooKeeperStringSerializer()
+ {
+ }
+
+ /// <summary>
+ /// Serializes data using UTF-8 encoding
+ /// </summary>
+ /// <param name="obj">
+ /// The data to serialize; its ToString() result is what gets encoded
+ /// </param>
+ /// <returns>
+ /// Serialized data
+ /// </returns>
+ public byte[] Serialize(object obj)
+ {
+ Guard.Assert<ArgumentNullException>(() => obj != null);
+ return Encoding.UTF8.GetBytes(obj.ToString());
+ }
+
+ /// <summary>
+ /// Deserializes data using UTF-8 encoding
+ /// </summary>
+ /// <param name="bytes">
+ /// The serialized data; must be neither null nor empty
+ /// </param>
+ /// <returns>
+ /// The deserialized data as a string
+ /// </returns>
+ public object Deserialize(byte[] bytes)
+ {
+ Guard.Assert<ArgumentNullException>(() => bytes != null);
+
+ // Length is the direct array property; no LINQ enumeration is needed.
+ Guard.Assert<ArgumentException>(() => bytes.Length > 0);
+
+ // NOTE(review): the former "bytes == null ? null : ..." fallback was removed as
+ // unreachable; this relies on Guard.Assert throwing for a failed assertion - confirm.
+ return Encoding.UTF8.GetString(bytes);
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.FxCop
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.FxCop?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.FxCop (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.FxCop Wed Sep 21 19:17:19 2011
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="utf-8"?>
+<FxCopProject Version="10.0" Name="Kafka">
+ <ProjectOptions>
+ <SharedProject>True</SharedProject>
+ <Stylesheet Apply="False">$(FxCopDir)\Xml\FxCopReport.xsl</Stylesheet>
+ <SaveMessages>
+ <Project Status="None" NewOnly="False" />
+ <Report Status="Active" NewOnly="False" />
+ </SaveMessages>
+ <ProjectFile Compress="True" DefaultTargetCheck="True" DefaultRuleCheck="True" SaveByRuleGroup="" Deterministic="True" />
+ <EnableMultithreadedLoad>True</EnableMultithreadedLoad>
+ <EnableMultithreadedAnalysis>True</EnableMultithreadedAnalysis>
+ <SourceLookup>True</SourceLookup>
+ <AnalysisExceptionsThreshold>10</AnalysisExceptionsThreshold>
+ <RuleExceptionsThreshold>1</RuleExceptionsThreshold>
+ <Spelling Locale="en-US" />
+ <OverrideRuleVisibilities>False</OverrideRuleVisibilities>
+ <CustomDictionaries SearchFxCopDir="True" SearchUserProfile="True" SearchProjectDir="True" />
+ <SearchGlobalAssemblyCache>True</SearchGlobalAssemblyCache>
+ <DeadlockDetectionTimeout>120</DeadlockDetectionTimeout>
+ <IgnoreGeneratedCode>True</IgnoreGeneratedCode>
+ </ProjectOptions>
+ <Targets>
+ <Target Name="$(ProjectDir)/Kafka.Client/bin/Integration/Kafka.Client.dll" Analyze="True" AnalyzeAllChildren="True" />
+ </Targets>
+ <Rules>
+ <RuleFiles>
+ <RuleFile Name="$(FxCopDir)\Rules\DesignRules.dll" Enabled="True" AllRulesEnabled="False">
+ <Rule Name="AbstractTypesShouldNotHaveConstructors" Enabled="True" />
+ <Rule Name="AvoidEmptyInterfaces" Enabled="True" />
+ <Rule Name="AvoidExcessiveParametersOnGenericTypes" Enabled="True" />
+ <Rule Name="AvoidNamespacesWithFewTypes" Enabled="True" />
+ <Rule Name="AvoidOutParameters" Enabled="True" />
+ <Rule Name="CollectionsShouldImplementGenericInterface" Enabled="True" />
+ <Rule Name="ConsiderPassingBaseTypesAsParameters" Enabled="True" />
+ <Rule Name="DeclareEventHandlersCorrectly" Enabled="True" />
+ <Rule Name="DeclareTypesInNamespaces" Enabled="True" />
+ <Rule Name="DefaultParametersShouldNotBeUsed" Enabled="True" />
+ <Rule Name="DefineAccessorsForAttributeArguments" Enabled="True" />
+ <Rule Name="DoNotCatchGeneralExceptionTypes" Enabled="True" />
+ <Rule Name="DoNotDeclareProtectedMembersInSealedTypes" Enabled="True" />
+ <Rule Name="DoNotDeclareStaticMembersOnGenericTypes" Enabled="True" />
+ <Rule Name="DoNotDeclareVirtualMembersInSealedTypes" Enabled="True" />
+ <Rule Name="DoNotDeclareVisibleInstanceFields" Enabled="True" />
+ <Rule Name="DoNotExposeGenericLists" Enabled="True" />
+ <Rule Name="DoNotHideBaseClassMethods" Enabled="True" />
+ <Rule Name="DoNotNestGenericTypesInMemberSignatures" Enabled="True" />
+ <Rule Name="DoNotOverloadOperatorEqualsOnReferenceTypes" Enabled="True" />
+ <Rule Name="DoNotPassTypesByReference" Enabled="True" />
+ <Rule Name="DoNotRaiseExceptionsInUnexpectedLocations" Enabled="True" />
+ <Rule Name="EnumeratorsShouldBeStronglyTyped" Enabled="True" />
+ <Rule Name="EnumsShouldHaveZeroValue" Enabled="True" />
+ <Rule Name="EnumStorageShouldBeInt32" Enabled="True" />
+ <Rule Name="ExceptionsShouldBePublic" Enabled="True" />
+ <Rule Name="GenericMethodsShouldProvideTypeParameter" Enabled="True" />
+ <Rule Name="ICollectionImplementationsHaveStronglyTypedMembers" Enabled="True" />
+ <Rule Name="ImplementIDisposableCorrectly" Enabled="True" />
+ <Rule Name="ImplementStandardExceptionConstructors" Enabled="True" />
+ <Rule Name="IndexersShouldNotBeMultidimensional" Enabled="True" />
+ <Rule Name="InterfaceMethodsShouldBeCallableByChildTypes" Enabled="True" />
+ <Rule Name="ListsAreStronglyTyped" Enabled="True" />
+ <Rule Name="MarkAssembliesWithAssemblyVersion" Enabled="True" />
+ <Rule Name="MarkAssembliesWithClsCompliant" Enabled="True" />
+ <Rule Name="MarkAssembliesWithComVisible" Enabled="True" />
+ <Rule Name="MarkAttributesWithAttributeUsage" Enabled="True" />
+ <Rule Name="MarkEnumsWithFlags" Enabled="True" />
+ <Rule Name="MembersShouldNotExposeCertainConcreteTypes" Enabled="True" />
+ <Rule Name="MovePInvokesToNativeMethodsClass" Enabled="True" />
+ <Rule Name="NestedTypesShouldNotBeVisible" Enabled="True" />
+ <Rule Name="OverloadOperatorEqualsOnOverloadingAddAndSubtract" Enabled="True" />
+ <Rule Name="OverrideMethodsOnComparableTypes" Enabled="True" />
+ <Rule Name="PropertiesShouldNotBeWriteOnly" Enabled="True" />
+ <Rule Name="ProvideObsoleteAttributeMessage" Enabled="True" />
+ <Rule Name="ReplaceRepetitiveArgumentsWithParamsArray" Enabled="True" />
+ <Rule Name="StaticHolderTypesShouldBeSealed" Enabled="True" />
+ <Rule Name="StaticHolderTypesShouldNotHaveConstructors" Enabled="True" />
+ <Rule Name="StringUriOverloadsCallSystemUriOverloads" Enabled="True" />
+ <Rule Name="TypesShouldNotExtendCertainBaseTypes" Enabled="True" />
+ <Rule Name="TypesThatOwnDisposableFieldsShouldBeDisposable" Enabled="True" />
+ <Rule Name="TypesThatOwnNativeResourcesShouldBeDisposable" Enabled="True" />
+ <Rule Name="UriParametersShouldNotBeStrings" Enabled="True" />
+ <Rule Name="UriPropertiesShouldNotBeStrings" Enabled="True" />
+ <Rule Name="UriReturnValuesShouldNotBeStrings" Enabled="True" />
+ <Rule Name="UseEventsWhereAppropriate" Enabled="True" />
+ <Rule Name="UseGenericEventHandlerInstances" Enabled="True" />
+ <Rule Name="UseGenericsWhereAppropriate" Enabled="True" />
+ <Rule Name="UseIntegralOrStringArgumentForIndexers" Enabled="True" />
+ <Rule Name="UsePropertiesWhereAppropriate" Enabled="True" />
+ </RuleFile>
+ <RuleFile Name="$(FxCopDir)\Rules\GlobalizationRules.dll" Enabled="True" AllRulesEnabled="True" />
+ <RuleFile Name="$(FxCopDir)\Rules\InteroperabilityRules.dll" Enabled="True" AllRulesEnabled="True" />
+ <RuleFile Name="$(FxCopDir)\Rules\MobilityRules.dll" Enabled="True" AllRulesEnabled="True" />
+ <RuleFile Name="$(FxCopDir)\Rules\NamingRules.dll" Enabled="True" AllRulesEnabled="True" />
+ <RuleFile Name="$(FxCopDir)\Rules\PerformanceRules.dll" Enabled="True" AllRulesEnabled="True" />
+ <RuleFile Name="$(FxCopDir)\Rules\PortabilityRules.dll" Enabled="True" AllRulesEnabled="True" />
+ <RuleFile Name="$(FxCopDir)\Rules\SecurityRules.dll" Enabled="True" AllRulesEnabled="True" />
+ <RuleFile Name="$(FxCopDir)\Rules\SecurityTransparencyRules.dll" Enabled="True" AllRulesEnabled="True" />
+ <RuleFile Name="$(FxCopDir)\Rules\UsageRules.dll" Enabled="True" AllRulesEnabled="True" />
+ </RuleFiles>
+ <Groups />
+ <Settings />
+ </Rules>
+ <FxCopReport Version="10.0" />
+</FxCopProject>
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln Wed Sep 21 19:17:19 2011
@@ -1,38 +1,45 @@
-
-Microsoft Visual Studio Solution File, Format Version 11.00
-# Visual Studio 2010
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client", "Kafka.Client\Kafka.Client.csproj", "{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client.Tests", "Tests\Kafka.Client.Tests\Kafka.Client.Tests.csproj", "{9BA1A0BF-B207-4A11-8883-5F64B113C07D}"
-EndProject
-Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client.IntegrationTests", "Tests\Kafka.Client.IntegrationTests\Kafka.Client.IntegrationTests.csproj", "{AF29C330-49BD-4648-B692-882E922C435B}"
-EndProject
-Global
- GlobalSection(SolutionConfigurationPlatforms) = preSolution
- Debug|Any CPU = Debug|Any CPU
- Release|Any CPU = Release|Any CPU
- EndGlobalSection
- GlobalSection(ProjectConfigurationPlatforms) = postSolution
- {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.Build.0 = Release|Any CPU
- {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.Build.0 = Release|Any CPU
- {AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.Build.0 = Release|Any CPU
- EndGlobalSection
- GlobalSection(SolutionProperties) = preSolution
- HideSolutionNode = FALSE
- EndGlobalSection
- GlobalSection(NestedProjects) = preSolution
- {9BA1A0BF-B207-4A11-8883-5F64B113C07D} = {06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}
- {AF29C330-49BD-4648-B692-882E922C435B} = {06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}
- EndGlobalSection
-EndGlobal
+
+Microsoft Visual Studio Solution File, Format Version 11.00
+# Visual Studio 2010
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client", "Kafka.Client\Kafka.Client.csproj", "{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client.Tests", "Tests\Kafka.Client.Tests\Kafka.Client.Tests.csproj", "{9BA1A0BF-B207-4A11-8883-5F64B113C07D}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client.IntegrationTests", "Tests\Kafka.Client.IntegrationTests\Kafka.Client.IntegrationTests.csproj", "{AF29C330-49BD-4648-B692-882E922C435B}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Integration|Any CPU = Integration|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Integration|Any CPU.ActiveCfg = Integration|Any CPU
+ {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Integration|Any CPU.Build.0 = Integration|Any CPU
+ {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Integration|Any CPU.ActiveCfg = Integration|Any CPU
+ {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Integration|Any CPU.Build.0 = Integration|Any CPU
+ {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {AF29C330-49BD-4648-B692-882E922C435B}.Integration|Any CPU.ActiveCfg = Integration|Any CPU
+ {AF29C330-49BD-4648-B692-882E922C435B}.Integration|Any CPU.Build.0 = Integration|Any CPU
+ {AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(NestedProjects) = preSolution
+ {9BA1A0BF-B207-4A11-8883-5F64B113C07D} = {06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}
+ {AF29C330-49BD-4648-B692-882E922C435B} = {06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}
+ EndGlobalSection
+EndGlobal
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/App.config
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/App.config?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/App.config (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/App.config Wed Sep 21 19:17:19 2011
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<configuration>
+ <configSections>
+ <section
+ name="kafkaClientConfiguration"
+ type="Kafka.Client.Cfg.KafkaClientConfiguration, Kafka.Client"
+ allowLocation="true"
+ allowDefinition="Everywhere"
+ />
+ <section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net, Version=1.2.10.0, Culture=neutral, PublicKeyToken=1b44e1d426115821" />
+ </configSections>
+ <log4net configSource="Log4Net.config" />
+ <kafkaClientConfiguration>
+ <!--<kafkaServer address="192.168.3.251" port="9092"></kafkaServer>-->
+ <kafkaServer address="192.168.1.39" port="9092"></kafkaServer>
+ <consumer numberOfTries="2" groupId="testGroup" timeout="10000" autoOffsetReset="smallest" autoCommit="true" autoCommitIntervalMs="1000" fetchSize="307200" backOffIncrementMs="2000"/>
+ <brokerPartitionInfos>
+ <add id="0" address="192.168.1.39" port="9092" />
+ <add id="1" address="192.168.1.39" port="9101" />
+ <add id="2" address="192.168.1.39" port="9102" />
+ <!--<add id="0" address="192.168.3.251" port="9092" />-->
+ <!--<add id="2" address="192.168.3.251" port="9092" />-->
+ </brokerPartitionInfos>
+ <zooKeeperServers addressList="192.168.1.39:2181" sessionTimeout="30000" connectionTimeout="3000"></zooKeeperServers>
+ <!--<zooKeeperServers addressList="192.168.3.251:2181" sessionTimeout="30000"></zooKeeperServers>-->
+ </kafkaClientConfiguration>
+</configuration>
\ No newline at end of file
Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Debug/App.config
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Debug/App.config?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Debug/App.config (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Debug/App.config Wed Sep 21 19:17:19 2011
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<configuration>
+ <configSections>
+ <section
+ name="kafkaClientConfiguration"
+ type="Kafka.Client.Cfg.KafkaClientConfiguration, Kafka.Client"
+ allowLocation="true"
+ allowDefinition="Everywhere"
+ />
+ <section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net, Version=1.2.10.0, Culture=neutral, PublicKeyToken=1b44e1d426115821" />
+ </configSections>
+ <log4net configSource="Log4Net.config" />
+ <kafkaClientConfiguration>
+ <!--<kafkaServer address="192.168.3.251" port="9092"></kafkaServer>-->
+ <kafkaServer address="192.168.1.39" port="9092"></kafkaServer>
+ <consumer numberOfTries="2" groupId="testGroup" timeout="10000" autoOffsetReset="smallest" autoCommit="true" autoCommitIntervalMs="1000" fetchSize="307200" backOffIncrementMs="2000"/>
+ <brokerPartitionInfos>
+ <add id="0" address="192.168.1.39" port="9092" />
+ <add id="1" address="192.168.1.39" port="9101" />
+ <add id="2" address="192.168.1.39" port="9102" />
+ <!--<add id="0" address="192.168.3.251" port="9092" />-->
+ <!--<add id="2" address="192.168.3.251" port="9092" />-->
+ </brokerPartitionInfos>
+ <zooKeeperServers addressList="192.168.1.39:2181" sessionTimeout="30000" connectionTimeout="3000"></zooKeeperServers>
+ <!--<zooKeeperServers addressList="192.168.3.251:2181" sessionTimeout="30000"></zooKeeperServers>-->
+ </kafkaClientConfiguration>
+</configuration>
\ No newline at end of file