You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/12 18:21:32 UTC
[02/15] initial import.
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/resources/scalate/js/bootstrap.min.js
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/js/bootstrap.min.js b/samza-yarn/src/main/resources/scalate/js/bootstrap.min.js
new file mode 100644
index 0000000..bafe4b8
--- /dev/null
+++ b/samza-yarn/src/main/resources/scalate/js/bootstrap.min.js
@@ -0,0 +1,6 @@
+/*!
+* Bootstrap.js by @fat & @mdo
+* Copyright 2012 Twitter, Inc.
+* http://www.apache.org/licenses/LICENSE-2.0.txt
+*/
+!function(e){"use strict";e(function(){e.support.transition=function(){var e=function(){var e=document.createElement("bootstrap"),t={WebkitTransition:"webkitTransitionEnd",MozTransition:"transitionend",OTransition:"oTransitionEnd otransitionend",transition:"transitionend"},n;for(n in t)if(e.style[n]!==undefined)return t[n]}();return e&&{end:e}}()})}(window.jQuery),!function(e){"use strict";var t='[data-dismiss="alert"]',n=function(n){e(n).on("click",t,this.close)};n.prototype.close=function(t){function s(){i.trigger("closed").remove()}var n=e(this),r=n.attr("data-target"),i;r||(r=n.attr("href"),r=r&&r.replace(/.*(?=#[^\s]*$)/,"")),i=e(r),t&&t.preventDefault(),i.length||(i=n.hasClass("alert")?n:n.parent()),i.trigger(t=e.Event("close"));if(t.isDefaultPrevented())return;i.removeClass("in"),e.support.transition&&i.hasClass("fade")?i.on(e.support.transition.end,s):s()};var r=e.fn.alert;e.fn.alert=function(t){return this.each(function(){var r=e(this),i=r.data("alert");i||r.data("alert",i=
new n(this)),typeof t=="string"&&i[t].call(r)})},e.fn.alert.Constructor=n,e.fn.alert.noConflict=function(){return e.fn.alert=r,this},e(document).on("click.alert.data-api",t,n.prototype.close)}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.$element=e(t),this.options=e.extend({},e.fn.button.defaults,n)};t.prototype.setState=function(e){var t="disabled",n=this.$element,r=n.data(),i=n.is("input")?"val":"html";e+="Text",r.resetText||n.data("resetText",n[i]()),n[i](r[e]||this.options[e]),setTimeout(function(){e=="loadingText"?n.addClass(t).attr(t,t):n.removeClass(t).removeAttr(t)},0)},t.prototype.toggle=function(){var e=this.$element.closest('[data-toggle="buttons-radio"]');e&&e.find(".active").removeClass("active"),this.$element.toggleClass("active")};var n=e.fn.button;e.fn.button=function(n){return this.each(function(){var r=e(this),i=r.data("button"),s=typeof n=="object"&&n;i||r.data("button",i=new t(this,s)),n=="toggle"?i.toggle():n&&i.setState(n)})},e.fn.button.de
faults={loadingText:"loading..."},e.fn.button.Constructor=t,e.fn.button.noConflict=function(){return e.fn.button=n,this},e(document).on("click.button.data-api","[data-toggle^=button]",function(t){var n=e(t.target);n.hasClass("btn")||(n=n.closest(".btn")),n.button("toggle")})}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.$element=e(t),this.$indicators=this.$element.find(".carousel-indicators"),this.options=n,this.options.pause=="hover"&&this.$element.on("mouseenter",e.proxy(this.pause,this)).on("mouseleave",e.proxy(this.cycle,this))};t.prototype={cycle:function(t){return t||(this.paused=!1),this.interval&&clearInterval(this.interval),this.options.interval&&!this.paused&&(this.interval=setInterval(e.proxy(this.next,this),this.options.interval)),this},getActiveIndex:function(){return this.$active=this.$element.find(".item.active"),this.$items=this.$active.parent().children(),this.$items.index(this.$active)},to:function(t){var n=this.getActiveIndex(),r=this;if(t>thi
s.$items.length-1||t<0)return;return this.sliding?this.$element.one("slid",function(){r.to(t)}):n==t?this.pause().cycle():this.slide(t>n?"next":"prev",e(this.$items[t]))},pause:function(t){return t||(this.paused=!0),this.$element.find(".next, .prev").length&&e.support.transition.end&&(this.$element.trigger(e.support.transition.end),this.cycle(!0)),clearInterval(this.interval),this.interval=null,this},next:function(){if(this.sliding)return;return this.slide("next")},prev:function(){if(this.sliding)return;return this.slide("prev")},slide:function(t,n){var r=this.$element.find(".item.active"),i=n||r[t](),s=this.interval,o=t=="next"?"left":"right",u=t=="next"?"first":"last",a=this,f;this.sliding=!0,s&&this.pause(),i=i.length?i:this.$element.find(".item")[u](),f=e.Event("slide",{relatedTarget:i[0],direction:o});if(i.hasClass("active"))return;this.$indicators.length&&(this.$indicators.find(".active").removeClass("active"),this.$element.one("slid",function(){var t=e(a.$indicators.children(
)[a.getActiveIndex()]);t&&t.addClass("active")}));if(e.support.transition&&this.$element.hasClass("slide")){this.$element.trigger(f);if(f.isDefaultPrevented())return;i.addClass(t),i[0].offsetWidth,r.addClass(o),i.addClass(o),this.$element.one(e.support.transition.end,function(){i.removeClass([t,o].join(" ")).addClass("active"),r.removeClass(["active",o].join(" ")),a.sliding=!1,setTimeout(function(){a.$element.trigger("slid")},0)})}else{this.$element.trigger(f);if(f.isDefaultPrevented())return;r.removeClass("active"),i.addClass("active"),this.sliding=!1,this.$element.trigger("slid")}return s&&this.cycle(),this}};var n=e.fn.carousel;e.fn.carousel=function(n){return this.each(function(){var r=e(this),i=r.data("carousel"),s=e.extend({},e.fn.carousel.defaults,typeof n=="object"&&n),o=typeof n=="string"?n:s.slide;i||r.data("carousel",i=new t(this,s)),typeof n=="number"?i.to(n):o?i[o]():s.interval&&i.pause().cycle()})},e.fn.carousel.defaults={interval:5e3,pause:"hover"},e.fn.carousel.Const
ructor=t,e.fn.carousel.noConflict=function(){return e.fn.carousel=n,this},e(document).on("click.carousel.data-api","[data-slide], [data-slide-to]",function(t){var n=e(this),r,i=e(n.attr("data-target")||(r=n.attr("href"))&&r.replace(/.*(?=#[^\s]+$)/,"")),s=e.extend({},i.data(),n.data()),o;i.carousel(s),(o=n.attr("data-slide-to"))&&i.data("carousel").pause().to(o).cycle(),t.preventDefault()})}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.$element=e(t),this.options=e.extend({},e.fn.collapse.defaults,n),this.options.parent&&(this.$parent=e(this.options.parent)),this.options.toggle&&this.toggle()};t.prototype={constructor:t,dimension:function(){var e=this.$element.hasClass("width");return e?"width":"height"},show:function(){var t,n,r,i;if(this.transitioning||this.$element.hasClass("in"))return;t=this.dimension(),n=e.camelCase(["scroll",t].join("-")),r=this.$parent&&this.$parent.find("> .accordion-group > .in");if(r&&r.length){i=r.data("collapse");if(i&&i.transitionin
g)return;r.collapse("hide"),i||r.data("collapse",null)}this.$element[t](0),this.transition("addClass",e.Event("show"),"shown"),e.support.transition&&this.$element[t](this.$element[0][n])},hide:function(){var t;if(this.transitioning||!this.$element.hasClass("in"))return;t=this.dimension(),this.reset(this.$element[t]()),this.transition("removeClass",e.Event("hide"),"hidden"),this.$element[t](0)},reset:function(e){var t=this.dimension();return this.$element.removeClass("collapse")[t](e||"auto")[0].offsetWidth,this.$element[e!==null?"addClass":"removeClass"]("collapse"),this},transition:function(t,n,r){var i=this,s=function(){n.type=="show"&&i.reset(),i.transitioning=0,i.$element.trigger(r)};this.$element.trigger(n);if(n.isDefaultPrevented())return;this.transitioning=1,this.$element[t]("in"),e.support.transition&&this.$element.hasClass("collapse")?this.$element.one(e.support.transition.end,s):s()},toggle:function(){this[this.$element.hasClass("in")?"hide":"show"]()}};var n=e.fn.collapse
;e.fn.collapse=function(n){return this.each(function(){var r=e(this),i=r.data("collapse"),s=e.extend({},e.fn.collapse.defaults,r.data(),typeof n=="object"&&n);i||r.data("collapse",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.collapse.defaults={toggle:!0},e.fn.collapse.Constructor=t,e.fn.collapse.noConflict=function(){return e.fn.collapse=n,this},e(document).on("click.collapse.data-api","[data-toggle=collapse]",function(t){var n=e(this),r,i=n.attr("data-target")||t.preventDefault()||(r=n.attr("href"))&&r.replace(/.*(?=#[^\s]+$)/,""),s=e(i).data("collapse")?"toggle":n.data();n[e(i).hasClass("in")?"addClass":"removeClass"]("collapsed"),e(i).collapse(s)})}(window.jQuery),!function(e){"use strict";function r(){e(t).each(function(){i(e(this)).removeClass("open")})}function i(t){var n=t.attr("data-target"),r;n||(n=t.attr("href"),n=n&&/#/.test(n)&&n.replace(/.*(?=#[^\s]*$)/,"")),r=n&&e(n);if(!r||!r.length)r=t.parent();return r}var t="[data-toggle=dropdown]",n=function(t){var n=e(t).o
n("click.dropdown.data-api",this.toggle);e("html").on("click.dropdown.data-api",function(){n.parent().removeClass("open")})};n.prototype={constructor:n,toggle:function(t){var n=e(this),s,o;if(n.is(".disabled, :disabled"))return;return s=i(n),o=s.hasClass("open"),r(),o||s.toggleClass("open"),n.focus(),!1},keydown:function(n){var r,s,o,u,a,f;if(!/(38|40|27)/.test(n.keyCode))return;r=e(this),n.preventDefault(),n.stopPropagation();if(r.is(".disabled, :disabled"))return;u=i(r),a=u.hasClass("open");if(!a||a&&n.keyCode==27)return n.which==27&&u.find(t).focus(),r.click();s=e("[role=menu] li:not(.divider):visible a",u);if(!s.length)return;f=s.index(s.filter(":focus")),n.keyCode==38&&f>0&&f--,n.keyCode==40&&f<s.length-1&&f++,~f||(f=0),s.eq(f).focus()}};var s=e.fn.dropdown;e.fn.dropdown=function(t){return this.each(function(){var r=e(this),i=r.data("dropdown");i||r.data("dropdown",i=new n(this)),typeof t=="string"&&i[t].call(r)})},e.fn.dropdown.Constructor=n,e.fn.dropdown.noConflict=function()
{return e.fn.dropdown=s,this},e(document).on("click.dropdown.data-api",r).on("click.dropdown.data-api",".dropdown form",function(e){e.stopPropagation()}).on("click.dropdown-menu",function(e){e.stopPropagation()}).on("click.dropdown.data-api",t,n.prototype.toggle).on("keydown.dropdown.data-api",t+", [role=menu]",n.prototype.keydown)}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.options=n,this.$element=e(t).delegate('[data-dismiss="modal"]',"click.dismiss.modal",e.proxy(this.hide,this)),this.options.remote&&this.$element.find(".modal-body").load(this.options.remote)};t.prototype={constructor:t,toggle:function(){return this[this.isShown?"hide":"show"]()},show:function(){var t=this,n=e.Event("show");this.$element.trigger(n);if(this.isShown||n.isDefaultPrevented())return;this.isShown=!0,this.escape(),this.backdrop(function(){var n=e.support.transition&&t.$element.hasClass("fade");t.$element.parent().length||t.$element.appendTo(document.body),t.$element.show(),n&&t.$e
lement[0].offsetWidth,t.$element.addClass("in").attr("aria-hidden",!1),t.enforceFocus(),n?t.$element.one(e.support.transition.end,function(){t.$element.focus().trigger("shown")}):t.$element.focus().trigger("shown")})},hide:function(t){t&&t.preventDefault();var n=this;t=e.Event("hide"),this.$element.trigger(t);if(!this.isShown||t.isDefaultPrevented())return;this.isShown=!1,this.escape(),e(document).off("focusin.modal"),this.$element.removeClass("in").attr("aria-hidden",!0),e.support.transition&&this.$element.hasClass("fade")?this.hideWithTransition():this.hideModal()},enforceFocus:function(){var t=this;e(document).on("focusin.modal",function(e){t.$element[0]!==e.target&&!t.$element.has(e.target).length&&t.$element.focus()})},escape:function(){var e=this;this.isShown&&this.options.keyboard?this.$element.on("keyup.dismiss.modal",function(t){t.which==27&&e.hide()}):this.isShown||this.$element.off("keyup.dismiss.modal")},hideWithTransition:function(){var t=this,n=setTimeout(function(){t.
$element.off(e.support.transition.end),t.hideModal()},500);this.$element.one(e.support.transition.end,function(){clearTimeout(n),t.hideModal()})},hideModal:function(){var e=this;this.$element.hide(),this.backdrop(function(){e.removeBackdrop(),e.$element.trigger("hidden")})},removeBackdrop:function(){this.$backdrop&&this.$backdrop.remove(),this.$backdrop=null},backdrop:function(t){var n=this,r=this.$element.hasClass("fade")?"fade":"";if(this.isShown&&this.options.backdrop){var i=e.support.transition&&r;this.$backdrop=e('<div class="modal-backdrop '+r+'" />').appendTo(document.body),this.$backdrop.click(this.options.backdrop=="static"?e.proxy(this.$element[0].focus,this.$element[0]):e.proxy(this.hide,this)),i&&this.$backdrop[0].offsetWidth,this.$backdrop.addClass("in");if(!t)return;i?this.$backdrop.one(e.support.transition.end,t):t()}else!this.isShown&&this.$backdrop?(this.$backdrop.removeClass("in"),e.support.transition&&this.$element.hasClass("fade")?this.$backdrop.one(e.support.tra
nsition.end,t):t()):t&&t()}};var n=e.fn.modal;e.fn.modal=function(n){return this.each(function(){var r=e(this),i=r.data("modal"),s=e.extend({},e.fn.modal.defaults,r.data(),typeof n=="object"&&n);i||r.data("modal",i=new t(this,s)),typeof n=="string"?i[n]():s.show&&i.show()})},e.fn.modal.defaults={backdrop:!0,keyboard:!0,show:!0},e.fn.modal.Constructor=t,e.fn.modal.noConflict=function(){return e.fn.modal=n,this},e(document).on("click.modal.data-api",'[data-toggle="modal"]',function(t){var n=e(this),r=n.attr("href"),i=e(n.attr("data-target")||r&&r.replace(/.*(?=#[^\s]+$)/,"")),s=i.data("modal")?"toggle":e.extend({remote:!/#/.test(r)&&r},i.data(),n.data());t.preventDefault(),i.modal(s).one("hide",function(){n.focus()})})}(window.jQuery),!function(e){"use strict";var t=function(e,t){this.init("tooltip",e,t)};t.prototype={constructor:t,init:function(t,n,r){var i,s,o,u,a;this.type=t,this.$element=e(n),this.options=this.getOptions(r),this.enabled=!0,o=this.options.trigger.split(" ");for(a=o
.length;a--;)u=o[a],u=="click"?this.$element.on("click."+this.type,this.options.selector,e.proxy(this.toggle,this)):u!="manual"&&(i=u=="hover"?"mouseenter":"focus",s=u=="hover"?"mouseleave":"blur",this.$element.on(i+"."+this.type,this.options.selector,e.proxy(this.enter,this)),this.$element.on(s+"."+this.type,this.options.selector,e.proxy(this.leave,this)));this.options.selector?this._options=e.extend({},this.options,{trigger:"manual",selector:""}):this.fixTitle()},getOptions:function(t){return t=e.extend({},e.fn[this.type].defaults,this.$element.data(),t),t.delay&&typeof t.delay=="number"&&(t.delay={show:t.delay,hide:t.delay}),t},enter:function(t){var n=e.fn[this.type].defaults,r={},i;this._options&&e.each(this._options,function(e,t){n[e]!=t&&(r[e]=t)},this),i=e(t.currentTarget)[this.type](r).data(this.type);if(!i.options.delay||!i.options.delay.show)return i.show();clearTimeout(this.timeout),i.hoverState="in",this.timeout=setTimeout(function(){i.hoverState=="in"&&i.show()},i.optio
ns.delay.show)},leave:function(t){var n=e(t.currentTarget)[this.type](this._options).data(this.type);this.timeout&&clearTimeout(this.timeout);if(!n.options.delay||!n.options.delay.hide)return n.hide();n.hoverState="out",this.timeout=setTimeout(function(){n.hoverState=="out"&&n.hide()},n.options.delay.hide)},show:function(){var t,n,r,i,s,o,u=e.Event("show");if(this.hasContent()&&this.enabled){this.$element.trigger(u);if(u.isDefaultPrevented())return;t=this.tip(),this.setContent(),this.options.animation&&t.addClass("fade"),s=typeof this.options.placement=="function"?this.options.placement.call(this,t[0],this.$element[0]):this.options.placement,t.detach().css({top:0,left:0,display:"block"}),this.options.container?t.appendTo(this.options.container):t.insertAfter(this.$element),n=this.getPosition(),r=t[0].offsetWidth,i=t[0].offsetHeight;switch(s){case"bottom":o={top:n.top+n.height,left:n.left+n.width/2-r/2};break;case"top":o={top:n.top-i,left:n.left+n.width/2-r/2};break;case"left":o={top
:n.top+n.height/2-i/2,left:n.left-r};break;case"right":o={top:n.top+n.height/2-i/2,left:n.left+n.width}}this.applyPlacement(o,s),this.$element.trigger("shown")}},applyPlacement:function(e,t){var n=this.tip(),r=n[0].offsetWidth,i=n[0].offsetHeight,s,o,u,a;n.offset(e).addClass(t).addClass("in"),s=n[0].offsetWidth,o=n[0].offsetHeight,t=="top"&&o!=i&&(e.top=e.top+i-o,a=!0),t=="bottom"||t=="top"?(u=0,e.left<0&&(u=e.left*-2,e.left=0,n.offset(e),s=n[0].offsetWidth,o=n[0].offsetHeight),this.replaceArrow(u-r+s,s,"left")):this.replaceArrow(o-i,o,"top"),a&&n.offset(e)},replaceArrow:function(e,t,n){this.arrow().css(n,e?50*(1-e/t)+"%":"")},setContent:function(){var e=this.tip(),t=this.getTitle();e.find(".tooltip-inner")[this.options.html?"html":"text"](t),e.removeClass("fade in top bottom left right")},hide:function(){function i(){var t=setTimeout(function(){n.off(e.support.transition.end).detach()},500);n.one(e.support.transition.end,function(){clearTimeout(t),n.detach()})}var t=this,n=this.tip
(),r=e.Event("hide");this.$element.trigger(r);if(r.isDefaultPrevented())return;return n.removeClass("in"),e.support.transition&&this.$tip.hasClass("fade")?i():n.detach(),this.$element.trigger("hidden"),this},fixTitle:function(){var e=this.$element;(e.attr("title")||typeof e.attr("data-original-title")!="string")&&e.attr("data-original-title",e.attr("title")||"").attr("title","")},hasContent:function(){return this.getTitle()},getPosition:function(){var t=this.$element[0];return e.extend({},typeof t.getBoundingClientRect=="function"?t.getBoundingClientRect():{width:t.offsetWidth,height:t.offsetHeight},this.$element.offset())},getTitle:function(){var e,t=this.$element,n=this.options;return e=t.attr("data-original-title")||(typeof n.title=="function"?n.title.call(t[0]):n.title),e},tip:function(){return this.$tip=this.$tip||e(this.options.template)},arrow:function(){return this.$arrow=this.$arrow||this.tip().find(".tooltip-arrow")},validate:function(){this.$element[0].parentNode||(this.h
ide(),this.$element=null,this.options=null)},enable:function(){this.enabled=!0},disable:function(){this.enabled=!1},toggleEnabled:function(){this.enabled=!this.enabled},toggle:function(t){var n=t?e(t.currentTarget)[this.type](this._options).data(this.type):this;n.tip().hasClass("in")?n.hide():n.show()},destroy:function(){this.hide().$element.off("."+this.type).removeData(this.type)}};var n=e.fn.tooltip;e.fn.tooltip=function(n){return this.each(function(){var r=e(this),i=r.data("tooltip"),s=typeof n=="object"&&n;i||r.data("tooltip",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.tooltip.Constructor=t,e.fn.tooltip.defaults={animation:!0,placement:"top",selector:!1,template:'<div class="tooltip"><div class="tooltip-arrow"></div><div class="tooltip-inner"></div></div>',trigger:"hover focus",title:"",delay:0,html:!1,container:!1},e.fn.tooltip.noConflict=function(){return e.fn.tooltip=n,this}}(window.jQuery),!function(e){"use strict";var t=function(e,t){this.init("popover",e,t)};t.pro
totype=e.extend({},e.fn.tooltip.Constructor.prototype,{constructor:t,setContent:function(){var e=this.tip(),t=this.getTitle(),n=this.getContent();e.find(".popover-title")[this.options.html?"html":"text"](t),e.find(".popover-content")[this.options.html?"html":"text"](n),e.removeClass("fade top bottom left right in")},hasContent:function(){return this.getTitle()||this.getContent()},getContent:function(){var e,t=this.$element,n=this.options;return e=(typeof n.content=="function"?n.content.call(t[0]):n.content)||t.attr("data-content"),e},tip:function(){return this.$tip||(this.$tip=e(this.options.template)),this.$tip},destroy:function(){this.hide().$element.off("."+this.type).removeData(this.type)}});var n=e.fn.popover;e.fn.popover=function(n){return this.each(function(){var r=e(this),i=r.data("popover"),s=typeof n=="object"&&n;i||r.data("popover",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.popover.Constructor=t,e.fn.popover.defaults=e.extend({},e.fn.tooltip.defaults,{placement:"
right",trigger:"click",content:"",template:'<div class="popover"><div class="arrow"></div><h3 class="popover-title"></h3><div class="popover-content"></div></div>'}),e.fn.popover.noConflict=function(){return e.fn.popover=n,this}}(window.jQuery),!function(e){"use strict";function t(t,n){var r=e.proxy(this.process,this),i=e(t).is("body")?e(window):e(t),s;this.options=e.extend({},e.fn.scrollspy.defaults,n),this.$scrollElement=i.on("scroll.scroll-spy.data-api",r),this.selector=(this.options.target||(s=e(t).attr("href"))&&s.replace(/.*(?=#[^\s]+$)/,"")||"")+" .nav li > a",this.$body=e("body"),this.refresh(),this.process()}t.prototype={constructor:t,refresh:function(){var t=this,n;this.offsets=e([]),this.targets=e([]),n=this.$body.find(this.selector).map(function(){var n=e(this),r=n.data("target")||n.attr("href"),i=/^#\w/.test(r)&&e(r);return i&&i.length&&[[i.position().top+(!e.isWindow(t.$scrollElement.get(0))&&t.$scrollElement.scrollTop()),r]]||null}).sort(function(e,t){return e[0]-t[0]
}).each(function(){t.offsets.push(this[0]),t.targets.push(this[1])})},process:function(){var e=this.$scrollElement.scrollTop()+this.options.offset,t=this.$scrollElement[0].scrollHeight||this.$body[0].scrollHeight,n=t-this.$scrollElement.height(),r=this.offsets,i=this.targets,s=this.activeTarget,o;if(e>=n)return s!=(o=i.last()[0])&&this.activate(o);for(o=r.length;o--;)s!=i[o]&&e>=r[o]&&(!r[o+1]||e<=r[o+1])&&this.activate(i[o])},activate:function(t){var n,r;this.activeTarget=t,e(this.selector).parent(".active").removeClass("active"),r=this.selector+'[data-target="'+t+'"],'+this.selector+'[href="'+t+'"]',n=e(r).parent("li").addClass("active"),n.parent(".dropdown-menu").length&&(n=n.closest("li.dropdown").addClass("active")),n.trigger("activate")}};var n=e.fn.scrollspy;e.fn.scrollspy=function(n){return this.each(function(){var r=e(this),i=r.data("scrollspy"),s=typeof n=="object"&&n;i||r.data("scrollspy",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.scrollspy.Constructor=t,e.fn.scr
ollspy.defaults={offset:10},e.fn.scrollspy.noConflict=function(){return e.fn.scrollspy=n,this},e(window).on("load",function(){e('[data-spy="scroll"]').each(function(){var t=e(this);t.scrollspy(t.data())})})}(window.jQuery),!function(e){"use strict";var t=function(t){this.element=e(t)};t.prototype={constructor:t,show:function(){var t=this.element,n=t.closest("ul:not(.dropdown-menu)"),r=t.attr("data-target"),i,s,o;r||(r=t.attr("href"),r=r&&r.replace(/.*(?=#[^\s]*$)/,""));if(t.parent("li").hasClass("active"))return;i=n.find(".active:last a")[0],o=e.Event("show",{relatedTarget:i}),t.trigger(o);if(o.isDefaultPrevented())return;s=e(r),this.activate(t.parent("li"),n),this.activate(s,s.parent(),function(){t.trigger({type:"shown",relatedTarget:i})})},activate:function(t,n,r){function o(){i.removeClass("active").find("> .dropdown-menu > .active").removeClass("active"),t.addClass("active"),s?(t[0].offsetWidth,t.addClass("in")):t.removeClass("fade"),t.parent(".dropdown-menu")&&t.closest("li.dro
pdown").addClass("active"),r&&r()}var i=n.find("> .active"),s=r&&e.support.transition&&i.hasClass("fade");s?i.one(e.support.transition.end,o):o(),i.removeClass("in")}};var n=e.fn.tab;e.fn.tab=function(n){return this.each(function(){var r=e(this),i=r.data("tab");i||r.data("tab",i=new t(this)),typeof n=="string"&&i[n]()})},e.fn.tab.Constructor=t,e.fn.tab.noConflict=function(){return e.fn.tab=n,this},e(document).on("click.tab.data-api",'[data-toggle="tab"], [data-toggle="pill"]',function(t){t.preventDefault(),e(this).tab("show")})}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.$element=e(t),this.options=e.extend({},e.fn.typeahead.defaults,n),this.matcher=this.options.matcher||this.matcher,this.sorter=this.options.sorter||this.sorter,this.highlighter=this.options.highlighter||this.highlighter,this.updater=this.options.updater||this.updater,this.source=this.options.source,this.$menu=e(this.options.menu),this.shown=!1,this.listen()};t.prototype={constructor:t,select:fu
nction(){var e=this.$menu.find(".active").attr("data-value");return this.$element.val(this.updater(e)).change(),this.hide()},updater:function(e){return e},show:function(){var t=e.extend({},this.$element.position(),{height:this.$element[0].offsetHeight});return this.$menu.insertAfter(this.$element).css({top:t.top+t.height,left:t.left}).show(),this.shown=!0,this},hide:function(){return this.$menu.hide(),this.shown=!1,this},lookup:function(t){var n;return this.query=this.$element.val(),!this.query||this.query.length<this.options.minLength?this.shown?this.hide():this:(n=e.isFunction(this.source)?this.source(this.query,e.proxy(this.process,this)):this.source,n?this.process(n):this)},process:function(t){var n=this;return t=e.grep(t,function(e){return n.matcher(e)}),t=this.sorter(t),t.length?this.render(t.slice(0,this.options.items)).show():this.shown?this.hide():this},matcher:function(e){return~e.toLowerCase().indexOf(this.query.toLowerCase())},sorter:function(e){var t=[],n=[],r=[],i;whil
e(i=e.shift())i.toLowerCase().indexOf(this.query.toLowerCase())?~i.indexOf(this.query)?n.push(i):r.push(i):t.push(i);return t.concat(n,r)},highlighter:function(e){var t=this.query.replace(/[\-\[\]{}()*+?.,\\\^$|#\s]/g,"\\$&");return e.replace(new RegExp("("+t+")","ig"),function(e,t){return"<strong>"+t+"</strong>"})},render:function(t){var n=this;return t=e(t).map(function(t,r){return t=e(n.options.item).attr("data-value",r),t.find("a").html(n.highlighter(r)),t[0]}),t.first().addClass("active"),this.$menu.html(t),this},next:function(t){var n=this.$menu.find(".active").removeClass("active"),r=n.next();r.length||(r=e(this.$menu.find("li")[0])),r.addClass("active")},prev:function(e){var t=this.$menu.find(".active").removeClass("active"),n=t.prev();n.length||(n=this.$menu.find("li").last()),n.addClass("active")},listen:function(){this.$element.on("focus",e.proxy(this.focus,this)).on("blur",e.proxy(this.blur,this)).on("keypress",e.proxy(this.keypress,this)).on("keyup",e.proxy(this.keyup,t
his)),this.eventSupported("keydown")&&this.$element.on("keydown",e.proxy(this.keydown,this)),this.$menu.on("click",e.proxy(this.click,this)).on("mouseenter","li",e.proxy(this.mouseenter,this)).on("mouseleave","li",e.proxy(this.mouseleave,this))},eventSupported:function(e){var t=e in this.$element;return t||(this.$element.setAttribute(e,"return;"),t=typeof this.$element[e]=="function"),t},move:function(e){if(!this.shown)return;switch(e.keyCode){case 9:case 13:case 27:e.preventDefault();break;case 38:e.preventDefault(),this.prev();break;case 40:e.preventDefault(),this.next()}e.stopPropagation()},keydown:function(t){this.suppressKeyPressRepeat=~e.inArray(t.keyCode,[40,38,9,13,27]),this.move(t)},keypress:function(e){if(this.suppressKeyPressRepeat)return;this.move(e)},keyup:function(e){switch(e.keyCode){case 40:case 38:case 16:case 17:case 18:break;case 9:case 13:if(!this.shown)return;this.select();break;case 27:if(!this.shown)return;this.hide();break;default:this.lookup()}e.stopPropagat
ion(),e.preventDefault()},focus:function(e){this.focused=!0},blur:function(e){this.focused=!1,!this.mousedover&&this.shown&&this.hide()},click:function(e){e.stopPropagation(),e.preventDefault(),this.select(),this.$element.focus()},mouseenter:function(t){this.mousedover=!0,this.$menu.find(".active").removeClass("active"),e(t.currentTarget).addClass("active")},mouseleave:function(e){this.mousedover=!1,!this.focused&&this.shown&&this.hide()}};var n=e.fn.typeahead;e.fn.typeahead=function(n){return this.each(function(){var r=e(this),i=r.data("typeahead"),s=typeof n=="object"&&n;i||r.data("typeahead",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.typeahead.defaults={source:[],items:8,menu:'<ul class="typeahead dropdown-menu"></ul>',item:'<li><a href="#"></a></li>',minLength:1},e.fn.typeahead.Constructor=t,e.fn.typeahead.noConflict=function(){return e.fn.typeahead=n,this},e(document).on("focus.typeahead.data-api",'[data-provide="typeahead"]',function(t){var n=e(this);if(n.data("typeah
ead"))return;n.typeahead(n.data())})}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.options=e.extend({},e.fn.affix.defaults,n),this.$window=e(window).on("scroll.affix.data-api",e.proxy(this.checkPosition,this)).on("click.affix.data-api",e.proxy(function(){setTimeout(e.proxy(this.checkPosition,this),1)},this)),this.$element=e(t),this.checkPosition()};t.prototype.checkPosition=function(){if(!this.$element.is(":visible"))return;var t=e(document).height(),n=this.$window.scrollTop(),r=this.$element.offset(),i=this.options.offset,s=i.bottom,o=i.top,u="affix affix-top affix-bottom",a;typeof i!="object"&&(s=o=i),typeof o=="function"&&(o=i.top()),typeof s=="function"&&(s=i.bottom()),a=this.unpin!=null&&n+this.unpin<=r.top?!1:s!=null&&r.top+this.$element.height()>=t-s?"bottom":o!=null&&n<=o?"top":!1;if(this.affixed===a)return;this.affixed=a,this.unpin=a=="bottom"?r.top-n:null,this.$element.removeClass(u).addClass("affix"+(a?"-"+a:""))};var n=e.fn.affix;e.fn.affix=function(
n){return this.each(function(){var r=e(this),i=r.data("affix"),s=typeof n=="object"&&n;i||r.data("affix",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.affix.Constructor=t,e.fn.affix.defaults={offset:0},e.fn.affix.noConflict=function(){return e.fn.affix=n,this},e(window).on("load",function(){e('[data-spy="affix"]').each(function(){var t=e(this),n=t.data();n.offset=n.offset||{},n.offsetBottom&&(n.offset.bottom=n.offsetBottom),n.offsetTop&&(n.offset.top=n.offsetTop),t.affix(n)})})}(window.jQuery);
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
new file mode 100644
index 0000000..4779edb
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+/**
+ * Names of the environment variables and configuration keys used by the YARN
+ * job support, plus an implicit conversion that wraps a [[Config]] in a
+ * [[YarnConfig]] so the typed getters can be called on it directly.
+ */
+object YarnConfig {
+  // environment variables
+  val ENV_CONFIG = "STREAMING_CONFIG"
+  val ENV_PARTITION_ID = "PARTITION_ID"
+  val ENV_CONTAINER_NAME = "CONTAINER_NAME"
+
+  // yarn job config
+  val PACKAGE_PATH = "yarn.package.path"
+  val CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb"
+  val CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"
+  // Previously misspelled as "yarn.countainer.retry.count", which meant a
+  // correctly spelled setting was never picked up. Jobs that configured the
+  // misspelled key must switch to this one.
+  val CONTAINER_RETRY_COUNT = "yarn.container.retry.count"
+  val CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"
+  // NOTE(review): the constant is named TASK_COUNT but the key is the
+  // container count -- confirm that the naming is intentional.
+  val TASK_COUNT = "yarn.container.count"
+  val AM_JVM_OPTIONS = "yarn.am.opts"
+  val AM_JMX_ENABLED = "yarn.am.jmx.enabled"
+  val AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb"
+
+  /** Wraps `config` in a [[YarnConfig]]; applied implicitly where one is expected. */
+  implicit def Config2Yarn(config: Config): YarnConfig = new YarnConfig(config)
+}
+
+/**
+ * Typed accessors for the YARN-related keys declared in the `YarnConfig`
+ * companion object.
+ *
+ * Every `Option[Int]` getter looks its key up with `getOption` and converts a
+ * present value with `toInt`; an absent key yields `None`. The conversion is
+ * not guarded, so a value that is not a valid integer makes the getter throw.
+ */
+class YarnConfig(config: Config) extends ScalaMapConfig(config) {
+  /** Value of `yarn.container.memory.mb` as an Int, if set. */
+  def getContainerMaxMemoryMb: Option[Int] =
+    getOption(YarnConfig.CONTAINER_MAX_MEMORY_MB).map(_.toInt)
+
+  /** Value of `yarn.container.cpu.cores` as an Int, if set. */
+  def getContainerMaxCpuCores: Option[Int] =
+    getOption(YarnConfig.CONTAINER_MAX_CPU_CORES).map(_.toInt)
+
+  /** Value stored under `YarnConfig.CONTAINER_RETRY_COUNT` as an Int, if set. */
+  def getContainerRetryCount: Option[Int] =
+    getOption(YarnConfig.CONTAINER_RETRY_COUNT).map(_.toInt)
+
+  /** Value of `yarn.container.retry.window.ms` as an Int, if set. */
+  def getContainerRetryWindowMs: Option[Int] =
+    getOption(YarnConfig.CONTAINER_RETRY_WINDOW_MS).map(_.toInt)
+
+  /** Raw value of `yarn.package.path`, if set. */
+  def getPackagePath = getOption(YarnConfig.PACKAGE_PATH)
+
+  /** Value of `yarn.container.count` as an Int, if set. */
+  def getTaskCount: Option[Int] =
+    getOption(YarnConfig.TASK_COUNT).map(_.toInt)
+
+  /** Raw value of `yarn.am.opts`, if set. */
+  def getAmOpts = getOption(YarnConfig.AM_JVM_OPTIONS)
+
+  /** Value of `yarn.am.container.memory.mb` as an Int, if set. */
+  def getAMContainerMaxMemoryMb: Option[Int] =
+    getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB).map(_.toInt)
+
+  /**
+   * Value of `yarn.am.jmx.enabled` read through `getBoolean`, passing `true`
+   * as the second argument (presumably the default when the key is absent --
+   * confirm against `getBoolean`'s definition).
+   */
+  def getJmxServerEnabled = getBoolean(YarnConfig.AM_JMX_ENABLED, true)
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
new file mode 100644
index 0000000..b2b529e
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import java.util.Collections
+
+import scala.collection.JavaConversions._
+import scala.collection.Map
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.api.records.ApplicationReport
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
+import org.apache.hadoop.yarn.api.ClientRMProtocol
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.util.Records
+
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.ApplicationStatus.New
+import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
+import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
+
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+
+/**
+ * Client helper class required to submit an application master start script to the resource manager. It also
+ * allows us to forcefully shut down the application master, which in turn will shut down the corresponding
+ * container and its processes.
+ */
+class ClientHelper(conf: Configuration) extends Logging {
+  val rpc = YarnRPC.create(conf)
+  val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
+  info("trying to connect to RM %s" format rmAddress)
+  val applicationsManager = rpc.getProxy(classOf[ClientRMProtocol], rmAddress, conf).asInstanceOf[ClientRMProtocol]
+
+  // ID handed out by the RM during the most recent submitApplication call; None until then.
+  var appId: Option[ApplicationId] = None
+
+  /**
+   * Generate an application and submit it to the resource manager to start an application master.
+   *
+   * Memory and CPU requests below the RM's minimum capability are raised to that minimum (with a
+   * warning). Requests above the RM's maximum capability are rejected.
+   *
+   * @param packagePath location of the job package; registered as an ARCHIVE local resource named "__package"
+   * @param memoryMb memory requested for the application master container
+   * @param cpuCore virtual cores requested for the application master container
+   * @param user user whose short name the application is submitted under
+   * @param cmds commands set on the application master's container launch context
+   * @param env optional environment variables for the application master container
+   * @param name optional application name; the application ID is used when absent
+   * @return the ID of the submitted application
+   * @throws SamzaException if the memory or CPU request exceeds the RM's maximum capability
+   */
+  def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, user: UserGroupInformation, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
+    val newAppRequest = Records.newRecord(classOf[GetNewApplicationRequest])
+    val newAppResponse = applicationsManager.getNewApplication(newAppRequest)
+    val minCapability = newAppResponse.getMinimumResourceCapability
+    val maxCapability = newAppResponse.getMaximumResourceCapability
+
+    // If we are asking for memory less than the minimum required, bump it
+    val mem = if (memoryMb < minCapability.getMemory) {
+      val min = minCapability.getMemory
+      warn("requesting %s megs of memory, which is less than minimum capability of %s, so using minimum" format (memoryMb, min))
+      min
+    } else {
+      memoryMb
+    }
+
+    // If we are asking for memory more than the max allowed, shout out
+    if (mem > maxCapability.getMemory) {
+      throw new SamzaException("You're asking for more memory (%s) than is allowed by YARN: %s" format
+        (mem, maxCapability.getMemory))
+    }
+
+    // If we are asking for cpu less than the minimum required, bump it
+    val cpu = if (cpuCore < minCapability.getVirtualCores) {
+      val min = minCapability.getVirtualCores
+      warn("requesting %s virtual cores of cpu, which is less than minimum capability of %s, so using minimum" format (cpuCore, min))
+      min
+    } else {
+      cpuCore
+    }
+
+    // If we are asking for cpu more than the max allowed, shout out
+    if (cpu > maxCapability.getVirtualCores) {
+      throw new SamzaException("You're asking for more CPU (%s) than is allowed by YARN: %s" format
+        (cpu, maxCapability.getVirtualCores))
+    }
+
+    // Only publish the ID once the resource request has been validated.
+    val id = newAppResponse.getApplicationId
+    appId = Some(id)
+
+    info("preparing to request resources for app id %s" format id)
+
+    val appCtx = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val containerCtx = Records.newRecord(classOf[ContainerLaunchContext])
+    val resource = Records.newRecord(classOf[Resource])
+    val submitAppRequest = Records.newRecord(classOf[SubmitApplicationRequest])
+    val packageResource = Records.newRecord(classOf[LocalResource])
+
+    // Fall back to the application ID itself (not the Option wrapping it) when no name was supplied.
+    appCtx.setApplicationName(name.getOrElse(id.toString))
+
+    env.foreach(envVars => {
+      containerCtx.setEnvironment(envVars)
+      info("set environment variables to %s for %s" format (envVars, id))
+    })
+
+    // set the local package so that the containers and app master are provisioned with it
+    val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
+    val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath)
+
+    packageResource.setResource(packageUrl)
+    info("set package url to %s for %s" format (packageUrl, id))
+    packageResource.setSize(fileStatus.getLen)
+    info("set package size to %s for %s" format (fileStatus.getLen, id))
+    packageResource.setTimestamp(fileStatus.getModificationTime)
+    packageResource.setType(LocalResourceType.ARCHIVE)
+    packageResource.setVisibility(LocalResourceVisibility.APPLICATION)
+
+    resource.setMemory(mem)
+    info("set memory request to %s for %s" format (mem, id))
+    resource.setVirtualCores(cpu)
+    info("set cpu core request to %s for %s" format (cpu, id))
+    containerCtx.setResource(resource)
+    containerCtx.setCommands(cmds)
+    info("set command to %s for %s" format (cmds, id))
+    containerCtx.setLocalResources(Collections.singletonMap("__package", packageResource))
+    appCtx.setApplicationId(id)
+    // One placeholder, one argument: log the application ID rather than the user.
+    info("set app ID to %s" format id)
+    appCtx.setUser(user.getShortUserName)
+    info("set user to %s for %s" format (user, id))
+    appCtx.setAMContainerSpec(containerCtx)
+    submitAppRequest.setApplicationSubmissionContext(appCtx)
+    info("submitting application request for %s" format id)
+    applicationsManager.submitApplication(submitAppRequest)
+    appId
+  }
+
+  /**
+   * Ask the resource manager for the report of the given application and translate it into a Samza
+   * ApplicationStatus.
+   */
+  def status(appId: ApplicationId): Option[ApplicationStatus] = {
+    val statusRequest = Records.newRecord(classOf[GetApplicationReportRequest])
+    statusRequest.setApplicationId(appId)
+    val statusResponse = applicationsManager.getApplicationReport(statusRequest)
+    convertState(statusResponse.getApplicationReport)
+  }
+
+  /**
+   * Ask the resource manager to forcefully kill the given application.
+   */
+  def kill(appId: ApplicationId): Unit = {
+    val killRequest = Records.newRecord(classOf[KillApplicationRequest])
+    killRequest.setApplicationId(appId)
+    applicationsManager.forceKillApplication(killRequest)
+  }
+
+  /**
+   * Fetch all application reports from the resource manager and return the one whose ID equals appId,
+   * or None if there is no such report.
+   */
+  def getApplicationMaster(appId: ApplicationId): Option[ApplicationReport] = {
+    val getAppsReq = Records.newRecord(classOf[GetAllApplicationsRequest])
+    val getAppsRsp = applicationsManager.getAllApplications(getAppsReq)
+
+    getAppsRsp.getApplicationList.find(appRep => appId.equals(appRep.getApplicationId()))
+  }
+
+  /**
+   * Fetch all application reports from the resource manager, optionally keeping only those whose
+   * converted state equals the given status.
+   */
+  def getApplicationMasters(status: Option[ApplicationStatus]): List[ApplicationReport] = {
+    val getAppsReq = Records.newRecord(classOf[GetAllApplicationsRequest])
+    val getAppsRsp = applicationsManager.getAllApplications(getAppsReq)
+    val reports = getAppsRsp.getApplicationList.toList
+
+    status match {
+      case Some(wanted) => reports.filter(appRep => convertState(appRep) == Some(wanted))
+      case None => reports
+    }
+  }
+
+  /**
+   * Map YARN's (application state, final status) pair onto Samza's ApplicationStatus. A FINISHED
+   * application counts as successful only when its final status is SUCCEEDED; any other FINISHED,
+   * KILLED or FAILED application is an unsuccessful finish. Always returns Some.
+   */
+  private def convertState(appReport: ApplicationReport): Option[ApplicationStatus] = {
+    (appReport.getYarnApplicationState(), appReport.getFinalApplicationStatus()) match {
+      case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
+      // A finished application that did not succeed must not be reported as still running.
+      case (YarnApplicationState.FINISHED, _) | (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) => Some(UnsuccessfulFinish)
+      case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)
+      case _ => Some(Running)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
new file mode 100644
index 0000000..7f830f2
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.samza.config.MapConfig
+import org.apache.samza.config.YarnConfig
+import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import scala.collection.JavaConversions._
+import org.apache.samza.metrics.{ JmxServer, MetricsRegistryMap }
+import grizzled.slf4j.Logging
+import org.apache.hadoop.yarn.client.AMRMClientImpl
+import org.apache.samza.config.YarnConfig._
+import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
+
+/**
+ * When YARN executes an application master, it needs a bash command to
+ * execute. For Samza, YARN will execute this main method when starting Samza's
+ * YARN application master.
+ *
+ * <br/><br/>
+ *
+ * The main method gets all of the environment variables (passed by Samza's
+ * YARN client, and YARN itself), and wires up everything to run Samza's
+ * application master.
+ */
+object SamzaAppMaster extends Logging {
+  /**
+   * Entry point of the application master process. Reads the container ID, node manager host/ports
+   * and the JSON-serialized job config from environment variables, builds the app master listeners,
+   * and runs them through YarnAppMaster. The JMX server, when enabled, is stopped on the way out.
+   */
+  def main(args: Array[String]): Unit = {
+    val containerIdStr = System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV)
+    info("got container id: %s" format containerIdStr)
+    val containerId = ConverterUtils.toContainerId(containerIdStr)
+    val applicationAttemptId = containerId.getApplicationAttemptId
+    info("got app attempt id: %s" format applicationAttemptId)
+
+    val nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV)
+    info("got node manager host: %s" format nodeHostString)
+    val nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV)
+    info("got node manager port: %s" format nodePortString)
+    val nodeHttpPortString = System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV)
+    info("got node manager http port: %s" format nodeHttpPortString)
+
+    val config = new MapConfig(JsonConfigSerializer.fromJson(System.getenv(YarnConfig.ENV_CONFIG)))
+    info("got config: %s" format config)
+
+    val hConfig = new YarnConfiguration
+    hConfig.set("fs.http.impl", "samza.util.hadoop.HttpFileSystem")
+    val amClient = new AMRMClientImpl(applicationAttemptId)
+    val clientHelper = new ClientHelper(hConfig)
+    val registry = new MetricsRegistryMap
+
+    val yarnConfig = new YarnConfig(config)
+    val containerMem = yarnConfig.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM)
+    val containerCpu = yarnConfig.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES)
+    val jmxServer = if (yarnConfig.getJmxServerEnabled) Some(new JmxServer()) else None
+
+    try {
+      // wire up all of the yarn event listeners
+      val state = new SamzaAppMasterState(-1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
+      val service = new SamzaAppMasterService(config, state, registry, clientHelper)
+      val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient, hConfig)
+      val metrics = new SamzaAppMasterMetrics(config, state, registry)
+      val am = new SamzaAppMasterTaskManager({ System.currentTimeMillis }, config, state, amClient, hConfig)
+      val listeners = List(state, service, lifecycle, metrics, am)
+
+      // run the app master
+      new YarnAppMaster(listeners, amClient).run
+    } finally {
+      // jmxServer has to be stopped or will prevent process from exiting.
+      jmxServer.foreach(_.stop)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
new file mode 100644
index 0000000..95a6f05
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.hadoop.yarn.client.AMRMClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+
+/**
+ * Responsible for managing the lifecycle of the application master. Mostly,
+ * this means registering and unregistering with the RM, and shutting down
+ * when the RM tells us to Reboot.
+ */
+class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient, conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+  // Set to false by onInit when the requested container size falls outside the RM's limits.
+  var validResourceRequest = true
+  // Message handed to the RM on unregister; remains null unless onInit rejects the request.
+  var shutdownMessage: String = null
+
+  /**
+   * Starts the AM-RM client, registers this application master with the RM, and checks the
+   * requested container memory/CPU against the capabilities returned in the register response.
+   */
+  override def onInit(): Unit = {
+    val host = state.nodeHost
+
+    amClient.init(conf)
+    amClient.start
+
+    val trackingUrl = "%s:%d" format (host, state.trackingPort)
+    val response = amClient.registerApplicationMaster(host, state.rpcPort, trackingUrl)
+
+    // validate that the YARN cluster can handle our container resource requirements
+    val maxCapability = response.getMaximumResourceCapability
+    val minCapability = response.getMinimumResourceCapability
+    val maxMem = maxCapability.getMemory
+    val minMem = minCapability.getMemory
+    val maxCpu = maxCapability.getVirtualCores
+    val minCpu = minCapability.getVirtualCores
+
+    info("Got AM register response. The YARN RM supports container requests with max-mem: %s, min-mem: %s, max-cpu: %s, min-cpu: %s" format (maxMem, minMem, maxCpu, minCpu))
+
+    val memSatisfiable = containerMem >= minMem && containerMem <= maxMem
+    val cpuSatisfiable = containerCpu >= minCpu && containerCpu <= maxCpu
+
+    if (!(memSatisfiable && cpuSatisfiable)) {
+      shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." format (containerMem, containerCpu)
+      error(shutdownMessage)
+      validResourceRequest = false
+      state.status = FinalApplicationStatus.FAILED
+    }
+  }
+
+  // A reboot signal is turned into an exception.
+  override def onReboot(): Unit = {
+    throw new SamzaException("Received a reboot signal from the RM, so throwing an exception to reboot the AM.")
+  }
+
+  // Unregisters from the RM with the current final status and message, then stops the client.
+  override def onShutdown(): Unit = {
+    info("Shutting down.")
+    amClient.unregisterApplicationMaster(state.status, shutdownMessage, null)
+    amClient.stop
+  }
+
+  // Request shutdown once onInit has flagged the resource request as invalid.
+  override def shouldShutdown: Boolean = !validResourceRequest
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
new file mode 100644
index 0000000..4f6edfb
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.JvmMetrics
+import org.apache.samza.config.Config
+import org.apache.samza.task.TaskContext
+import org.apache.samza.Partition
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.metrics.MetricsReporterFactory
+import org.apache.samza.util.Util
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import java.util.Timer
+import java.util.TimerTask
+import org.apache.samza.task.ReadableCollector
+
+object SamzaAppMasterMetrics {
+  // Group name under which the application master's counters and gauges are registered.
+  val metricsGroup: String = "samza.yarn.am"
+
+  // Source name passed to metrics reporters when they are created and registered.
+  val sourceName: String = "ApplicationMaster"
+}
+
+/**
+ * Responsible for wiring up Samza's metrics. Given that Samza has a metric
+ * registry, we might as well use it. This class takes Samza's application
+ * master state, and converts it to metrics.
+ */
+class SamzaAppMasterMetrics(config: Config, state: SamzaAppMasterState, registry: ReadableMetricsRegistry) extends YarnAppMasterListener with Logging {
+  import SamzaAppMasterMetrics._
+
+  val jvm = new JvmMetrics(registry)
+  val mEventLoops = registry.newCounter(metricsGroup, "EventLoops")
+
+  // Reporter name -> reporter, one per configured reporter name. Each reporter is built through
+  // its configured factory class and registered against this registry at construction time.
+  val reporters = config.getMetricReporterNames.map(reporterName => {
+    val factoryClassName = config
+      .getMetricsFactoryClass(reporterName)
+      .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName))
+    val factory = Util.getObj[MetricsReporterFactory](factoryClassName)
+    val reporter = factory.getMetricsReporter(reporterName, SamzaAppMasterMetrics.sourceName, config)
+
+    reporter.register(SamzaAppMasterMetrics.sourceName, registry)
+    reporterName -> reporter
+  }).toMap
+
+  /**
+   * Registers one gauge per piece of application master state, then starts every reporter.
+   */
+  override def onInit(): Unit = {
+    registry.newGauge(metricsGroup, "RunningContainers", { state.runningTasks.size })
+    registry.newGauge(metricsGroup, "NeededContainers", { state.neededContainers })
+    registry.newGauge(metricsGroup, "CompletedContainers", { state.completedTasks })
+    registry.newGauge(metricsGroup, "FailedContainers", { state.failedContainers })
+    registry.newGauge(metricsGroup, "ReleasedContainers", { state.releasedContainers })
+    registry.newGauge(metricsGroup, "TaskCount", { state.taskCount })
+    registry.newGauge(metricsGroup, "HttpHost", { state.nodeHost })
+    registry.newGauge(metricsGroup, "HttpPort", { state.trackingPort })
+    registry.newGauge(metricsGroup, "RpcPort", { state.rpcPort })
+    registry.newGauge(metricsGroup, "AppAttemptId", { state.appAttemptId.toString })
+
+    for (reporter <- reporters.values) reporter.start
+  }
+
+  // Count every pass through the event loop.
+  override def onEventLoop(): Unit = {
+    mEventLoops.inc
+  }
+
+  // Stop every reporter that onInit started.
+  override def onShutdown(): Unit = {
+    for (reporter <- reporters.values) reporter.stop
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
new file mode 100644
index 0000000..ce3fcc3
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.samza.util.Util
+import grizzled.slf4j.Logging
+import org.apache.samza.webapp._
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.SamzaException
+
+/**
+ * Samza's application master runs a very basic HTTP/JSON service to allow
+ * dashboards to check on the status of a job. SamzaAppMasterService starts
+ * up the web service when initialized.
+ */
+class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry: ReadableMetricsRegistry, clientHelper: ClientHelper) extends YarnAppMasterListener with Logging {
+  /**
+   * Starts the REST and dashboard web servers on randomly chosen ports and records those ports in
+   * the application master state.
+   *
+   * @throws SamzaException if none of the attempts manages to start both servers
+   */
+  override def onInit(): Unit = {
+    // try starting the samza AM dashboard. try ten times, just in case we
+    // pick a port that's already in use.
+    val started = (0 until 10).exists(_ => tryStartWebapps())
+
+    if (!started) {
+      throw new SamzaException("Giving up trying to start the webapp, since we keep getting ports that are already in use")
+    }
+  }
+
+  /**
+   * Makes one attempt at starting both servers on freshly picked random ports. On success the ports
+   * are stored in the state and true is returned; on failure the cause is logged and false is returned.
+   */
+  private def tryStartWebapps(): Boolean = {
+    val rpcPort = Util.randomBetween(10000, 50000)
+    val trackingPort = Util.randomBetween(10000, 50000)
+    info("Starting webapp at rpc %d, tracking port %d" format (rpcPort, trackingPort))
+
+    try {
+      val rpcapp = new WebAppServer("/", rpcPort)
+      rpcapp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
+      rpcapp.start
+
+      // NOTE(review): if this second server fails to start, the rpc server started above is left
+      // running on its port; it should probably be stopped here once WebAppServer's API is confirmed.
+      val webapp = new WebAppServer("/", trackingPort)
+      webapp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
+      webapp.start
+
+      state.rpcPort = rpcPort
+      state.trackingPort = trackingPort
+      true
+    } catch {
+      case e: Exception => {
+        // Keep the cause in the log so failures other than a port clash are diagnosable.
+        warn("Unable to start webapp on rpc port %d, tracking port %d .. retrying" format (rpcPort, trackingPort), e)
+        false
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
new file mode 100644
index 0000000..fa1642b
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+import org.apache.hadoop.yarn.api.records.Container
+import org.apache.samza.config.Config
+import grizzled.slf4j.Logging
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.samza.Partition
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.api.records.ContainerId
+
+/**
+ * Samza's application master has state that is usually manipulated based on
+ * responses from YARN's resource manager (via SamzaAppMasterTaskManager). This
+ * class holds the current state of the application master.
+ */
+class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nodeHost: String, val nodePort: Int, val nodeHttpPort: Int) extends YarnAppMasterListener with Logging {
+  // ---- controlled by the AM ----
+
+  // Container and task counters; all start at zero.
+  var completedTasks: Int = 0
+  var neededContainers: Int = 0
+  var failedContainers: Int = 0
+  var releasedContainers: Int = 0
+  var taskCount: Int = 0
+
+  // Task bookkeeping, keyed by integer task ID; all start empty.
+  var unclaimedTasks: Set[Int] = Set.empty[Int]
+  var finishedTasks: Set[Int] = Set.empty[Int]
+  var runningTasks: Map[Int, Container] = Map.empty[Int, Container]
+  var taskPartitions: Map[Int, Set[Partition]] = Map.empty[Int, Set[Partition]]
+
+  // Final status of the application; UNDEFINED until something assigns it.
+  var status: FinalApplicationStatus = FinalApplicationStatus.UNDEFINED
+
+  // ---- controlled by the service ----
+
+  // Ports of the AM's web servers; 0 until they are assigned.
+  var trackingPort: Int = 0
+  var rpcPort: Int = 0
+
+  // ---- controlled on startup ----
+
+  // Application attempt that owns the AM's container.
+  var appAttemptId = containerId.getApplicationAttemptId
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
new file mode 100644
index 0000000..1a13ee5
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+import org.apache.hadoop.yarn.api.records.Container
+import org.apache.samza.config.Config
+import org.apache.samza.Partition
+import grizzled.slf4j.Logging
+import org.apache.samza.config.YarnConfig.Config2Yarn
+import org.apache.samza.config.YarnConfig
+import org.apache.samza.job.CommandBuilder
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.fs.Path
+import org.apache.samza.task.TaskContext
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.TaskConfig.Config2Task
+import scala.collection.JavaConversions._
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.samza.util.Util
+import scala.collection.JavaConversions._
+import org.apache.samza.SamzaException
+import org.apache.hadoop.yarn.client.AMRMClient
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.api.records.Priority
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.apache.hadoop.security.token.Token
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.ContainerManager
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.net.NetUtils
+import java.util.Collections
+import java.security.PrivilegedAction
+import org.apache.samza.job.ShellCommandBuilder
+
+/**
+ * Defaults and helpers shared by SamzaAppMasterTaskManager instances.
+ */
+object SamzaAppMasterTaskManager {
+  /** Container memory, in megabytes, requested when the config does not set one. */
+  val DEFAULT_CONTAINER_MEM: Int = 256
+
+  /** Number of CPU cores requested per container when the config does not set one. */
+  val DEFAULT_CPU_CORES: Int = 1
+
+  /** Container retry count used when the config does not set one. */
+  val DEFAULT_CONTAINER_RETRY_COUNT: Int = 8
+
+  /** Container retry window, in milliseconds, used when the config does not set one. */
+  val DEFAULT_CONTAINER_RETRY_WINDOW_MS: Int = 300000
+
+  /**
+   * Selects the partitions that belong to a task: a partition is kept when its
+   * partition id modulo `taskCount` equals `taskId`.
+   *
+   * @param taskId id of the task to select partitions for
+   * @param taskCount total number of tasks the partitions are spread across
+   * @param partitions all candidate partitions
+   * @return the subset of `partitions` assigned to `taskId`
+   */
+  def getPartitionsForTask(taskId: Int, taskCount: Int, partitions: Set[Partition]): Set[Partition] = {
+    val belongsToTask = (partition: Partition) => partition.getPartitionId % taskCount == taskId
+
+    partitions.filter(belongsToTask)
+  }
+}
+
+/**
+ * Failure bookkeeping for a single task id.
+ *
+ * @param count number of container failures recorded for the task
+ * @param lastFailure clock reading taken when the most recent failure was recorded
+ */
+case class TaskFailure(count: Int, lastFailure: Long)
+
+/**
+ * Samza's application master is mostly interested in requesting containers to
+ * run Samza jobs. SamzaAppMasterTaskManager is responsible for requesting new
+ * containers, handling failures, and notifying the application master that the
+ * job is done.
+ *
+ * @param clock supplies the current time; differences between two readings
+ *              are compared against the retry window, which is in milliseconds
+ * @param config job configuration (task count, container resources, retry
+ *               settings, command builder class and package path)
+ * @param state application master state that this listener reads and updates
+ * @param amClient client used to request containers from, and release
+ *                 containers to, the resource manager
+ * @param conf YARN configuration used to build the container manager proxy and
+ *             to resolve the package's file system
+ */
+class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient, conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+  import SamzaAppMasterTaskManager._
+
+  state.taskCount = config.getTaskCount.getOrElse {
+    info("No %s specified. Defaulting to one container." format YarnConfig.TASK_COUNT)
+    1
+  }
+
+  val partitions = Util.getMaxInputStreamPartitions(config)
+
+  // Failure history per task id; entries are dropped when a task succeeds.
+  var taskFailures = Map[Int, TaskFailure]()
+
+  // Set once the retry policy decides the whole job must be given up.
+  var tooManyFailedContainers = false
+
+  /**
+   * The app master should stop once every task has completed, or once the
+   * retry policy has given up on the job.
+   */
+  override def shouldShutdown = state.completedTasks == state.taskCount || tooManyFailedContainers
+
+  /**
+   * Marks every task id as unclaimed and requests one container per task.
+   */
+  override def onInit(): Unit = {
+    state.neededContainers = state.taskCount
+    state.unclaimedTasks = (0 until state.taskCount).toSet
+
+    info("Requesting %s containers" format state.taskCount)
+
+    requestDefaultSizedContainers(state.neededContainers)
+  }
+
+  /**
+   * Assigns the next unclaimed task id to the allocated container and starts
+   * the task in it. When no task is waiting, the container is released again.
+   *
+   * @throws SamzaException if no YARN package path is defined in the config
+   */
+  override def onContainerAllocated(container: Container): Unit = {
+    val containerIdStr = ConverterUtils.toString(container.getId)
+
+    info("Got a container from YARN ResourceManager: %s" format container)
+
+    state.unclaimedTasks.headOption match {
+      case Some(taskId) => {
+        info("Got available task id (%d) for container: %s" format (taskId, container))
+        val partitionsForTask = getPartitionsForTask(taskId, state.taskCount, partitions)
+        info("Claimed partitions %s for task ID %s" format (partitionsForTask, taskId))
+        val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName)
+        val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
+          .setConfig(config)
+          .setName("SamzaContainer-%s" format taskId)
+          .setPartitions(partitionsForTask)
+          .setTotalPartitions(partitions.size)
+        val command = cmdBuilder.buildCommand
+        info("Task ID %s using command %s" format (taskId, command))
+        val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
+        info("Task ID %s using env %s" format (taskId, env))
+        val user = UserGroupInformation.getCurrentUser
+        info("Task ID %s using user %s" format (taskId, user))
+        // Fail with a descriptive error instead of a bare NoSuchElementException
+        // when the package path is missing from the config.
+        val path = new Path(config.getPackagePath.getOrElse(throw new SamzaException("No YARN package path defined in config.")))
+        info("Starting task ID %s using package path %s" format (taskId, path))
+
+        startContainer(
+          path,
+          container,
+          user,
+          env.toMap,
+          "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/%s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
+
+        // Only book the task as running after the start request went through.
+        state.neededContainers -= 1
+        state.runningTasks += taskId -> container
+        state.unclaimedTasks -= taskId
+        state.taskPartitions += taskId -> partitionsForTask
+
+        info("Claimed task ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
+
+        info("Started task ID %s" format taskId)
+      }
+      case _ => {
+        // there are no more tasks to run, so release the container
+        info("Got an extra container from YARN ResourceManager: %s" format (container))
+
+        amClient.releaseAssignedContainer(container.getId)
+      }
+    }
+  }
+
+  /**
+   * Updates the bookkeeping for a finished container. An exit status of 0
+   * counts as a completed task, -100 as a released or lost container, and
+   * anything else as a failure that is subject to the retry policy.
+   */
+  override def onContainerCompleted(containerStatus: ContainerStatus): Unit = {
+    val containerIdStr = ConverterUtils.toString(containerStatus.getContainerId)
+
+    // The task id (if any) that was running in the completed container.
+    val completedTaskId = state.runningTasks
+      .find { case (_, container) => container.getId().equals(containerStatus.getContainerId()) }
+      .map { case (runningTaskId, _) => runningTaskId }
+
+    completedTaskId.foreach { id =>
+      state.runningTasks -= id
+      state.taskPartitions -= id
+    }
+
+    containerStatus.getExitStatus match {
+      case 0 => {
+        info("Container %s completed successfully." format containerIdStr)
+
+        state.completedTasks += 1
+
+        completedTaskId.foreach { id =>
+          state.finishedTasks += id
+          taskFailures -= id
+        }
+
+        if (state.completedTasks == state.taskCount) {
+          info("Setting job status to SUCCEEDED, since all tasks have been marked as completed.")
+          state.status = FinalApplicationStatus.SUCCEEDED
+        }
+      }
+      case -100 => {
+        info("Got an exit code of -100. This means that container %s was "
+          + "killed by YARN, either due to being released by the application "
+          + "master or being 'lost' due to node failures etc." format containerIdStr)
+
+        state.releasedContainers += 1
+
+        // If this container was assigned some partitions (a taskId), then
+        // clean up, and request a new container for the tasks. This only
+        // should happen if the container was 'lost' due to node failure, not
+        // if the AM released the container.
+        completedTaskId.foreach { lostTaskId =>
+          info("Released container %s was assigned task ID %s. Requesting a new container for the task." format (containerIdStr, lostTaskId))
+
+          state.neededContainers += 1
+          state.unclaimedTasks += lostTaskId
+
+          // request a new container
+          requestDefaultSizedContainers(1)
+        }
+      }
+      case _ => {
+        info("Container %s failed with exit code %d - %s." format (containerIdStr, containerStatus.getExitStatus, containerStatus.getDiagnostics))
+
+        state.failedContainers += 1
+
+        completedTaskId.foreach { failedTaskId =>
+          info("Failed container %s owned task id %s." format (containerIdStr, failedTaskId))
+
+          state.unclaimedTasks += failedTaskId
+          state.neededContainers += 1
+
+          applyRetryPolicy(failedTaskId, containerIdStr)
+
+          if (!tooManyFailedContainers) {
+            // Request a new container
+            requestDefaultSizedContainers(1)
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Records a failure for `taskId` and decides whether the whole app master
+   * must shut down. The rules for failing are that the failure count for a
+   * task id must be > the configured retry count, and the last failure (the
+   * one prior to this one) must have happened less than retry window ms ago.
+   * If retry count is set to 0, the app master will fail on any container
+   * failure. If the retry count is set to a number < 0, a container failure
+   * will never trigger an app master failure.
+   *
+   * @param taskId id of the task whose container failed
+   * @param containerIdStr printable id of the failed container, used in logs
+   */
+  private def applyRetryPolicy(taskId: Int, containerIdStr: String): Unit = {
+    val retryCount = config.getContainerRetryCount.getOrElse(DEFAULT_CONTAINER_RETRY_COUNT)
+    val retryWindowMs = config.getContainerRetryWindowMs.getOrElse(DEFAULT_CONTAINER_RETRY_WINDOW_MS)
+
+    if (retryCount == 0) {
+      error("Task id %s (%s) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed."
+        format (taskId, containerIdStr))
+
+      tooManyFailedContainers = true
+      // Actually mark the job as failed, as the message above announces and
+      // as the retry-count > 0 path below already does.
+      state.status = FinalApplicationStatus.FAILED
+    } else if (retryCount > 0) {
+      val (currentFailCount, lastFailureTime) = taskFailures.get(taskId) match {
+        case Some(TaskFailure(count, lastFailure)) => (count + 1, lastFailure)
+        case _ => (1, 0L)
+      }
+
+      if (currentFailCount > retryCount) {
+        val lastFailureMsDiff = clock() - lastFailureTime
+
+        if (lastFailureMsDiff < retryWindowMs) {
+          error("Task id %s (%s) has failed %s times, with last failure %sms ago. This is greater than retry count of %s and window of %s, so shutting down the application master, and marking the job as failed."
+            format (taskId, containerIdStr, currentFailCount, lastFailureMsDiff, retryCount, retryWindowMs))
+
+          // We have too many failures, and we're within the window
+          // boundary, so shut down the app master.
+          tooManyFailedContainers = true
+          state.status = FinalApplicationStatus.FAILED
+        } else {
+          info("Resetting fail count for task id %s back to 1, since last container failure (%s) for this task id was outside the bounds of the retry window."
+            format (taskId, containerIdStr))
+
+          // Reset counter back to 1, since the last failure for this
+          // container happened outside the window boundary.
+          taskFailures += taskId -> TaskFailure(1, clock())
+        }
+      } else {
+        info("Current fail count for task id %s is %s." format (taskId, currentFailCount))
+        taskFailures += taskId -> TaskFailure(currentFailCount, clock())
+      }
+    }
+  }
+
+  /**
+   * Requests `count` containers using the configured memory and CPU settings,
+   * falling back to the defaults when the config does not define them.
+   */
+  private def requestDefaultSizedContainers(count: Int): Unit = {
+    requestContainers(
+      config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM),
+      config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES),
+      count)
+  }
+
+  /**
+   * Asks the container manager on the container's node to launch the given
+   * commands, with the job package registered as the "__package" local
+   * resource.
+   *
+   * @param packagePath location of the job package archive
+   * @param container the allocated container to start
+   * @param ugi user to run as; replaced by a remote user carrying the
+   *            container token when security is enabled
+   * @param env environment variables for the launched process
+   * @param cmds commands to run inside the container
+   */
+  protected def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*): Unit = {
+    info("starting container %s %s %s %s %s" format (packagePath, container, ugi, env, cmds))
+    // connect to container manager (based on similar code in the ContainerLauncher in Hadoop MapReduce)
+    val contToken = container.getContainerToken
+    val address = container.getNodeId.getHost + ":" + container.getNodeId.getPort
+
+    val user = if (UserGroupInformation.isSecurityEnabled) {
+      debug("security is enabled")
+      val hadoopToken = new Token[ContainerTokenIdentifier](contToken.getIdentifier.array, contToken.getPassword.array, new Text(contToken.getKind), new Text(contToken.getService))
+      val remoteUser = UserGroupInformation.createRemoteUser(address)
+      remoteUser.addToken(hadoopToken)
+      debug("changed user to %s" format remoteUser)
+      remoteUser
+    } else {
+      ugi
+    }
+
+    val containerManager = user.doAs(new PrivilegedAction[ContainerManager] {
+      def run(): ContainerManager = {
+        YarnRPC.create(conf).getProxy(classOf[ContainerManager], NetUtils.createSocketAddr(address), conf).asInstanceOf[ContainerManager]
+      }
+    })
+
+    // set the local package so that the containers and app master are provisioned with it
+    val packageResource = Records.newRecord(classOf[LocalResource])
+    val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
+    val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath)
+
+    packageResource.setResource(packageUrl)
+    packageResource.setSize(fileStatus.getLen)
+    packageResource.setTimestamp(fileStatus.getModificationTime)
+    packageResource.setType(LocalResourceType.ARCHIVE)
+    packageResource.setVisibility(LocalResourceVisibility.APPLICATION)
+
+    // start the container
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+    ctx.setEnvironment(env)
+    ctx.setContainerId(container.getId())
+    ctx.setResource(container.getResource())
+    ctx.setUser(user.getShortUserName())
+    ctx.setCommands(cmds.toList)
+    ctx.setLocalResources(Collections.singletonMap("__package", packageResource))
+
+    debug("setting package to %s" format packageResource)
+    debug("setting context to %s" format ctx)
+
+    val startContainerRequest = Records.newRecord(classOf[StartContainerRequest])
+    startContainerRequest.setContainerLaunchContext(ctx)
+    containerManager.startContainer(startContainerRequest)
+  }
+
+  /**
+   * Registers a request for containers with the resource manager client. All
+   * requests use priority 0 and no node or rack constraints.
+   *
+   * @param memMb memory per container, in megabytes
+   * @param cpuCores virtual cores per container
+   * @param containers number of containers to request
+   */
+  protected def requestContainers(memMb: Int, cpuCores: Int, containers: Int): Unit = {
+    info("Requesting %d container(s) with %dmb of memory" format (containers, memMb))
+    val capability = Records.newRecord(classOf[Resource])
+    val priority = Records.newRecord(classOf[Priority])
+    priority.setPriority(0)
+    capability.setMemory(memMb)
+    capability.setVirtualCores(cpuCores)
+    amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority, containers))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
new file mode 100644
index 0000000..14e3865
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import scala.collection.JavaConversions._
+import grizzled.slf4j.Logging
+import org.apache.hadoop.yarn.client.AMRMClient
+
+/**
+ * YARN's API is somewhat clunky. Most implementations just sit in a loop, and
+ * poll the resource manager every N seconds (see the distributed shell
+ * example). To make life slightly better, Samza separates the polling logic
+ * from the application master logic, and we convert synchronous polling calls
+ * to callbacks, which are more intuitive when dealing with event based
+ * paradigms like YARN.
+ *
+ * <br/><br/>
+ *
+ * SamzaAppMaster uses this class to wire up all of Samza's application master
+ * listeners.
+ *
+ * @param pollIntervalMs time to sleep between two polls of the resource
+ *                       manager, in milliseconds
+ * @param listeners listeners that receive the callbacks, in list order
+ * @param amClient client used to poll the resource manager
+ */
+class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient) extends Logging {
+  // Set when the polling thread is interrupted; ends the event loop.
+  var isShutdown = false
+
+  /** Creates an app master that polls the resource manager every 1000ms. */
+  def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient) = this(1000, listeners, amClient)
+
+  /**
+   * Initializes all listeners, then polls the resource manager and dispatches
+   * its responses to the listeners until the thread is interrupted or a
+   * listener asks for shutdown. onShutdown is always invoked on every
+   * listener afterwards, even when the loop ends with an exception.
+   */
+  def run: Unit = {
+    try {
+      listeners.foreach(_.onInit())
+
+      while (!isShutdown && !shutdownRequested) {
+        val response = amClient.allocate(0).getAMResponse
+
+        if (response.getReboot) {
+          listeners.foreach(_.onReboot())
+        }
+
+        listeners.foreach(_.onEventLoop())
+        response.getCompletedContainersStatuses.foreach(containerStatus => listeners.foreach(_.onContainerCompleted(containerStatus)))
+        response.getAllocatedContainers.foreach(container => listeners.foreach(_.onContainerAllocated(container)))
+
+        try {
+          Thread.sleep(pollIntervalMs)
+        } catch {
+          case e: InterruptedException => {
+            isShutdown = true
+            info("got interrupt in app master thread, so shutting down")
+          }
+        }
+      }
+    } finally {
+      // Best effort: a listener that fails to shut down must not prevent the
+      // remaining listeners from being notified.
+      listeners.foreach(listener => try {
+        listener.onShutdown()
+      } catch {
+        case e: Throwable => warn("Listener %s failed to shutdown." format listener, e)
+      })
+    }
+  }
+
+  /**
+   * True when at least one listener asks for shutdown. Every listener is
+   * still queried on each call. Unlike reducing the answers with `||`, this
+   * is also defined for an empty listener list (no listener, no request).
+   */
+  private def shutdownRequested: Boolean = listeners.map(_.shouldShutdown).contains(true)
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
new file mode 100644
index 0000000..1353e86
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.hadoop.yarn.api.records.Container
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+
+/**
+ * Callback interface for YarnAppMaster events. Every callback has a no-op
+ * default, so implementations only override the events they care about.
+ */
+trait YarnAppMasterListener {
+  /**
+   * Polled by YarnAppMaster. Once a listener answers true, YarnAppMaster stops
+   * polling the RM and calls onShutdown on all listeners. Defaults to false.
+   */
+  def shouldShutdown: Boolean = false
+
+  /**
+   * Called once per listener, before YarnAppMaster enters the RM polling
+   * event loop.
+   */
+  def onInit(): Unit = ()
+
+  /**
+   * Called whenever the RM responds with a reboot request. Usually, reboots
+   * are triggered by the YARN RM when its state gets out of sync with the
+   * application master (usually the result of restarting the RM).
+   * YarnAppMaster does not actually restart anything. It is up to one or more
+   * listeners to trigger a failure, or shutdown.
+   */
+  def onReboot(): Unit = ()
+
+  /**
+   * Called after YarnAppMaster has left the RM polling event loop, right
+   * before it exits.
+   */
+  def onShutdown(): Unit = ()
+
+  /**
+   * Called once for every container that the RM allocates for the application
+   * master.
+   */
+  def onContainerAllocated(container: Container): Unit = ()
+
+  /**
+   * Called once for every container that completes, whether it failed or
+   * succeeded.
+   */
+  def onContainerCompleted(containerStatus: ContainerStatus): Unit = ()
+
+  /**
+   * Called once per listener on every pass of YarnAppMaster's RM polling
+   * loop.
+   */
+  def onEventLoop(): Unit = ()
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
new file mode 100644
index 0000000..bde38e1
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.samza.config.Config
+import org.apache.samza.util.Util
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.job.StreamJob
+import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
+import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
+import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.samza.config.YarnConfig.Config2Yarn
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.YarnConfig
+import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.SamzaException
+
+/**
+ * A StreamJob that runs on YARN: submitting it starts Samza's application
+ * master through the YARN client, and the remaining methods query or kill the
+ * submitted application.
+ *
+ * @param config job configuration; must define the YARN package path and the
+ *               job name
+ * @param hadoopConfig Hadoop configuration handed to the YARN client helper
+ */
+class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
+  val client = new ClientHelper(hadoopConfig)
+
+  // Id of the submitted application; None until submit has been called.
+  var appId: Option[ApplicationId] = None
+
+  /**
+   * Submits the application master to YARN and remembers the application id.
+   *
+   * @return this job, to allow call chaining
+   * @throws SamzaException if the package path or the job name is missing
+   *                        from the config
+   */
+  def submit: YarnJob = {
+    // Fail with a descriptive error instead of a bare NoSuchElementException
+    // when a required setting is missing.
+    val packagePath = new Path(config.getPackagePath.getOrElse(throw new SamzaException("No YARN package path defined in config.")))
+    val jobName = config.getName.getOrElse(throw new SamzaException("No job name defined in config."))
+
+    appId = client.submitApplication(
+      packagePath,
+      config.getAMContainerMaxMemoryMb.getOrElse(512),
+      1,
+      UserGroupInformation.getCurrentUser,
+      List(
+        "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s"
+          format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),
+      Some(Map(
+        YarnConfig.ENV_CONFIG -> Util.envVarEscape(JsonConfigSerializer.toJson(config)),
+        ShellCommandConfig.ENV_SAMZA_OPTS -> Util.envVarEscape(config.getAmOpts.getOrElse("")))),
+      Some("%s_%s" format (jobName, config.getJobId.getOrElse(1))))
+
+    this
+  }
+
+  /**
+   * Polls the job status about once per second until the job has finished
+   * (successfully or not) or the timeout expires.
+   *
+   * @param timeoutMs maximum time to wait, in milliseconds
+   * @return the final status, or Running if the timeout expired first
+   */
+  def waitForFinish(timeoutMs: Long): ApplicationStatus = {
+    pollUntil(timeoutMs)(s => SuccessfulFinish.equals(s) || UnsuccessfulFinish.equals(s)).getOrElse(Running)
+  }
+
+  /**
+   * Polls the job status about once per second until it equals `status` or
+   * the timeout expires.
+   *
+   * @param status the status to wait for
+   * @param timeoutMs maximum time to wait, in milliseconds
+   * @return `status` once it has been reached, or Running if the timeout
+   *         expired first
+   */
+  def waitForStatus(status: ApplicationStatus, timeoutMs: Long): ApplicationStatus = {
+    pollUntil(timeoutMs)(s => status.equals(s)).map(_ => status).getOrElse(Running)
+  }
+
+  /**
+   * Current status of the submitted application.
+   *
+   * @return the status reported by the client, or null when the job has not
+   *         been submitted or the client reports no status
+   */
+  def getStatus: ApplicationStatus = {
+    appId match {
+      case Some(submittedAppId) => client.status(submittedAppId).getOrElse(null)
+      case None => null
+    }
+  }
+
+  /**
+   * Kills the submitted application; does nothing if the job has not been
+   * submitted yet.
+   *
+   * @return this job, to allow call chaining
+   */
+  def kill: YarnJob = {
+    appId.foreach(submittedAppId => client.kill(submittedAppId))
+    this
+  }
+
+  /**
+   * Shared polling loop: checks the status, then sleeps one second, until
+   * `isDone` accepts a (non-null) status or `timeoutMs` has elapsed.
+   *
+   * @return the first accepted status, or None on timeout
+   */
+  private def pollUntil(timeoutMs: Long)(isDone: ApplicationStatus => Boolean): Option[ApplicationStatus] = {
+    val startTimeMs = System.currentTimeMillis()
+
+    while (System.currentTimeMillis() - startTimeMs < timeoutMs) {
+      // getStatus may be null while the status is unknown; Option drops it.
+      val reached = Option(getStatus).filter(isDone)
+
+      if (reached.isDefined) {
+        return reached
+      }
+
+      Thread.sleep(1000)
+    }
+
+    None
+  }
+}