You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/09 15:48:28 UTC

[23/32] git commit: STORM-297: add test case to test multiple receiver thread should reserve message order

STORM-297: add test case to test multiple receiver thread should reserve message order


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/426d1437
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/426d1437
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/426d1437

Branch: refs/heads/master
Commit: 426d143784ee3610aa3582d43316f1344abd6275
Parents: 20b4f8b
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue Jun 3 14:44:09 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue Jun 3 14:44:09 2014 +0800

----------------------------------------------------------------------
 .../storm/testing/TestEventLogSpout.java        | 105 +++++++++++++++++++
 .../storm/testing/TestEventOrderCheckBolt.java  |  76 ++++++++++++++
 .../test/clj/backtype/storm/messaging_test.clj  |  35 ++++++-
 3 files changed, 215 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/426d1437/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
new file mode 100644
index 0000000..a34484d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class TestEventLogSpout extends BaseRichSpout {
+    public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
+    
+    private String uid;
+    private long totalCount;
+    private final static Map<String, AtomicLong> totalEmitCount = new HashMap<String, AtomicLong>();
+    
+    SpoutOutputCollector _collector;
+    private long eventId = 0;
+    private long myCount;
+    private int source;
+    
+    public TestEventLogSpout(long totalCount) {
+        this.uid = UUID.randomUUID().toString();
+        this.totalCount = totalCount;
+        
+        synchronized (totalEmitCount) {
+            if (null == totalEmitCount.get(uid)) {
+                totalEmitCount.put(uid, new AtomicLong(0));
+            }
+            
+        }
+    }
+        
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        _collector = collector;
+        this.source = context.getThisTaskId();
+        long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
+        myCount = totalCount / taskCount;
+    }
+    
+    public void close() {
+        
+    }
+    
+    public void cleanup() {
+        synchronized(totalEmitCount) {            
+            totalEmitCount.remove(uid);
+        }
+    }
+    
+    public boolean completed() {
+        Long totalEmitted = totalEmitCount.get(uid).get();
+        
+        if (totalEmitted >= totalCount) {
+            return true;
+        }
+        return false;
+    }
+        
+    public void nextTuple() {
+        if (eventId < myCount) { 
+            eventId++;
+            _collector.emit(new Values(source, eventId), eventId);
+            totalEmitCount.get(uid).incrementAndGet();
+        }        
+    }
+    
+    public void ack(Object msgId) {
+
+    }
+
+    public void fail(Object msgId) {
+        
+    }
+    
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("source", "eventId"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/426d1437/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java b/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
new file mode 100644
index 0000000..1f80362
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class TestEventOrderCheckBolt extends BaseRichBolt {
+    public static Logger LOG = LoggerFactory.getLogger(TestEventOrderCheckBolt.class);
+    
+    private int _count;
+    OutputCollector _collector;
+    Map<Integer, Long> recentEventId = new HashMap<Integer, Long>();
+
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        _collector = collector;
+        _count = 0;
+    }
+
+    public void execute(Tuple input) {
+        Integer sourceId = input.getInteger(0);
+        Long eventId = input.getLong(1);
+        Long recentEvent = recentEventId.get(sourceId);
+
+        if (null != recentEvent && eventId <= recentEvent) {
+            String error = "Error: event id is not in strict order! event source Id: "
+                    + sourceId + ", last event Id: " + recentEvent + ", current event Id: " + eventId;
+
+            _collector.emit(input, new Values(error));
+        }
+        recentEventId.put(sourceId, eventId);
+
+        _collector.ack(input);
+    }
+
+    public void cleanup() {
+
+    }
+
+    public Fields getOutputFields() {
+        return new Fields("error");
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("error"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/426d1437/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index 94b9168..c719c68 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns backtype.storm.messaging-test
   (:use [clojure test])
-  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount])
+  (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt])
   (:use [backtype.storm bootstrap testing])
   (:use [backtype.storm.daemon common])
   )
@@ -56,3 +56,36 @@
         (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
                  (read-tuples results "2")))))))
 
+(extend-type TestEventLogSpout
+  CompletableSpout
+  (exhausted? [this]
+    (-> this .completed))
+  (cleanup [this]
+    (.cleanup this))
+  (startup [this]
+    ))
+
+;; Test Adding more receiver threads won't violate the message delivery order gurantee
+(deftest test-receiver-message-order 
+  (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 2
+                                        :daemon-conf {TOPOLOGY-WORKERS 2
+                                                      ;; Configure multiple receiver threads per worker 
+                                                      WORKER-RECEIVER-THREAD-COUNT 2
+                                                      STORM-LOCAL-MODE-ZMQ  true 
+                                                      STORM-MESSAGING-TRANSPORT 
+                                                      "backtype.storm.messaging.netty.Context"}]
+      (let [topology (thrift/mk-topology
+                       
+                       ;; TestEventLogSpout output(sourceId, eventId), eventId is Monotonically increasing
+                       {"1" (thrift/mk-spout-spec (TestEventLogSpout. 4000) :parallelism-hint 8)}
+                       
+                       ;; field grouping, message from same "source" task will be delivered to same bolt task
+                       ;; When received message order is not kept, Emit an error Tuple 
+                       {"2" (thrift/mk-bolt-spec {"1" ["source"]} (TestEventOrderCheckBolt.)
+                                                 :parallelism-hint 4)
+                        })
+            results (complete-topology cluster
+                                       topology)]
+        
+        ;; No error Tuple from Bolt TestEventOrderCheckBolt
+        (is (empty? (read-tuples results "2"))))))