You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/05 21:53:45 UTC

svn commit: r692521 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/ qpid/cluster/ tests/

Author: aconway
Date: Fri Sep  5 12:53:44 2008
New Revision: 692521

URL: http://svn.apache.org/viewvc?rev=692521&view=rev
Log:
Fixed cluster membership notification.
Cluster events with RefCountedBuffers for queueing.
PollableQueue clears bacth immediately.
Improved perfdist: clients hit multiple brokers in a cluster.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/benchmark
    incubator/qpid/trunk/qpid/cpp/src/tests/perfdist
    incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=692521&r1=692520&r2=692521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Sep  5 12:53:44 2008
@@ -237,6 +237,8 @@
   qpid/DataDir.cpp \
   qpid/Options.cpp \
   qpid/log/Options.cpp \
+  qpid/RefCountedBuffer.h \
+  qpid/RefCountedBuffer.cpp \
   qpid/log/Selector.cpp \
   qpid/log/Statement.cpp \
   qpid/pointer_to_other.h

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=692521&r1=692520&r2=692521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Sep  5 12:53:44 2008
@@ -25,7 +25,9 @@
   qpid/cluster/WriteEstimate.cpp \
   qpid/cluster/OutputInterceptor.h \
   qpid/cluster/OutputInterceptor.cpp \
-  qpid/cluster/ProxyInputHandler.h
+  qpid/cluster/ProxyInputHandler.h \
+  qpid/cluster/Event.h \
+  qpid/cluster/Event.cpp
 
 libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp?rev=692521&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp Fri Sep  5 12:53:44 2008
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "RefCountedBuffer.h"
+
+namespace qpid {
+
+RefCountedBuffer::RefCountedBuffer() : count(0) {}
+
+void RefCountedBuffer::destroy() const {
+    this->~RefCountedBuffer();
+    ::delete[] reinterpret_cast<const char*>(this);
+}
+
+char* RefCountedBuffer::addr() const {
+    return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer));
+}
+
+RefCountedBuffer::intrusive_ptr RefCountedBuffer::create(size_t n) {
+    char* store=::new char[n+sizeof(RefCountedBuffer)];
+    new(store) RefCountedBuffer;
+    return reinterpret_cast<RefCountedBuffer*>(store);
+}
+
+} // namespace qpid
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h?rev=692521&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h Fri Sep  5 12:53:44 2008
@@ -0,0 +1,68 @@
+#ifndef QPID_REFCOUNTEDBUFFER_H
+#define QPID_REFCOUNTEDBUFFER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/utility.hpp>
+#include <boost/detail/atomic_count.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+/**
+ * Reference-counted byte buffer.
+ * No alignment guarantees.
+ */
+class RefCountedBuffer : boost::noncopyable {
+    mutable boost::detail::atomic_count count;
+    RefCountedBuffer();
+    void destroy() const;
+    char* addr() const;
+
+public:
+
+    typedef boost::intrusive_ptr<RefCountedBuffer> intrusive_ptr;
+
+    /** Create a reference counted buffer of size n */
+    static intrusive_ptr create(size_t n);
+
+    /** Get a pointer to the start of the buffer. */
+    char* get() { return addr(); }
+    const char* get() const { return addr(); }
+    char& operator[](size_t i) { return get()[i]; }
+    const char& operator[](size_t i) const { return get()[i]; }
+
+    void addRef() const { ++count; }
+    void release() const { if (--count==0) destroy(); }
+    long refCount() { return count; }
+};
+
+} // namespace qpid
+
+// intrusive_ptr support.
+namespace boost {
+inline void intrusive_ptr_add_ref(const qpid::RefCountedBuffer* p) { p->addRef(); }
+inline void intrusive_ptr_release(const qpid::RefCountedBuffer* p) { p->release(); }
+}
+
+
+#endif  /*!QPID_REFCOUNTEDBUFFER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=692521&r1=692520&r2=692521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Sep  5 12:53:44 2008
@@ -38,6 +38,7 @@
 #include <algorithm>
 #include <iterator>
 #include <map>
+#include <ostream>
 
 namespace qpid {
 namespace cluster {
@@ -67,11 +68,8 @@
     )
 {
     broker->addFinalizer(boost::bind(&Cluster::leave, this));
-    QPID_LOG(trace, "Joining cluster: " << name << " as " << self);
+    QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
     cpg.join(name);
-    mcastFrame(AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
-               ConnectionId(self,0));
-
     // Start dispatching from the poller.
     cpgDispatchHandle.startWatch(poller);
 }
@@ -94,7 +92,7 @@
     // Leave is called by from Broker destructor after the poller has
     // been shut down. No dispatches can occur.
 
-    QPID_LOG(debug, "Leaving cluster " << name.str());
+    QPID_LOG(notice, "Leaving cluster " << name.str());
     cpg.leave(name);
     // broker= is set to 0 when the final config-change is delivered.
     while(broker) {
@@ -158,7 +156,7 @@
     if (i == connections.end()) { // New shadow connection.
         assert(id.getMember() != self);
         std::ostringstream mgmtId;
-        mgmtId << name << ":"  << id;
+        mgmtId << name.str() << ":"  << id;
         ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id));
         i = connections.insert(value).first;
     }
@@ -205,22 +203,50 @@
     }
 }
 
+struct AddrList {
+    const cpg_address* addrs;
+    int count;
+    AddrList(const cpg_address* a, int n) : addrs(a), count(n) {}
+};
+
+ostream& operator<<(ostream& o, const AddrList& a) {
+    for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
+        const char* reasonString;
+        switch (p->reason) {
+          case CPG_REASON_JOIN: reasonString =  " joined "; break;
+          case CPG_REASON_LEAVE: reasonString =  " left ";break;
+          case CPG_REASON_NODEDOWN: reasonString =  " node-down ";break;
+          case CPG_REASON_NODEUP: reasonString =  " node-up ";break;
+          case CPG_REASON_PROCDOWN: reasonString =  " process-down ";break;
+          default: reasonString = " ";
+        }
+        qpid::cluster::MemberId member(*p);
+        o << member << reasonString;
+    }
+    return o;
+}
+
 void Cluster::configChange(
     cpg_handle_t /*handle*/,
     cpg_name */*group*/,
     cpg_address *current, int nCurrent,
     cpg_address *left, int nLeft,
-    cpg_address */*joined*/, int /*nJoined*/)
+    cpg_address *joined, int nJoined)
 {
-    QPID_LOG(debug, "Cluster change: "
-             << std::make_pair(current, nCurrent)
-             << std::make_pair(left, nLeft));
+    QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
+             << AddrList(joined, nJoined) << AddrList(left, nLeft));
+    
+    if (nJoined)                // Notfiy new members of my URL.
+        mcastFrame(
+            AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
+            ConnectionId(self,0));
+
 
     Mutex::ScopedLock l(lock);
     for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
     // Add new members when their URL notice arraives.
 
-    if (std::find(left, left+nLeft, self) != left+nLeft)
+    if (find(left, left+nLeft, self) != left+nLeft)
         broker = 0;       // We have left the group, this is the final config change.
     lock.notifyAll();     // Threads waiting for membership changes.
 }
@@ -236,7 +262,8 @@
     broker->shutdown();
 }
 
-void Cluster::urlNotice(const MemberId& m, const std::string& url) {
+void Cluster::urlNotice(const MemberId& m, const string& url) {
+    QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
     urls.insert(UrlMap::value_type(m,Url(url)));
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=692521&r1=692520&r2=692521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Sep  5 12:53:44 2008
@@ -19,7 +19,6 @@
  *
  */
 
-#include "qpid/cluster/types.h"
 #include "qpid/cluster/Cpg.h"
 #include "qpid/cluster/PollableQueue.h"
 #include "qpid/cluster/NoOpConnectionOutputHandler.h"

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=692521&r1=692520&r2=692521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Fri Sep  5 12:53:44 2008
@@ -184,33 +184,3 @@
 }
 
 }} // namespace qpid::cluster
-
-
-// In proper namespace for ADL.
-
-std::ostream& operator<<(std::ostream& o, const ::cpg_address& a) {
-    const char* reasonString;
-    switch (a.reason) {
-      case CPG_REASON_JOIN: reasonString = " joined"; break;
-      case CPG_REASON_LEAVE: reasonString = " left";break;
-      case CPG_REASON_NODEDOWN: reasonString = " node-down";break;
-      case CPG_REASON_NODEUP: reasonString = " node-up";break;
-      case CPG_REASON_PROCDOWN: reasonString = " process-down";break;
-      default: reasonString = "";
-    }
-    return o << qpid::cluster::MemberId(a.nodeid, a.pid) << reasonString;
-}
-
-std::ostream& operator<<(std::ostream& o, const cpg_name& name) {
-    return o << std::string(name.value, name.length);
-}
-
-namespace std {
-ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
-    for (cpg_address* p = a.first; p < a.first+a.second; ++p)
-        o << *p << " ";
-    return o;
-}
-}
-
-

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=692521&r1=692520&r2=692521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Fri Sep  5 12:53:44 2008
@@ -158,8 +158,6 @@
     bool isShutdown;
 };
 
-std::ostream& operator <<(std::ostream& out, const MemberId& id);
-
 inline bool operator==(const cpg_name& a, const cpg_name& b) {
     return a.length==b.length &&  strncmp(a.value, b.value, a.length) == 0;
 }
@@ -167,12 +165,4 @@
 
 }} // namespace qpid::cluster
 
-// In proper namespaces for ADL
-std::ostream& operator <<(std::ostream& out, const cpg_name& name);
-std::ostream& operator<<(std::ostream& o, const cpg_address& a);
-namespace std {
-std::ostream& operator <<(std::ostream& out, std::pair<cpg_address*,int> addresses);
-}
-
-
 #endif  /*!CPG_H*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=692521&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Fri Sep  5 12:53:44 2008
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "types.h"
+#include "Event.h"
+#include "Cpg.h"
+#include "qpid/framing/Buffer.h"
+
+namespace qpid {
+namespace cluster {
+using framing::Buffer;
+
+const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/;
+
+Event::Event(EventType t, const ConnectionId c, const size_t s)
+    : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {}
+
+Event::Event(const MemberId& m, const char* d, size_t s)
+    : connection(m, 0), size(s-OVERHEAD), data(RefCountedBuffer::create(size))
+{
+    memcpy(data->get(), d, s);
+}
+    
+void Event::mcast(const Cpg::Name& name, Cpg& cpg) {
+    char header[OVERHEAD];
+    Buffer b;
+    b.putOctet(type);
+    b.putLongLong(reinterpret_cast<uint64_t>(connection.getConnectionPtr()));
+    iovec iov[] = { { header, b.getPosition() }, { data.get(), size } };
+    cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+}
+
+
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=692521&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Fri Sep  5 12:53:44 2008
@@ -0,0 +1,65 @@
+#ifndef QPID_CLUSTER_EVENT_H
+#define QPID_CLUSTER_EVENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Cpg.h"
+#include "qpid/RefCountedBuffer.h"
+
+namespace qpid {
+namespace cluster {
+
+// TODO aconway 2008-09-03: more efficient solution for shared
+// byte-stream data.
+// 
+
+/**
+ * Events are sent to/received from the cluster.
+ * Refcounted so they can be stored on queues.
+ */
+struct Event {
+  public:
+    /** Create an event with for mcasting, with size bytes of space. */
+    Event(EventType t, const ConnectionId c, size_t size);
+
+    /** Create an event from delivered data. */
+    Event(const MemberId& m, const char* data, size_t size);
+    
+    void mcast(const Cpg::Name& name, Cpg& cpg);
+    
+    EventType getType() const { return type; }
+    ConnectionId getConnection() const { return connection; }
+    size_t getSize() const { return size; }
+    char* getData() { return data->get(); }
+
+  private:
+    static const size_t OVERHEAD;
+    EventType type;
+    ConnectionId connection;
+    size_t size;
+    RefCountedBuffer::intrusive_ptr data;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EVENT_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=692521&r1=692520&r2=692521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Fri Sep  5 12:53:44 2008
@@ -89,9 +89,13 @@
     batch.clear();
     batch.swap(queue);
     condition.clear();
-    ScopedUnlock u(lock);
-    callback(batch.begin(), batch.end()); // Process outside the lock to allow concurrent push.
-    h.rewatch();
+    {
+        // Process outside the lock to allow concurrent push.
+        ScopedUnlock u(lock);
+        callback(batch.begin(), batch.end()); 
+        h.rewatch();
+    }
+    batch.clear();
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/benchmark
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/benchmark?rev=692521&r1=692520&r2=692521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/benchmark (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/benchmark Fri Sep  5 12:53:44 2008
@@ -21,33 +21,45 @@
 # and latency against a single cluster member while they are replicating.
 #
 # Must be run in the qpid src/tests build directory.
-#
+# 
 
+usage() {
+cat <<EOF
+Usage: $0 [options] -- client hosts --- broker hosts
+Read the script for options.
+EOF
+}
 # Defaults
 TESTDIR=${TESTDIR:-$PWD}    # Absolute path to test exes on all hosts.
-SCRIPTDIR=${SCRIPTDIR:-$PWD/`dirname $0`}    # Absolute path to test exes on all hosts.
-SAMPLES=10		    # Runs of each test.
-COUNT=${COUNT:-10000}	    # Count for pub/sub tests. 
-ECHO=${ECHO:-1000}	    # Count for echo test.
-BROKER_FLAGS=
-
-while getopts "t:b:p:s:c:n:e" opt ; do
-    case $opt in
-	t) TESTDIR=$OPTARG ;;
-	b) BROKER_FLAGS="$BROKER_FLAGS -b $OPTARG" ;;
-	p) BROKER_FLAGS="$BROKER_FLAGS -p $OPTARG" ;;
-	s) SAMPLES=$OPTARG ;;
-	c) CLIENTS="$CLIENTS $OPTARG" ;;
-	n) COUNT="$OPTARG" ;;
-	e) ECHO="$OPTARG" ;;
+SCRIPTDIR=${SCRIPTDIR:-$PWD/`dirname $0`}    # Absolute path to test scripts on all hosts.
+SAMPLES=10			# Runs of each test.
+COUNT=${COUNT:-10000}		# Count for pub/sub tests.
+SIZE=${SIZE:-600}		# Size of messages
+ECHO=${ECHO:-1000}		# Count for echo test.
+
+collect() { eval $COLLECT=\""\$$COLLECT $*"\"; }
+COLLECT=ARGS
+while test $# -gt 0; do
+    case $1 in
+	--testdir) TESTDIR=$2 ; shift 2 ;;
+	--samples) SAMPLES=$2 ; shift 2 ;;
+	--count) COUNT=$2 ; shift 2 ;;
+	--echos) ECHO==$2 ; shift 2 ;;
+	--size) SIZE==$2 ; shift 2 ;;
+	--) COLLECT=CLIENTARG; shift ;;
+	---) COLLECT=BROKERARG; shift;;
+	*) collect $1; shift ;;
     esac
 done
 
+CLIENTS=${CLIENTARG:-$CLIENTS}
+BROKERS=${BROKERARG:-$BROKERS}
 test -z "$CLIENTS" && { echo "Must specify at least one client host."; exit 1; }
-test -z "$BROKER_FLAGS" && { echo "Must specify a broker host."; exit 1; } 
+test -z "$BROKERS" && { echo "Must specify at least one broker host."; exit 1; } 
 
 export TESTDIR			# For perfdist
 CLIENTS=($CLIENTS)		# Convert to array
+BROKERS=($BROKERS)
 trap "rm -f $FILES" EXIT
 
 dosamples() {
@@ -63,13 +75,13 @@
 }
 
 HEADING="pub	sub	total	Mb"
-dosamples $SCRIPTDIR/perfdist $BROKER_FLAGS --count $COUNT --nsubs 2 --npubs 2 --qt 2 -s -- ${CLIENTS[*]}
+dosamples $SCRIPTDIR/perfdist --count $COUNT --nsubs 2 --npubs 2 --qt 2 -s -- ${CLIENTS[*]} --- ${BROKERS[*]}
 HEADING="pub"
-dosamples ssh -A ${CLIENTS[0]} $TESTDIR/publish --routing-key perftest0 -s $BROKER_FLAGS --count $COUNT
+dosamples ssh -A ${CLIENTS[0]} $TESTDIR/publish --routing-key perftest0 --count $COUNT -s -b ${BROKERS[0]} 
 HEADING="sub"
-dosamples ssh -A ${CLIENTS[0]} $TESTDIR/consume --queue perftest0 -s $BROKER_FLAGS --count $COUNT
+dosamples ssh -A ${CLIENTS[0]} $TESTDIR/consume --queue perftest0 -s --count $COUNT -b ${BROKERS[0]} 
 HEADING="min	max	avg"
-dosamples ssh -A ${CLIENTS[0]} $TESTDIR/echotest --count $ECHO -s
+dosamples ssh -A ${CLIENTS[0]} $TESTDIR/echotest --count $ECHO -s -b ${BROKERS[0]} 
 
 echo
 echo "Tab separated spreadsheet (also stored in benchmark.tab):"

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perfdist
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perfdist?rev=692521&r1=692520&r2=692521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perfdist (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perfdist Fri Sep  5 12:53:44 2008
@@ -7,17 +7,16 @@
 set -e
 usage() {
 cat <<EOF
-usage: $0 <perftest-args> -- <client-hosts ...> 
+usage: $0 <perftest-args> -- <client-hosts ...> [ --- <broker hosts...> ]
+Client & broker hosts can also be set in env vars CLIENTS and BROKERS.
 
-Run perftest with clients running on the listed hosts.  Clients are
-assigned to hosts publishers first, then subscribers the host list is
-used round-robin if there are more clients than hosts. perftest-args should
-include a --host <brokerhost>  flag (and --port if necessary).
+Run perftest with clients running on the clients and brokers running
+on the specified hosts. Clients are assigned to client hosts round
+robin: publishers first, then subscribers. If there are multiple
+brokers (for cluster tests) clients connect to them round robin.
 
-Do not pass preftest action flags: --setup, --control, --publish, --subscribe.
-The script will pass them to the appropriate client processes.
-
-Note all perftest args must come before  --.
+Broker hosts can be listed with -b in perftest-args or after ---
+at the end of the arguments.
 
 Error: $*
 EOF
@@ -36,19 +35,28 @@
 	--npubs) collect $1 $2; NPUBS=$2; shift 2 ;;
 	--nsubs) collect $1 $2; NSUBS=$2; shift 2 ;;
 	-s|--summary) collect $1; QUIET=yes; shift 1 ;;
-	--) COLLECT=HOSTS; shift ;; 
+	-b|--broker) BROKERS="$BROKERS $2"; shift 2;;
+	--) COLLECT=CLIENTARG; shift ;;
+	---) COLLECT=BROKERARG; shift;;
 	*) collect $1; shift ;;
     esac
 done
 
-if [ -z "$HOSTS" ]; then usage "No hosts listed after --"; fi
+CLIENTS=${CLIENTARG:-$CLIENTS}
+if [ -z "$CLIENTS" ]; then usage "No client hosts listed after --"; fi
+BROKERS=${BROKERARG:-$BROKERS}
+if [ -z "$BROKERS" ]; then usage "No brokers specified"; fi
+
 PERFTEST="$TESTDIR/perftest $ARGS"
 
-HOSTS=($HOSTS)
+CLIENTS=($CLIENTS)
+BROKERS=($BROKERS)
 start() {
-    HOST=${HOSTS[i % ${#HOSTS[*]}]}
-    test -z "$QUIET" && echo "Client $i on $HOST $*"
-    ssh -fT $HOST "PATH=$ADDPATH:\$PATH" $PERFTEST "$@"
+    CLIENT=${CLIENTS[i % ${#CLIENTS[*]}]}
+    BROKER=${BROKERS[i % ${#BROKERS[*]}]}
+    ARGS="$* --broker $BROKER"
+    test -z "$QUIET" && echo "Client $i on $CLIENT: $ARGS"
+    ssh -fT $CLIENT $PERFTEST "$@"
 }
 
 $PERFTEST --setup

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts?rev=692521&r1=692520&r2=692521&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster_hosts Fri Sep  5 12:53:44 2008
@@ -16,14 +16,14 @@
 
 QPIDD=${QPIDD:-$PWD/../qpidd}
 LIBQPIDCLUSTER=${LIBQPIDCLUSTER:-$PWD/../.libs/libqpidcluster.so}
-CLUSTER=$USER			# User name is default cluster name.
+NAME=$USER			# User name is default cluster name.
 RESTART=NO
 
-while getopts "kp:c:q:r" ARG ; do
+while getopts "kp:n:q:r" ARG ; do
     case $ARG in
 	k) KILL=yes ;; 
 	p) PORT="$OPTARG" ;;
-	c) CLUSTER=$OPTARG ;;
+	n) NAME=$OPTARG ;;
 	q) QPIDD=$OPTARG ;;
 	l) LIBQPIDCLUSTER=$OPTARG ;;
 	r) RESTART=yes ;;
@@ -33,17 +33,17 @@
 shift `expr $OPTIND - 1`
 test -n "$PORT" && PORTOPT="-p $PORT"
 test "$KILL" = yes && KILL="$QPIDD -q $PORTOPT ;"
-test -z "$*" && { echo Must specify at least one host; exit 1; }
+CLUSTER=${*:-$CLUSTER}		# Use args or env
+test -z "$CLUSTER" && { echo Must specify at least one host; exit 1; }
 
 
-OPTS="-d $PORTOPT --load-module $LIBQPIDCLUSTER --cluster-name=$CLUSTER --no-data-dir --auth=no --log-output=syslog"
+OPTS="-d $PORTOPT --load-module $LIBQPIDCLUSTER --cluster-name=$NAME --no-data-dir --auth=no --log-output=syslog --log-enable=info+"
 
 num=0
-for h in $*; do
+for h in $CLUSTER; do
     num=`expr $num + 1`	      # Give a unique log prefix to each node.
     cmd="$KILL $QPIDD $OPTS --log-prefix $num.$h"
-    echo == $h
-    out=`echo "$cmd" | ssh $h newgrp ais` || { echo $out ; exit 1; }
+    out=`echo "$cmd" | ssh $h newgrp ais` || { echo == $h error: $out ; exit 1; }
     if [ "$PORT" = 0 ] ; then p=$out; else p=$PORT; fi
     echo "$h $p" 
 done