You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/09 15:48:28 UTC
[23/32] git commit: STORM-297: add test case to test that multiple
receiver threads preserve message order
STORM-297: add test case to test that multiple receiver threads preserve message order
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/426d1437
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/426d1437
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/426d1437
Branch: refs/heads/master
Commit: 426d143784ee3610aa3582d43316f1344abd6275
Parents: 20b4f8b
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue Jun 3 14:44:09 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue Jun 3 14:44:09 2014 +0800
----------------------------------------------------------------------
.../storm/testing/TestEventLogSpout.java | 105 +++++++++++++++++++
.../storm/testing/TestEventOrderCheckBolt.java | 76 ++++++++++++++
.../test/clj/backtype/storm/messaging_test.clj | 35 ++++++-
3 files changed, 215 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/426d1437/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
new file mode 100644
index 0000000..a34484d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class TestEventLogSpout extends BaseRichSpout {
+ public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
+
+ private String uid;
+ private long totalCount;
+ private final static Map<String, AtomicLong> totalEmitCount = new HashMap<String, AtomicLong>();
+
+ SpoutOutputCollector _collector;
+ private long eventId = 0;
+ private long myCount;
+ private int source;
+
+ public TestEventLogSpout(long totalCount) {
+ this.uid = UUID.randomUUID().toString();
+ this.totalCount = totalCount;
+
+ synchronized (totalEmitCount) {
+ if (null == totalEmitCount.get(uid)) {
+ totalEmitCount.put(uid, new AtomicLong(0));
+ }
+
+ }
+ }
+
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _collector = collector;
+ this.source = context.getThisTaskId();
+ long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
+ myCount = totalCount / taskCount;
+ }
+
+ public void close() {
+
+ }
+
+ public void cleanup() {
+ synchronized(totalEmitCount) {
+ totalEmitCount.remove(uid);
+ }
+ }
+
+ public boolean completed() {
+ Long totalEmitted = totalEmitCount.get(uid).get();
+
+ if (totalEmitted >= totalCount) {
+ return true;
+ }
+ return false;
+ }
+
+ public void nextTuple() {
+ if (eventId < myCount) {
+ eventId++;
+ _collector.emit(new Values(source, eventId), eventId);
+ totalEmitCount.get(uid).incrementAndGet();
+ }
+ }
+
+ public void ack(Object msgId) {
+
+ }
+
+ public void fail(Object msgId) {
+
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("source", "eventId"));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/426d1437/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java b/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
new file mode 100644
index 0000000..1f80362
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class TestEventOrderCheckBolt extends BaseRichBolt {
+ public static Logger LOG = LoggerFactory.getLogger(TestEventOrderCheckBolt.class);
+
+ private int _count;
+ OutputCollector _collector;
+ Map<Integer, Long> recentEventId = new HashMap<Integer, Long>();
+
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ _collector = collector;
+ _count = 0;
+ }
+
+ public void execute(Tuple input) {
+ Integer sourceId = input.getInteger(0);
+ Long eventId = input.getLong(1);
+ Long recentEvent = recentEventId.get(sourceId);
+
+ if (null != recentEvent && eventId <= recentEvent) {
+ String error = "Error: event id is not in strict order! event source Id: "
+ + sourceId + ", last event Id: " + recentEvent + ", current event Id: " + eventId;
+
+ _collector.emit(input, new Values(error));
+ }
+ recentEventId.put(sourceId, eventId);
+
+ _collector.ack(input);
+ }
+
+ public void cleanup() {
+
+ }
+
+ public Fields getOutputFields() {
+ return new Fields("error");
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("error"));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/426d1437/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index 94b9168..c719c68 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -15,7 +15,7 @@
;; limitations under the License.
(ns backtype.storm.messaging-test
(:use [clojure test])
- (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount])
+ (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
)
@@ -56,3 +56,36 @@
(is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
(read-tuples results "2")))))))
+;; Lets the test harness drive TestEventLogSpout as a CompletableSpout by
+;; delegating to the spout's own completed/cleanup methods.
+(extend-type TestEventLogSpout
+  CompletableSpout
+  (exhausted? [this]
+    (.completed this))
+  (cleanup [this]
+    (.cleanup this))
+  ;; Nothing to do on startup.
+  (startup [this]
+    nil))
+
+;; Test that adding more receiver threads won't violate the message delivery order guarantee
+(deftest test-receiver-message-order
+  (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 2
+                                      :daemon-conf {TOPOLOGY-WORKERS 2
+                                                    ;; Configure multiple receiver threads per worker
+                                                    WORKER-RECEIVER-THREAD-COUNT 2
+                                                    STORM-LOCAL-MODE-ZMQ true
+                                                    STORM-MESSAGING-TRANSPORT
+                                                    "backtype.storm.messaging.netty.Context"}]
+    (let [;; TestEventLogSpout outputs (sourceId, eventId); eventId is monotonically increasing
+          spouts {"1" (thrift/mk-spout-spec (TestEventLogSpout. 4000) :parallelism-hint 8)}
+          ;; Fields grouping on "source": tuples from one spout task all go to the same bolt task.
+          ;; When the received order is not kept, the bolt emits an error tuple.
+          bolts {"2" (thrift/mk-bolt-spec {"1" ["source"]} (TestEventOrderCheckBolt.)
+                                          :parallelism-hint 4)}
+          topology (thrift/mk-topology spouts bolts)
+          results (complete-topology cluster topology)]
+
+      ;; No error tuple from bolt TestEventOrderCheckBolt
+      (is (empty? (read-tuples results "2"))))))