You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/08 15:20:43 UTC

svn commit: r1211902 - in /qpid/branches/qpid-3603/qpid/cpp/src: qpid/ha/QueueReplicator.cpp qpid/ha/ReplicatingSubscription.cpp qpid/ha/WiringReplicator.cpp tests/ha_tests.py tests/qpid-cluster-benchmark tests/qpid-cpp-benchmark

Author: aconway
Date: Thu Dec  8 14:20:43 2011
New Revision: 1211902

URL: http://svn.apache.org/viewvc?rev=1211902&view=rev
Log:
QPID-3603: Cleaned up log messages; updated qpid-cluster-benchmark to set replicate=all.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
    qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cluster-benchmark
    qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cpp-benchmark

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1211902&r1=1211901&r2=1211902&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Dec  8 14:20:43 2011
@@ -86,7 +86,7 @@ void QueueReplicator::initializeBridge(B
     peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-    QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << " to " << args.i_dest);
+    QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest);
 }
 
 void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1211902&r1=1211901&r2=1211902&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Dec  8 14:20:43 2011
@@ -138,8 +138,9 @@ ReplicatingSubscription::ReplicatingSubs
             //local queue (i.e. master) is empty
             range.add(lwm, queue->getPosition());
         }
-        QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << " are " << range
-                 << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")");
+        QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": "
+                 << range << " (lwm=" << lwm << ", hwm=" << hwm
+                 << ", current=" << queue->getPosition() << ")");
         //set position of 'cursor'
         position = hwm;
     }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1211902&r1=1211901&r2=1211902&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Dec  8 14:20:43 2011
@@ -229,18 +229,14 @@ void WiringReplicator::route(Deliverable
                 Variant::Map& map = i->asMap();
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
-                QPID_LOG(debug, "HA: Backup received event: schema=" << schema
+                QPID_LOG(trace, "HA: Backup received event: schema=" << schema
                          << " values=" << values);
                 if      (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
                 else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
                 else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
                 else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
                 else if (match<EventBind>(schema)) doEventBind(values);
-                // FIXME aconway 2011-11-21: handle unbind & all other events.
-                else if (match<EventSubscribe>(schema)) {} // Deliberately ignored.
-                // FIXME aconway 2011-12-02: error handling
-                else throw(Exception(QPID_MSG("Backup received unexpected event, schema="
-                                              << schema)));
+                // FIXME aconway 2011-11-21: handle unbind & all other relevant events.
             }
         } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -253,15 +249,13 @@ void WiringReplicator::route(Deliverable
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
-                else throw Exception(QPID_MSG("HA: Unexpected response type: " << type));
+                // FIXME aconway 2011-12-06: handle all relevant response types.
             }
         } else {
-            QPID_LOG(error, QPID_MSG("HA: Backup received unexpected message: "
-                                       << *headers));
+            QPID_LOG(error, "HA: Backup replication got unexpected message: " << *headers);
         }
     } catch (const std::exception& e) {
-        QPID_LOG(error, "HA: Backup replication error: " << e.what());
-        QPID_LOG(error, "HA: Backup replication error while processing: " << list);
+        QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
     }
 }
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1211902&r1=1211901&r2=1211902&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Thu Dec  8 14:20:43 2011
@@ -118,6 +118,29 @@ class ShortTests(BrokerTest):
         self.assert_browse_retry(p, "foo", [])
         self.assert_browse_retry(b, "foo", [])
 
+    def test_sync(self):
+        def queue(name, replicate):
+            return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
+        primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary
+        p = primary.connect().session()
+        s = p.sender(queue("q","all"))
+        for m in [str(i) for i in range(0,10)]: s.send(m)
+        s.sync()
+        backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port())
+        for m in [str(i) for i in range(10,20)]: s.send(m)
+        s.sync()
+        backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port())
+        for m in [str(i) for i in range(20,30)]: s.send(m)
+        s.sync()
+        msgs = [str(i) for i in range(30)]
+
+        b = backup1.connect().session()
+        self.assert_browse_retry(b, "q", msgs)
+
+        b = backup2.connect().session()
+        self.assert_browse_retry(backup2.connect().session(), "q", msgs)
+
+
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
     os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1211902&r1=1211901&r2=1211902&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cluster-benchmark Thu Dec  8 14:20:43 2011
@@ -30,7 +30,7 @@ RECEIVERS="-r 3"
 BROKERS=			# Local broker
 CLIENT_HOSTS=			# No ssh, all clients are local
 
-while getopts "m:f:n:b:q:s:r:c:txy" opt; do
+while getopts "m:f:n:b:q:s:r:c:txyv" opt; do
     case $opt in
 	m) MESSAGES="-m $OPTARG";;
 	f) FLOW="--flow-control $OPTARG";;
@@ -43,13 +43,16 @@ while getopts "m:f:n:b:q:s:r:c:txy" opt;
 	t) TCP_NODELAY="--connection-options {tcp-nodelay:true}";;
 	x) SAVE_RECEIVED="--save-received";;
 	y) NO_DELETE="--no-delete";;
+	v) OPTS="--verbose";;
 	*) echo "Unknown option"; exit 1;;
     esac
 done
+REPLICATE="node:{x-declare:{arguments:{'qpid.replicate':all}}}"
 BROKER=$(echo $BROKERS | sed s/,.*//)
 run_test() { echo $*; shift; "$@"; echo; echo; echo; }
 
-OPTS="$REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE"
-run_test "Queue contention:" qpid-cpp-benchmark $OPTS
-run_test "No queue contention: :" qpid-cpp-benchmark $OPTS --group-receivers
+OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE"
+OPTS="$OPTS --create-option $REPLICATE"
+run_test "Benchmark:" qpid-cpp-benchmark $OPTS
+
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1211902&r1=1211901&r2=1211902&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cpp-benchmark Thu Dec  8 14:20:43 2011
@@ -55,6 +55,8 @@ op.add_option("--send-option", default=[
               help="Additional option for sending addresses")
 op.add_option("--receive-option", default=[], action="append", type="str",
               help="Additional option for receiving addresses")
+op.add_option("--create-option", default=[], action="append", type="str",
+              help="Additional option for creating addresses")
 op.add_option("--send-arg", default=[], action="append", type="str",
               help="Additional argument for qpid-send")
 op.add_option("--receive-arg", default=[], action="append", type="str",
@@ -75,6 +77,7 @@ op.add_option("--verbose", default=False
               help="Show commands executed")
 op.add_option("--no-delete", default=False, action="store_true",
               help="Don't delete the test queues.")
+
 single_quote_re = re.compile("'")
 def posix_quote(string):
     """ Quote a string for use as an argument in a posix shell"""
@@ -176,7 +179,7 @@ def queue_exists(queue,broker):
             return False
     finally: c.close()
 
-def recreate_queues(queues, brokers, no_delete):
+def recreate_queues(queues, brokers, no_delete, opts):
     c = qpid.messaging.Connection(brokers[0])
     c.open()
     s = c.session()
@@ -187,7 +190,9 @@ def recreate_queues(queues, brokers, no_
             # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
             for b in brokers:
                 while queue_exists(q,b): time.sleep(0.1);
-        s.sender("%s;{create:always}"%q)
+        address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"]))
+        if opts.verbose: print "Creating", address
+        s.sender(address)
         # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
         for b in brokers:
             while not queue_exists(q,b): time.sleep(0.1);
@@ -285,7 +290,7 @@ def main():
     queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
     try:
         for i in xrange(opts.repeat):
-            recreate_queues(queues, opts.broker, opts.no_delete)
+            recreate_queues(queues, opts.broker, opts.no_delete, opts)
             ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
 
             if opts.group_receivers: # Run receivers for same queue against same broker.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org