You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/12 23:11:04 UTC
[06/14] Merge branch 'master' into idiomatic-clojure-01
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/ui/public/js/script.js
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/js/script.js b/storm-core/src/ui/public/js/script.js
index e9902ab..8f7608e 100644
--- a/storm-core/src/ui/public/js/script.js
+++ b/storm-core/src/ui/public/js/script.js
@@ -71,7 +71,7 @@ function ensureInt(n) {
function confirmAction(id, name, action, wait, defaultWait) {
var opts = {
type:'POST',
- url:'/topology/' + id + '/' + action
+ url:'/api/v1/topology/' + id + '/' + action
};
if (wait) {
var waitSecs = prompt('Do you really want to ' + action + ' topology "' + name + '"? ' +
@@ -91,7 +91,7 @@ function confirmAction(id, name, action, wait, defaultWait) {
$.ajax(opts).always(function () {
window.location.reload();
}).fail(function () {
- alert("Error while communicating with Nimbus.")
+ alert("Error while communicating with Nimbus.");
});
return false;
@@ -106,4 +106,49 @@ $(function () {
delayIn: 1000
});
}
-})
+});
+
+function formatConfigData(data) {
+ var mustacheFormattedData = {'config':[]};
+ for (var prop in data) {
+ if(data.hasOwnProperty(prop)) {
+ mustacheFormattedData['config'].push({
+ 'key': prop,
+ 'value': data[prop]
+ });
+ }
+ }
+ return mustacheFormattedData;
+}
+
+
+function renderToggleSys(div) {
+ var sys = $.cookies.get("sys") || false;
+ if(sys) {
+ div.append("<span data-original-title=\"Use this to toggle inclusion of storm system components.\" class=\"tip right\"><input onclick=\"toggleSys()\" value=\"Hide System Stats\" type=\"button\"></span>");
+ } else {
+ div.append("<span class=\"tip right\" title=\"Use this to toggle inclusion of storm system components.\"><input onclick=\"toggleSys()\" value=\"Show System Stats\" type=\"button\"></span>");
+ }
+}
+
+function topologyActionJson(id,name,status,msgTimeout) {
+ var jsonData = {};
+ jsonData["id"] = id;
+ jsonData["name"] = name;
+ jsonData["msgTimeout"] = msgTimeout;
+ jsonData["activateStatus"] = (status === "ACTIVE") ? "disabled" : "enabled";
+ jsonData["deactivateStatus"] = (status === "ACTIVE") ? "enabled" : "disabled";
+ jsonData["rebalanceStatus"] = (status === "ACTIVE" || status === "INACTIVE" ) ? "enabled" : "disabled";
+ jsonData["killStatus"] = (status !== "KILLED") ? "enabled" : "disabled";
+ return jsonData;
+}
+
+function topologyActionButton(id,name,status,actionLabel,command,wait,defaultWait) {
+ var buttonData = {};
+ buttonData["buttonStatus"] = status ;
+ buttonData["actionLabel"] = actionLabel;
+ buttonData["command"] = command;
+ buttonData["isWait"] = wait;
+ buttonData["defaultWait"] = defaultWait;
+ return buttonData;
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/ui/public/js/visualization.js
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/js/visualization.js b/storm-core/src/ui/public/js/visualization.js
new file mode 100644
index 0000000..87596b8
--- /dev/null
+++ b/storm-core/src/ui/public/js/visualization.js
@@ -0,0 +1,403 @@
+// Inspired by
+// https://github.com/samizdatco/arbor/blob/master/docs/sample-project/main.js
+
+function renderGraph(elem) {
+
+ var canvas = $(elem).get(0);
+ canvas.width = $(window).width();
+ canvas.height = $(window).height();
+ var ctx = canvas.getContext("2d");
+ var gfx = arbor.Graphics(canvas);
+ var psys;
+
+ var totaltrans = 0;
+ var weights = {};
+ var texts = {};
+ var update = false;
+
+ var myRenderer = {
+ init: function(system){
+ psys = system;
+ psys.screenSize(canvas.width, canvas.height)
+ psys.screenPadding(20);
+ myRenderer.initMouseHandling();
+ },
+
+ signal_update: function() {
+ update = true;
+ },
+
+ redraw: function() {
+
+ if(!psys)
+ return;
+
+ if(update) {
+ totaltrans = calculate_total_transmitted(psys);
+ weights = calculate_weights(psys, totaltrans);
+ texts = calculate_texts(psys, totaltrans);
+ update = false;
+ }
+
+
+
+ ctx.fillStyle = "white";
+ ctx.fillRect(0, 0, canvas.width, canvas.height);
+ var x = 0;
+
+
+ psys.eachEdge(function(edge, pt1, pt2) {
+
+ var len = Math.sqrt(Math.pow(pt2.x - pt1.x,2) + Math.pow(pt2.y - pt1.y,2));
+ var sublen = len - (Math.max(50, 20 + gfx.textWidth(edge.target.name)) / 2);
+ var thirdlen = len/3;
+ var theta = Math.atan2(pt2.y - pt1.y, pt2.x - pt1.x);
+
+ var newpt2 = {
+ x : pt1.x + (Math.cos(theta) * sublen),
+ y : pt1.y + (Math.sin(theta) * sublen)
+ };
+
+ var thirdpt = {
+ x: pt1.x + (Math.cos(theta) * thirdlen),
+ y: pt1.y + (Math.sin(theta) * thirdlen)
+ }
+
+ weight = weights[edge.source.name + edge.target.name];
+
+ if(!weights[edge.source.name + edge.target.name])
+ {
+ totaltrans = calculate_total_transmitted(psys);
+ weights = calculate_weights(psys, totaltrans);
+ }
+
+ ctx.strokeStyle = "rgba(0,0,0, .333)";
+ ctx.lineWidth = 25 * weight + 5;
+ ctx.beginPath();
+
+ var arrlen = 15;
+ ctx.moveTo(pt1.x, pt1.y);
+ ctx.lineTo(newpt2.x, newpt2.y);
+ ctx.lineTo(newpt2.x - arrlen * Math.cos(theta-Math.PI/6), newpt2.y - arrlen * Math.sin(theta - Math.PI/6));
+ ctx.moveTo(newpt2.x, newpt2.y);
+ ctx.lineTo(newpt2.x - arrlen * Math.cos(theta+Math.PI/6), newpt2.y - arrlen * Math.sin(theta + Math.PI/6));
+
+
+ if (texts[edge.source.name + edge.target.name] == null)
+ {
+ totaltrans = calculate_total_transmitted(psys);
+ texts = calculate_texts(psys, totaltrans);
+ }
+
+ gfx.text(texts[edge.source.name + edge.target.name], thirdpt.x, thirdpt.y + 10, {color:"black", align:"center", font:"Arial", size:10})
+ ctx.stroke();
+ });
+
+ psys.eachNode(function(node, pt) {
+ var col;
+
+ var real_trans = gather_stream_count(node.data[":stats"], "default", "600");
+
+ if(node.data[":type"] === "bolt") {
+ var cap = Math.min(node.data[":capacity"], 1);
+ var red = Math.floor(cap * 225) + 30;
+ var green = Math.floor(255 - red);
+ var blue = Math.floor(green/5);
+ col = arbor.colors.encode({r:red,g:green,b:blue,a:1});
+ } else {
+ col = "#0000FF";
+ }
+
+ var w = Math.max(55, 25 + gfx.textWidth(node.name));
+
+ gfx.oval(pt.x - w/2, pt.y - w/2, w, w, {fill: col});
+ gfx.text(node.name, pt.x, pt.y+3, {color:"white", align:"center", font:"Arial", size:12});
+ gfx.text(node.name, pt.x, pt.y+3, {color:"white", align:"center", font:"Arial", size:12});
+
+ gfx.text(parseFloat(node.data[":latency"]).toFixed(2) + " ms", pt.x, pt.y + 17, {color:"white", align:"center", font:"Arial", size:12});
+
+ });
+
+ // Draw gradient sidebar
+ ctx.rect(0,0,50,canvas.height);
+ var grd = ctx.createLinearGradient(0,0,50,canvas.height);
+ grd.addColorStop(0, '#1ee12d');
+ grd.addColorStop(1, '#ff0000');
+ ctx.fillStyle=grd;
+ ctx.fillRect(0,0,50,canvas.height);
+
+
+ },
+
+ initMouseHandling:function() {
+ var dragged = null;
+
+ var clicked = false;
+
+ var handler = {
+ clicked:function(e){
+ var pos = $(canvas).offset();
+ _mouseP = arbor.Point(e.pageX-pos.left, e.pageY - pos.top);
+ dragged = psys.nearest(_mouseP);
+
+ if(dragged && dragged.node !== null) {
+ dragged.node.fixed = true;
+ }
+
+ clicked = true;
+ setTimeout(function(){clicked = false;}, 50);
+
+ $(canvas).bind('mousemove', handler.dragged);
+ $(window).bind('mouseup', handler.dropped);
+
+ return false;
+ },
+
+ dragged:function(e) {
+
+ var pos = $(canvas).offset();
+ var s = arbor.Point(e.pageX-pos.left, e.pageY-pos.top);
+
+ if(dragged && dragged.node != null) {
+ var p = psys.fromScreen(s);
+ dragged.node.p = p;
+ }
+
+ return false;
+
+ },
+
+ dropped:function(e) {
+ if(clicked) {
+ if(dragged.distance < 50) {
+ if(dragged && dragged.node != null) {
+ window.location = dragged.node.data[":link"];
+ }
+ }
+ }
+
+ if(dragged === null || dragged.node === undefined) return;
+ if(dragged.node !== null) dragged.node.fixed = false;
+ dragged.node.tempMass = 1000;
+ dragged = null;
+ $(canvas).unbind('mousemove', handler.dragged);
+ $(window).unbind('mouseup', handler.dropped);
+ _mouseP = null;
+ return false;
+ }
+
+ }
+
+ $(canvas).mousedown(handler.clicked);
+ }
+ }
+
+ return myRenderer;
+}
+
+function calculate_texts(psys, totaltrans) {
+ var texts = {};
+ psys.eachEdge(function(edge, pt1, pt2) {
+ var text = "";
+ for(var i = 0; i < edge.target.data[":inputs"].length; i++) {
+ var stream = edge.target.data[":inputs"][i][":stream"];
+ var sani_stream = edge.target.data[":inputs"][i][":sani-stream"];
+ if(stream_checked(sani_stream)
+ && edge.target.data[":inputs"][i][":component"] === edge.source.name) {
+ stream_transfered = gather_stream_count(edge.source.data[":stats"], sani_stream, "600");
+ text += stream + ": "
+ + stream_transfered + ": "
+ + (totaltrans > 0 ? Math.round((stream_transfered/totaltrans) * 100) : 0) + "%\n";
+
+ }
+ }
+
+ texts[edge.source.name + edge.target.name] = text;
+ });
+
+ return texts;
+}
+
+function calculate_weights(psys, totaltrans) {
+ var weights = {};
+
+ psys.eachEdge(function(edge, pt1, pt2) {
+ var trans = 0;
+ for(var i = 0; i < edge.target.data[":inputs"].length; i++) {
+ var stream = edge.target.data[":inputs"][i][":sani-stream"];
+ if(stream_checked(stream) && edge.target.data[":inputs"][i][":component"] === edge.source.name)
+ trans += gather_stream_count(edge.source.data[":stats"], stream, "600");
+ }
+ weights[edge.source.name + edge.target.name] = (totaltrans > 0 ? trans/totaltrans : 0);
+ });
+ return weights;
+}
+
+function calculate_total_transmitted(psys) {
+ var totaltrans = 0;
+ var countedmap = {}
+ psys.eachEdge(function(node, pt, pt2) {
+ if(!countedmap[node.source.name])
+ countedmap[node.source.name] = {};
+
+ for(var i = 0; i < node.target.data[":inputs"].length; i++) {
+ var stream = node.target.data[":inputs"][i][":stream"];
+ if(stream_checked(node.target.data[":inputs"][i][":sani-stream"]))
+ {
+ if(!countedmap[node.source.name][stream]) {
+ if(node.source.data[":stats"])
+ {
+ var toadd = gather_stream_count(node.source.data[":stats"], node.target.data[":inputs"][i][":sani-stream"], "600");
+ totaltrans += toadd;
+ }
+ countedmap[node.source.name][stream] = true;
+ }
+ }
+ }
+
+ });
+
+ return totaltrans;
+}
+
+function has_checked_stream_input(inputs) {
+
+ for(var i = 0; i < inputs.length; i++) {
+ var x = stream_checked(inputs[i][":sani-stream"]);
+ if(x)
+ return true;
+ }
+ return false;
+}
+
+function stream_checked(stream) {
+ var checked = $("#" + stream).is(":checked");
+ return checked;
+}
+
+function has_checked_stream_output(jdat, component) {
+ var ret = false;
+ $.each(jdat, function(k, v) {
+ for(var i = 0; i < v[":inputs"].length; i++) {
+ if(stream_checked(v[":inputs"][i][":sani-stream"])
+ && v[":inputs"][i][":component"] == component)
+ ret = true;
+ }
+ });
+ return ret;
+}
+
+function gather_stream_count(stats, stream, time) {
+ var transferred = 0;
+ if(stats)
+ for(var i = 0; i < stats.length; i++) {
+ if(stats[i][":transferred"] != null)
+ {
+ var stream_trans = stats[i][":transferred"][time][stream];
+ if(stream_trans != null)
+ transferred += stream_trans;
+ }
+ }
+ return transferred;
+}
+
+
+function rechoose(jdat, sys, box) {
+ var id = box.id;
+ if($(box).is(':checked'))
+ {
+ //Check each node in our json data to see if it has inputs from or outputs to selected streams. If it does, add a node for it.
+ $.each(jdat,function(k,v) {
+ if( has_checked_stream_input(v[":inputs"]) || has_checked_stream_output(jdat, k))
+ sys.addNode(k,v);
+ });
+
+ //Check each node in our json data and add necessary edges based on selected components.
+ $.each(jdat, function(k, v) {
+ for(var i = 0; i < v[":inputs"].length; i++)
+ if(v[":inputs"][i][":sani-stream"] === id) {
+
+ sys.addEdge(v[":inputs"][i][":component"], k, v);
+ }
+ });
+ }
+ else {
+ //Check each node to see if it should be pruned.
+ sys.prune(function(node, from, to) {
+ return !has_checked_stream_input(node.data[":inputs"]) && !has_checked_stream_output(jdat, node.name);
+ });
+
+ //Check each edge to see if it represents any selected streams. If not, prune it.
+ sys.eachEdge(function(edge, pt1, pt2) {
+ var inputs = edge.target.data[":inputs"];
+
+ if($.grep(inputs, function(input) {
+
+ return input[":component"] === edge.source.name
+ && stream_checked(input[":sani-stream"]);
+
+ }).length == 0)
+ {
+ sys.pruneEdge(edge);
+ }
+ });
+ }
+
+ //Tell the particle system's renderer that it needs to update its labels, colors, widths, etc.
+ sys.renderer.signal_update();
+ sys.renderer.redraw();
+
+}
+
+var topology_data;
+function update_data(jdat, sys) {
+ $.each(jdat, function(k,v) {
+ if(sys.getNode(k))
+ sys.getNode(k).data = v;
+ });
+}
+
+var should_update;
+function show_visualization(sys) {
+
+ if(sys == null)
+ {
+ sys = arbor.ParticleSystem(20, 1000, 0.15, true, 55, 0.02, 0.6);
+ sys.renderer = renderGraph("#topoGraph");
+ sys.stop();
+
+ $(".stream-box").click(function () { rechoose(topology_data, sys, this) });
+ }
+
+ should_update = true;
+ var update_freq_ms = 10000;
+ var update = function(should_rechoose){
+ $.ajax({
+ url: "/api/v1/topology/"+$.url().param("id")+"/visualization",
+ success: function(data, status, jqXHR) {
+ topology_data = data;
+ update_data(topology_data, sys);
+ sys.renderer.signal_update();
+ sys.renderer.redraw();
+ if(should_update)
+ setTimeout(update, update_freq_ms);
+ if(should_rechoose)
+ $(".stream-box").each(function () { rechoose(topology_data, sys, this) });
+ }
+ });
+ };
+
+ update(true);
+ $("#visualization-container").show(500);
+ $("#show-hide-visualization").attr('value', 'Hide Visualization');
+ $("#show-hide-visualization").unbind("click");
+ $("#show-hide-visualization").click(function () { hide_visualization(sys) });
+}
+
+function hide_visualization(sys) {
+ should_update = false;
+ $("#visualization-container").hide(500);
+ $("#show-hide-visualization").attr('value', 'Show Visualization');
+ $("#show-hide-visualization").unbind("click");
+ $("#show-hide-visualization").click(function () { show_visualization(sys) });
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/ui/public/templates/component-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/component-page-template.html b/storm-core/src/ui/public/templates/component-page-template.html
new file mode 100644
index 0000000..f2a5266
--- /dev/null
+++ b/storm-core/src/ui/public/templates/component-page-template.html
@@ -0,0 +1,152 @@
+<script id="component-summary-template" type="text/html">
+<h2>Component summary</h2>
+<table><thead><tr><th><span class="tip right" title="The ID assigned to a the Component by the Topology.">Id</span></th><th><span class="tip above" title="The name given to the topology by when it was submitted. Click the name to view the Topology's information.">Topology</span></th><th><span class="tip above" title="Executors are threads in a Worker process.">Executors</span></th><th><span class="tip above" title="A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors.">Tasks</span></th></tr></thead>
+<tbody>
+<tr>
+<td>{{id}}</td>
+<td><a href="/topology.html?id={{topologyId}}">{{name}}</a></td>
+<td>{{executors}}</td>
+<td>{{tasks}}</td>
+</tr>
+</tbody>
+</table>
+</script>
+<script id="spout-stats-detail-template" type="text/html">
+<h2>Spout stats</h2>
+<table class="zebra-striped" id="spout-stats-table"><thead><tr><th class="header headerSortDown"><span class="tip right" title="The past period of time for which the statistics apply. Click on a value to set the window for this page.">Window</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted.">Emitted</span></th><th class="header"><span data-original-title="The number of Tuples emitted that sent to one or more bolts." class="tip above">Transferred</span></th><th class="header"><span class="tip above" title="The average time a Tuple "tree" takes to be completely processed by the Topology. A value of 0 is expected if no acking is done.">Complete latency (ms)</span></th><th class="header"><span class="tip above" title="The number of Tuple "trees" successfully processed. A value of 0 is expected if no acking is done.">Acked</span></th><th class="header"><span data-original-title="The number of Tuple "trees" that were e
xplicitly failed or timed out before acking was completed. A value of 0 is expected if no acking is done." class="tip left">Failed</span></th></tr></thead>
+<tbody>
+{{#spoutSummary}}
+<tr>
+<td><a href="/component.html?id={{id}}&topology_id={{topologyId}}&window={{window}}">{{windowPretty}}</a></td>
+<td>{{emitted}}</td>
+<td>{{transferred}}</td>
+<td>{{completeLatency}}</td>
+<td>{{acked}}</td>
+<td>{{failed}}</td>
+</tr>
+{{/spoutSummary}}
+</tbody>
+</table>
+</script>
+<script id="output-stats-template" type="text/html">
+<h2>Output stats ({{windowHint}})</h2>
+<table class="zebra-striped" id="output-stats-table"><thead><tr><th class="header headerSortDown"><span data-original-title="The name of the Tuple stream given in the Topolgy, or "default" if none was given." class="tip right">Stream</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted.">Emitted</span></th><th class="header"><span data-original-title="The number of Tuples emitted that sent to one or more bolts." class="tip above">Transferred</span></th><th class="header"><span data-original-title="The average time a Tuple "tree" takes to be completely processed by the Topology. A value of 0 is expected if no acking is done." class="tip above">Complete latency (ms)</span></th><th class="header"><span data-original-title="The number of Tuple "trees" successfully processed. A value of 0 is expected if no acking is done." class="tip above">Acked</span></th><th class="header"><span data-original-title="The number of Tuple
"trees" that were explicitly failed or timed out before acking was completed. A value of 0 is expected if no acking is done." class="tip left">Failed</span></th></tr></thead>
+<tbody>
+{{#outputStats}}
+<tr>
+<td>{{stream}}</td>
+<td>{{emitted}}</td>
+<td>{{transferred}}</td>
+<td>{{completeLatency}}</td>
+<td>{{acked}}</td>
+<td>{{failed}}</td>
+</tr>
+{{/outputStats}}
+</tbody>
+</table>
+</script>
+<script id="executor-stats-template" type="text/html">
+<h2>Executors ({{windowHint}})</h2>
+<table class="zebra-striped" id="executor-stats-table"><thead><tr><th class="header headerSortDown"><span class="tip right" title="The unique executor ID.">Id</span></th><th class="header"><span class="tip right" title="The length of time an Executor (thread) has been alive.">Uptime</span></th><th class="header"><span class="tip above" title="The hostname reported by the remote host. (Note that this hostname is not the result of a reverse lookup at the Nimbus node.)">Host</span></th><th class="header"><span data-original-title="The port number used by the Worker to which an Executor is assigned. Click on the port number to open the logviewer page for this Worker." class="tip above">Port</span></th><th class="header"><span data-original-title="The number of Tuples emitted." class="tip above">Emitted</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted that sent to one or more bolts.">Transferred</span></th><th class="header"><span class="tip above
" title="The average time a Tuple "tree" takes to be completely processed by the Topology. A value of 0 is expected if no acking is done.">Complete latency (ms)</span></th><th class="header"><span class="tip above" title="The number of Tuple "trees" successfully processed. A value of 0 is expected if no acking is done.">Acked</span></th><th class="header"><span data-original-title="The number of Tuple "trees" that were explicitly failed or timed out before acking was completed. A value of 0 is expected if no acking is done." class="tip left">Failed</span></th></tr></thead>
+<tbody>
+{{#executorStats}}
+<tr>
+<td>{{id}}</td>
+<td>{{uptime}}</td>
+<td>{{host}}</td>
+<td><a href="{{workerLogLink}}">{{port}}</a></td>
+<td>{{emitted}}</td>
+<td>{{transferred}}</td>
+<td>{{completeLatency}}</td>
+<td>{{acked}}</td>
+<td>{{failed}}</td>
+</tr>
+{{/executorStats}}
+</tbody>
+</table>
+</script>
+<script id="bolt-stats-template" type="text/html">
+<h2>Bolt stats</h2>
+<table class="zebra-striped" id="bolt-stats-table"><thead><tr><th class="header headerSortDown"><span class="tip right" title="The past period of time for which the statistics apply. Click on a value to set the window for this page.">Window</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted.">Emitted</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted that sent to one or more bolts.">Transferred</span></th><th class="header"><span data-original-title="The average time a Tuple spends in the execute method. The execute method may complete without sending an Ack for the tuple." class="tip above">Execute latency (ms)</span></th><th class="header"><span class="tip above" title="The number of incoming Tuples processed.">Executed</span></th><th class="header"><span data-original-title="The average time it takes to Ack a Tuple after it is first received. Bolts that join, aggregate or batch may not Ack a tuple unti
l a number of other Tuples have been received." class="tip above">Process latency (ms)</span></th><th class="header"><span data-original-title="The number of Tuples acknowledged by this Bolt." class="tip above">Acked</span></th><th class="header"><span data-original-title="The number of tuples Failed by this Bolt." class="tip left">Failed</span></th></tr></thead>
+<tbody>
+{{#boltStats}}
+<tr>
+<td><a href="/component.html?id={{id}}&topology_id={{topologyId}}&window={{window}}">{{windowPretty}}</a></td>
+<td>{{emitted}}</td>
+<td>{{transferred}}</td>
+<td>{{executeLatency}}</td>
+<td>{{executed}}</td>
+<td>{{processLatency}}</td>
+<td>{{acked}}</td>
+<td>{{failed}}</td>
+</tr>
+{{/boltStats}}
+</tbody>
+</table>
+</script>
+<script id="bolt-input-stats-template" type="text/html">
+<h2>Input stats ({{windowHint}})</h2>
+<table class="zebra-striped" id="bolt-input-stats-table"><thead><tr><th class="header headerSortDown"><span class="tip right" title="The ID assigned to a the Component by the Topology.">Component</span></th><th class="header"><span class="tip right" title="The name of the Tuple stream given in the Topolgy, or "default" if none was given.">Stream</span></th><th class="header"><span class="tip above" title="The average time a Tuple spends in the execute method. The execute method may complete without sending an Ack for the tuple.">Execute latency (ms)</span></th><th class="header"><span class="tip above" title="The number of incoming Tuples processed.">Executed</span></th><th class="header"><span data-original-title="The average time it takes to Ack a Tuple after it is first received. Bolts that join, aggregate or batch may not Ack a tuple until a number of other Tuples have been received." class="tip above">Process latency (ms)</span></th><th class="header"><span class="ti
p above" title="The number of Tuples acknowledged by this Bolt.">Acked</span></th><th class="header"><span data-original-title="The number of tuples Failed by this Bolt." class="tip left">Failed</span></th></tr></thead>
+<tbody>
+{{#inputStats}}
+<tr>
+<td>{{component}}</td>
+<td>{{stream}}</td>
+<td>{{executeLatency}}</td>
+<td>{{executed}}</td>
+<td>{{processLatency}}</td>
+<td>{{acked}}</td>
+<td>{{failed}}</td>
+</tr>
+{{/inputStats}}
+</tbody>
+</table>
+</script>
+<script id="bolt-output-stats-template" type="text/html">
+<h2>Output stats ({{windowHint}})</h2>
+<table class="zebra-striped" id="bolt-output-stats-table"><thead><tr><th class="header headerSortDown"><span class="tip right" title="The name of the Tuple stream given in the Topolgy, or "default" if none was given.">Stream</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted.">Emitted</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted that sent to one or more bolts.">Transferred</span></th></tr></thead>
+<tbody>
+{{#outputStats}}
+<tr>
+<td>{{stream}}</td>
+<td>{{emitted}}</td>
+<td>{{transferred}}</td>
+</tr>
+{{/outputStats}}
+</tbody>
+</table>
+</script>
+<script id="bolt-executor-template" type="text/html">
+<h2>Executors</h2>
+<table class="zebra-striped" id="bolt-executor-table"><thead><tr><th class="header headerSortDown"><span class="tip right" title="The unique executor ID.">Id</span></th><th class="header"><span data-original-title="The length of time an Executor (thread) has been alive." class="tip right">Uptime</span></th><th class="header"><span class="tip above" title="The hostname reported by the remote host. (Note that this hostname is not the result of a reverse lookup at the Nimbus node.)">Host</span></th><th class="header"><span class="tip above" title="The port number used by the Worker to which an Executor is assigned. Click on the port number to open the logviewer page for this Worker.">Port</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted.">Emitted</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted that sent to one or more bolts.">Transferred</span></th><th class="header"><span class="tip above" title="If thi
s is around 1.0, the corresponding Bolt is running as fast as it can, so you may want to increase the Bolt's parallelism. This is (number executed * average execute latency) / measurement time.">Capacity (last 10m)</span></th><th class="header"><span data-original-title="The average time a Tuple spends in the execute method. The execute method may complete without sending an Ack for the tuple." class="tip above">Execute latency (ms)</span></th><th class="header"><span class="tip above" title="The number of incoming Tuples processed.">Executed</span></th><th class="header"><span data-original-title="The average time it takes to Ack a Tuple after it is first received. Bolts that join, aggregate or batch may not Ack a tuple until a number of other Tuples have been received." class="tip above">Process latency (ms)</span></th><th class="header"><span data-original-title="The number of Tuples acknowledged by this Bolt." class="tip above">Acked</span></th><th class="header"><span data-ori
ginal-title="The number of tuples Failed by this Bolt." class="tip left">Failed</span></th></tr></thead>
+<tbody>
+{{#executorStats}}
+<tr>
+<td>{{id}}</td>
+<td>{{uptime}}</td>
+<td>{{host}}</td>
+<td><a href="{{workerLogLink}}">{{port}}</a></td>
+<td>{{emitted}}</td>
+<td>{{transferred}}</td>
+<td>{{capacity}}</td>
+<td>{{executeLatency}}</td>
+<td>{{executed}}</td>
+<td>{{processLatency}}</td>
+<td>{{acked}}</td>
+<td>{{failed}}</td>
+</tr>
+{{/executorStats}}
+</tbody>
+</table>
+</script>
+
+<script id="component-errors-template" type="text/html">
+<h2>Errors</h2>
+<table class="zebra-striped" id="component-errors-table"><thead><tr><th>Time</th><th>Error</th></tr></thead>
+<tbody>
+{{#componentErrors}}
+<tr>
+<td>{{time}}</td>
+<td>{{error}}</td>
+</tr>
+{{/componentErrors}}
+</tbody>
+</table>
+</script>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/ui/public/templates/index-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/index-page-template.html b/storm-core/src/ui/public/templates/index-page-template.html
new file mode 100644
index 0000000..128f2d0
--- /dev/null
+++ b/storm-core/src/ui/public/templates/index-page-template.html
@@ -0,0 +1,62 @@
+<script id="cluster-summary-template" type="text/html">
+<table id="cluster-summary-table"><thead><tr><th><span class="tip right" title="The version of storm installed on the UI node. (Hopefully, this is the same on all storm nodes!)">Version</span></th><th><span class="tip right" title="The duration the current Nimbus instance has been running. (Note that the storm cluster may have been deployed and available for a much longer period than the current Nimbus process has been running.)">Nimbus uptime</span></th><th><span class="tip above" title="The number of nodes in the cluster currently.">Supervisors</span></th><th><span class="tip above" title="Slots are Workers (processes).">Used slots</span></th><th><span class="tip above" title="Slots are Workers (processes).">Free slots</span></th><th><span class="tip above" title="Slots are Workers (processes).">Total slots</span></th><th><span class="tip above" title="Executors are threads in a Worker process.">Executors</span></th><th><span class="tip left" title="A Task is an instance of a Bolt
or Spout. The number of Tasks is almost always equal to the number of Executors.">Tasks</span></th></tr></thead>
+<tbody>
+<tr>
+ <td>{{stormVersion}}</td>
+ <td>{{nimbusUptime}}</td>
+ <td>{{supervisors}}</td>
+ <td>{{slotsUsed}}</td>
+ <td>{{slotsFree}}</td>
+ <td>{{slotsTotal}}</td>
+ <td>{{executorsTotal}}</td>
+ <td>{{tasksTotal}}</td>
+</tr>
+</tbody>
+</table>
+</script>
+<script id="topology-summary-template" type="text/html">
+<table class="zebra-striped" id="topology-summary-table">
+<thead><tr><th><span class="tip right" title="The name given to the topology when it was submitted. Click the name to view the Topology's information.">Name</span></th><th><span class="tip right" title="The unique ID given to a Topology each time it is launched.">Id</span></th><th><span class="tip above" title="The status can be one of ACTIVE, INACTIVE, KILLED, or REBALANCING.">Status</span></th><th><span class="tip above" title="The time since the Topology was submitted.">Uptime</span></th><th><span class="tip above" title="The number of Workers (processes).">Num workers</span></th><th><span class="tip above" title="Executors are threads in a Worker process.">Num executors</span></th><th><span class="tip above" title="A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors.">Num tasks</span></th></tr></thead>
+<tbody>
+{{#topologies}}
+<tr>
+ <td><a href="/topology.html?id={{id}}">{{name}}</a></td>
+ <td>{{id}}</td>
+ <td>{{status}}</td>
+ <td>{{uptime}}</td>
+ <td>{{workersTotal}}</td>
+ <td>{{executorsTotal}}</td>
+ <td>{{tasksTotal}}</td>
+</tr>
+{{/topologies}}
+</tbody>
+</table>
+</script>
+<script id="supervisor-summary-template" type="text/html">
+<table class="zebra-striped" id="supervisor-summary-table"><thead><tr><th><span class="tip right" title="A unique identifier given to a Supervisor when it joins the cluster.">Id</span></th><th><span class="tip above" title="The hostname reported by the remote host. (Note that this hostname is not the result of a reverse lookup at the Nimbus node.)">Host</span></th><th><span class="tip above" title="The length of time a Supervisor has been registered to the cluster.">Uptime</span></th><th><span class="tip above" title="Slots are Workers (processes).">Slots</span></th><th><span class="tip left" title="Slots are Workers (processes).">Used slots</span></th></tr></thead>
+<tbody>
+{{#supervisors}}
+<tr>
+ <td>{{id}}</td>
+ <td>{{host}}</td>
+ <td>{{uptime}}</td>
+ <td>{{slotsTotal}}</td>
+ <td>{{slotsUsed}}</td>
+</tr>
+{{/supervisors}}
+</tbody>
+</table>
+</script>
+
+<script id="configuration-template" type="text/html">
+<table class="zebra-striped" id="nimbus-configuration-table"><thead><tr><th>Key</th><th>Value</th></tr></thead>
+<tbody>
+{{#config}}
+<tr>
+<td>{{key}}</td>
+<td>{{value}}</td>
+</tr>
+{{/config}}
+</tbody>
+</table>
+</script>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/ui/public/templates/json-error-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/json-error-template.html b/storm-core/src/ui/public/templates/json-error-template.html
new file mode 100644
index 0000000..d797726
--- /dev/null
+++ b/storm-core/src/ui/public/templates/json-error-template.html
@@ -0,0 +1,4 @@
+<script id="json-error-template" type="text/html">{{! Error panel: shows "error" as a heading and "errorMessage" as preformatted text. }}
+<h2>{{error}}</h2>
+<pre>{{errorMessage}}</pre>
+</script>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/ui/public/templates/topology-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/topology-page-template.html b/storm-core/src/ui/public/templates/topology-page-template.html
new file mode 100644
index 0000000..da58f9d
--- /dev/null
+++ b/storm-core/src/ui/public/templates/topology-page-template.html
@@ -0,0 +1,128 @@
+<script id="topology-summary-template" type="text/html">{{! Single-row summary of one topology; cell order must match the header columns: workers, executors, tasks. }}
+ <table id="topology-summary-table">
+ <thead><tr><th><span class="tip right" title="The name given to the topology when it was submitted.">Name</span></th><th><span class="tip right" title="The unique ID given to a Topology each time it is launched.">Id</span></th><th><span class="tip above" title="The status can be one of ACTIVE, INACTIVE, KILLED, or REBALANCING.">Status</span></th><th><span class="tip above" title="The time since the Topology was submitted.">Uptime</span></th><th><span class="tip above" title="The number of Workers (processes).">Num workers</span></th><th><span class="tip above" title="Executors are threads in a Worker process.">Num executors</span></th><th><span class="tip above" title="A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors.">Num tasks</span></th></tr></thead>
+ <tbody>
+ <tr>
+ <td>{{name}}</td>
+ <td>{{id}}</td>
+ <td>{{status}}</td>
+ <td>{{uptime}}</td>
+ <td>{{workersTotal}}</td>
+ <td>{{executorsTotal}}</td>
+ <td>{{tasksTotal}}</td>
+ </tr>
+ </tbody>
+ </table>
+</script>
+<script id="topology-stats-template" type="text/html">{{! Per-window topology statistics: one row per entry of "topologyStats"; the Window cell links back to this page with that window selected. }}
+ <h2>Topology stats</h2>
+ <table class="zebra-striped" id="topology-stats-table">
+ <thead><tr><th><span class="tip right" title="The past period of time for which the statistics apply. Click on a value to set the window for this page.">Window</span></th><th><span class="tip above" title="The number of Tuples emitted.">Emitted</span></th><th><span class="tip above" title="The number of Tuples emitted that sent to one or more bolts.">Transferred</span></th><th><span class="tip above" title="The average time a Tuple &quot;tree&quot; takes to be completely processed by the Topology. A value of 0 is expected if no acking is done.">Complete latency (ms)</span></th><th><span class="tip above" title="The number of Tuple &quot;trees&quot; successfully processed. A value of 0 is expected if no acking is done.">Acked</span></th><th><span class="tip left" title="The number of Tuple &quot;trees&quot; that were explicitly failed or timed out before acking was completed. A value of 0 is expected if no acking is done.">Failed</span></th></tr></thead>
+ <tbody>
+ {{#topologyStats}}
+ <tr>
+ <td><a href="/topology.html?id={{id}}&window={{window}}">{{windowPretty}}</a></td>
+ <td>{{emitted}}</td>
+ <td>{{transferred}}</td>
+ <td>{{completeLatency}}</td>
+ <td>{{acked}}</td>
+ <td>{{failed}}</td>
+ </tr>
+ {{/topologyStats}}
+ </tbody>
+ </table>
+</script>
+<script id="topology-visualization-template" type="text/html">{{! Visualization section: a show/hide button, a table of stream checkboxes built from "visualizationTable", and the canvas with id topoGraph. }}
+ <h2>Topology Visualization</h2>
+ <input type="button" id="show-hide-visualization" value="Show Visualization"/>
+ <p>
+ <div id="visualization-container" style="display:none;">
+ <p>
+ <table class="zebra-striped">
+ <thead>
+ <tr>
+ <th class="header" colspan=4>
+ Streams
+ </th>
+ </tr>
+ </thead>
+ {{#visualizationTable}}
+ <tr>
+ {{#:row}}
+ <td>
+ <input type="checkbox" id="{{:sani-stream}}" class="stream-box" {{#:checked}}checked{{/:checked}}/>
+ {{:stream}}
+ </td>
+ {{/:row}}
+ </tr>
+ {{/visualizationTable}}
+ </table>
+ </p>
+ <canvas id="topoGraph" width=1024 height=768 style="border:1px solid #000000;"></canvas>
+ </div>
+ </p>
+</script>
+
+<script id="topology-configuration-template" type="text/html">{{! Topology configuration table: renders one Key/Value row per entry of "config". }}
+ <h2>Topology Configuration</h2>
+ <table class="zebra-striped" id="topology-configuration-table"><thead><tr><th>Key</th><th>Value</th></tr></thead>
+ <tbody>
+ {{#config}}
+ <tr>
+ <td>{{key}}</td>
+ <td>{{value}}</td>
+ </tr>
+ {{/config}}
+ </tbody>
+ </table>
+</script>
+<script id="spout-stats-template" type="text/html">{{! Spout statistics table: one row per entry of "spouts"; the Id cell links to the component page for that spout. }}
+ <h2>Spouts ({{windowHint}})</h2>
+ <table class="zebra-striped" id="spout-stats-table">
+ <thead>
+ <tr><th class="header headerSortDown"><span data-original-title="The ID assigned to the Component by the Topology. Click on the name to view the Component's page." class="tip right">Id</span></th><th class="header"><span data-original-title="Executors are threads in a Worker process." class="tip right">Executors</span></th><th class="header"><span class="tip above" title="A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors.">Tasks</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted.">Emitted</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted that sent to one or more bolts.">Transferred</span></th><th class="header"><span class="tip above" title="The average time a Tuple &quot;tree&quot; takes to be completely processed by the Topology. A value of 0 is expected if no acking is done.">Complete latency (ms)</span></th><th class="header"><span class="tip above" title="The number of Tuple &quot;trees&quot; successfully processed. A value of 0 is expected if no acking is done.">Acked</span></th><th class="header"><span class="tip above" title="The number of Tuple &quot;trees&quot; that were explicitly failed or timed out before acking was completed. A value of 0 is expected if no acking is done.">Failed</span></th><th class="header">Last error</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{#spouts}}
+ <tr>
+ <td><a href="/component.html?id={{spoutId}}&topology_id={{id}}">{{spoutId}}</a></td>
+ <td>{{executors}}</td>
+ <td>{{tasks}}</td>
+ <td>{{emitted}}</td>
+ <td>{{transferred}}</td>
+ <td>{{completeLatency}}</td>
+ <td>{{acked}}</td>
+ <td>{{failed}}</td>
+ <td>{{lastError}}</td></tr>
+ {{/spouts}}
+ </tbody>
+ </table>
+</script>
+<script id="bolt-stats-template" type="text/html">{{! Bolt statistics table: one row per entry of "bolts"; the Id cell links to the component page for that bolt. }}
+ <h2>Bolts ({{windowHint}})</h2>
+ <table class="zebra-striped" id="bolt-stats-table"><thead>
+ <tr><th class="header headerSortDown"><span class="tip right" title="The ID assigned to the Component by the Topology. Click on the name to view the Component's page.">Id</span></th><th class="header"><span data-original-title="Executors are threads in a Worker process." class="tip right">Executors</span></th><th class="header"><span class="tip above" title="A Task is an instance of a Bolt or Spout. The number of Tasks is almost always equal to the number of Executors.">Tasks</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted.">Emitted</span></th><th class="header"><span class="tip above" title="The number of Tuples emitted that sent to one or more bolts.">Transferred</span></th><th class="header"><span data-original-title="If this is around 1.0, the corresponding Bolt is running as fast as it can, so you may want to increase the Bolt's parallelism. This is (number executed * average execute latency) / measurement time." class="tip above">Capacity (last 10m)</span></th><th class="header"><span class="tip above" title="The average time a Tuple spends in the execute method. The execute method may complete without sending an Ack for the tuple.">Execute latency (ms)</span></th><th class="header"><span class="tip above" title="The number of incoming Tuples processed.">Executed</span></th><th class="header"><span class="tip above" title="The average time it takes to Ack a Tuple after it is first received. Bolts that join, aggregate or batch may not Ack a tuple until a number of other Tuples have been received.">Process latency (ms)</span></th><th class="header"><span class="tip above" title="The number of Tuples acknowledged by this Bolt.">Acked</span></th><th class="header"><span class="tip left" title="The number of tuples Failed by this Bolt.">Failed</span></th><th class="header">Last error</th>
+ </tr></thead>
+ <tbody>
+ {{#bolts}}
+ <tr>
+ <td><a href="/component.html?id={{boltId}}&topology_id={{id}}">{{boltId}}</a></td>
+ <td>{{executors}}</td>
+ <td>{{tasks}}</td>
+ <td>{{emitted}}</td>
+ <td>{{transferred}}</td>
+ <td>{{capacity}}</td>
+ <td>{{executeLatency}}</td>
+ <td>{{executed}}</td>
+ <td>{{processLatency}}</td>
+ <td>{{acked}}</td>
+ <td>{{failed}}</td>
+ <td>{{lastError}}</td></tr>
+ {{/bolts}}
+ </tbody></table>
+</script>
+
+<script id="topology-actions-template" type="text/html">{{! Action buttons (Activate, Deactivate, Rebalance, Kill); each onclick calls confirmAction with the topology id and name. The *Status tags are emitted as raw attributes on each input. NOTE(review): id and name are interpolated inside a JS string literal in onclick - verify they can never contain a quote character. }}
+ <input {{activateStatus}} onclick="confirmAction('{{id}}', '{{name}}', 'activate', false, 0)" type="button" value="Activate"><input {{deactivateStatus}} onclick="confirmAction('{{id}}', '{{name}}', 'deactivate', false, 0)" type="button" value="Deactivate"><input {{rebalanceStatus}} onclick="confirmAction('{{id}}', '{{name}}', 'rebalance', true, {{msgTimeout}})" type="button" value="Rebalance"><input {{killStatus}} onclick="confirmAction('{{id}}', '{{name}}', 'kill', true, 30)" type="button" value="Kill">
+</script>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
new file mode 100644
index 0000000..df095ad
--- /dev/null
+++ b/storm-core/src/ui/public/topology.html
@@ -0,0 +1,90 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
+<html><head>
+<title>Storm UI</title>
+<link href="/css/bootstrap-1.4.0.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css" rel="stylesheet" type="text/css">
+<script src="/js/jquery-1.6.2.min.js" type="text/javascript"></script>
+<script src="/js/jquery.tablesorter.min.js" type="text/javascript"></script>
+<script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
+<script src="/js/jquery.mustache.js" type="text/javascript"></script>
+<script src="/js/purl.js" type="text/javascript"></script>
+<script src="/js/bootstrap-twipsy.js" type="text/javascript"></script>
+<script src="/js/script.js" type="text/javascript"></script>
+<script src="/js/visualization.js" type="text/javascript"></script>
+<script src="/js/arbor.js" type="text/javascript"></script>
+<script src="/js/arbor-graphics.js" type="text/javascript"></script>
+</head>
+<body>
+<h1><a href="/">Storm UI</a></h1>
+<h2>Topology summary</h2>
+<div id="topology-summary">
+</div>
+<div id="topology-actions">
+<h2 class="js-only">Topology actions</h2>
+<p id="topology-actions" class="js-only">
+</p>
+</div>
+<div id="topology-stats"></div>
+<div id="spout-stats">
+</div>
+<div id="bolt-stats">
+</div>
+<div id="topology-visualization">
+</div>
+<div id="topology-configuration">
+</div>
+<p id="toggle-switch" style="display: block;" class="js-only"></p>
+<div id="json-response-error">
+</div>
+</body>
+<script>
+$(document).ready(function() {
+ var topologyId = $.url().param("id");
+ var window = $.url().param("window");
+ var sys = $.cookies.get("sys") || "false";
+ var url = "/api/v1/topology/"+topologyId+"?sys="+sys;
+ if(window) url += "&window="+window;
+ renderToggleSys($("#toggle-switch"));
+ $.ajaxSetup({
+ "error":function(jqXHR,textStatus,response) {
+ var errorJson = jQuery.parseJSON(jqXHR.responseText);
+ $.get("/templates/json-error-template.html", function(template) {
+ $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
+ });
+ }
+ });
+
+ $.getJSON(url,function(response,status,jqXHR) {
+ var topologySummary = $("#topology-summary");
+ var topologyStats = $("#topology-stats");
+ var spoutStats = $("#spout-stats");
+ var boltStats = $("#bolt-stats");
+ var config = $("#topology-configuration");
+ var topologyActions = $("#topology-actions");
+ var topologyVisualization = $("#topology-visualization")
+ var formattedConfig = formatConfigData(response["configuration"]);
+ var buttonJsonData = topologyActionJson(response["id"],response["name"],response["status"],response["msgTimeout"]);
+ $.get("/templates/topology-page-template.html", function(template) {
+ topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
+ topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
+ topologyStats.append(Mustache.render($(template).filter("#topology-stats-template").html(),response));
+ $("#topology-stats-table").tablesorter({ sortList: [[0,0]], headers: {0: { sorter: "stormtimestr"}}});
+ spoutStats.append(Mustache.render($(template).filter("#spout-stats-template").html(),response));
+ if(response["spouts"].length > 0) {
+ $("#spout-stats-table").tablesorter({sortList: [[0,0]], headers:{}});
+ }
+ boltStats.append(Mustache.render($(template).filter("#bolt-stats-template").html(),response));
+ if(response["bolts"].length > 0) {
+ $("#bolt-stats-table").tablesorter({sortList: [[0,0]], headers:{}});
+ }
+
+ topologyVisualization.append(Mustache.render($(template).filter("#topology-visualization-template").html(), response));
+ $("#show-hide-visualization").click(function () { show_visualization(null) });
+
+ config.append(Mustache.render($(template).filter("#topology-configuration-template").html(),formattedConfig));
+ $("#topology-configuration-table").tablesorter({ sortList: [[0,0]], headers: {}});
+ });
+ });
+ });
+</script>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/test/clj/backtype/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/drpc_test.clj b/storm-core/test/clj/backtype/storm/drpc_test.clj
index fbc60e6..6d0ba2b 100644
--- a/storm-core/test/clj/backtype/storm/drpc_test.clj
+++ b/storm-core/test/clj/backtype/storm/drpc_test.clj
@@ -22,9 +22,11 @@
(:import [backtype.storm LocalDRPC LocalCluster])
(:import [backtype.storm.tuple Fields])
(:import [backtype.storm.generated DRPCExecutionException])
- (:use [backtype.storm bootstrap testing])
- (:use [backtype.storm.daemon common])
+ (:import [java.util.concurrent ConcurrentLinkedQueue])
+ (:use [backtype.storm bootstrap config testing])
+ (:use [backtype.storm.daemon common drpc])
(:use [backtype.storm clojure])
+ (:use [conjure core])
)
(bootstrap)
@@ -218,3 +220,24 @@
(.shutdown cluster)
(.shutdown drpc)
))
+
+(deftest test-dequeue-req-after-timeout ; a DRPC request that fails must not be left behind in the request queue
+ (let [queue (ConcurrentLinkedQueue.)
+ delay-seconds 2]
+ (stubbing [acquire-queue queue ; stubbed to return the test-owned queue so its size can be checked below
+ read-storm-config {DRPC-REQUEST-TIMEOUT-SECS delay-seconds}] ; config stub carrying only the request timeout
+ (let [drpc-handler (service-handler)]
+ (is (thrown? DRPCExecutionException ; presumably raised by the request timeout, since nothing serves the request - confirm
+ (.execute drpc-handler "ArbitraryDRPCFunctionName" "")))
+ (is (= 0 (.size queue))))))) ; the queue must be empty after the failed execute
+
+(deftest test-drpc-timeout-cleanup ; execute must throw DRPCExecutionException when timeout-check-secs is also stubbed to the timeout
+ (let [queue (ConcurrentLinkedQueue.)
+ delay-seconds 1]
+ (stubbing [acquire-queue queue ; stubbed to return the test-owned queue
+ read-storm-config {DRPC-REQUEST-TIMEOUT-SECS delay-seconds}
+ timeout-check-secs delay-seconds] ; stubbed to the same value as the request timeout
+ (let [drpc-handler (service-handler)]
+ (is (thrown? DRPCExecutionException
+ (.execute drpc-handler "ArbitraryDRPCFunctionName" "no-args")))))))
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index f271607..ea7b8dc 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -37,7 +37,8 @@
server (.bind context nil port)
client (.connect context nil "localhost" port)
_ (.send client task (.getBytes req_msg))
- resp (.recv server 0)]
+ iter (.recv server 0 0)
+ resp (.next iter)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
(.close client)
@@ -58,7 +59,8 @@
server (.bind context nil port)
client (.connect context nil "localhost" port)
_ (.send client task (.getBytes req_msg))
- resp (.recv server 0)]
+ iter (.recv server 0 0)
+ resp (.next iter)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
(.close client)
@@ -77,15 +79,23 @@
}
context (TransportFactory/makeContext storm-conf)
client (.connect context nil "localhost" port)
+
+ server (Thread.
+ (fn []
+ (Thread/sleep 1000)
+ (let [server (.bind context nil port)
+ iter (.recv server 0 0)
+ resp (.next iter)]
+ (is (= task (.task resp)))
+ (is (= req_msg (String. (.message resp))))
+ (.close server)
+ )))
+ _ (.start server)
_ (.send client task (.getBytes req_msg))
- _ (Thread/sleep 1000)
- server (.bind context nil port)
- resp (.recv server 0)]
- (is (= task (.task resp)))
- (is (= req_msg (String. (.message resp))))
+ ]
(.close client)
- (.close server)
- (.term context)))
+ (.join server)
+ (.term context)))
(deftest test-batch
(let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
@@ -102,11 +112,21 @@
(doseq [num (range 1 100000)]
(let [req_msg (str num)]
(.send client task (.getBytes req_msg))))
- (doseq [num (range 1 100000)]
+
+ (let [resp (ArrayList.)
+ received (atom 0)]
+ (while (< @received (- 100000 1))
+ (let [iter (.recv server 0 0)]
+ (while (.hasNext iter)
+ (let [msg (.next iter)]
+ (.add resp msg)
+ (swap! received inc)
+ ))))
+ (doseq [num (range 1 100000)]
(let [req_msg (str num)
- resp (.recv server 0)
- resp_msg (String. (.message resp))]
- (is (= req_msg resp_msg))))
+ resp_msg (String. (.message (.get resp (- num 1))))]
+ (is (= req_msg resp_msg)))))
+
(.close client)
(.close server)
(.term context)))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index 94b9168..c719c68 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -15,7 +15,7 @@
;; limitations under the License.
(ns backtype.storm.messaging-test
(:use [clojure test])
- (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount])
+ (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
)
@@ -56,3 +56,36 @@
(is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
(read-tuples results "2")))))))
+(extend-type TestEventLogSpout ; implements CompletableSpout for TestEventLogSpout
+ CompletableSpout
+ (exhausted? [this] ; delegates to the spout's own .completed
+ (-> this .completed))
+ (cleanup [this] ; delegates to the spout's own .cleanup
+ (.cleanup this))
+ (startup [this] ; no-op: empty body
+ ))
+
+;; Test that adding more receiver threads won't violate the message delivery order guarantee
+(deftest test-receiver-message-order
+ (with-simulated-time-local-cluster [cluster :supervisors 1 :ports-per-supervisor 2
+ :daemon-conf {TOPOLOGY-WORKERS 2
+ ;; Configure multiple receiver threads per worker
+ WORKER-RECEIVER-THREAD-COUNT 2
+ STORM-LOCAL-MODE-ZMQ true
+ STORM-MESSAGING-TRANSPORT
+ "backtype.storm.messaging.netty.Context"}]
+ (let [topology (thrift/mk-topology
+
+ ;; TestEventLogSpout outputs (sourceId, eventId); eventId is monotonically increasing
+ {"1" (thrift/mk-spout-spec (TestEventLogSpout. 4000) :parallelism-hint 8)}
+
+ ;; Fields grouping on "source": messages from the same source task are delivered to the same bolt task.
+ ;; When the received message order is not kept, the bolt emits an error tuple.
+ {"2" (thrift/mk-bolt-spec {"1" ["source"]} (TestEventOrderCheckBolt.)
+ :parallelism-hint 4)
+ })
+ results (complete-topology cluster
+ topology)]
+
+ ;; No error tuple from bolt TestEventOrderCheckBolt means ordering was preserved
+ (is (empty? (read-tuples results "2"))))))