You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/12 18:21:32 UTC

[02/15] initial import.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/resources/scalate/js/bootstrap.min.js
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/js/bootstrap.min.js b/samza-yarn/src/main/resources/scalate/js/bootstrap.min.js
new file mode 100644
index 0000000..bafe4b8
--- /dev/null
+++ b/samza-yarn/src/main/resources/scalate/js/bootstrap.min.js
@@ -0,0 +1,6 @@
+/*!
+* Bootstrap.js by @fat & @mdo
+* Copyright 2012 Twitter, Inc.
+* http://www.apache.org/licenses/LICENSE-2.0.txt
+*/
+!function(e){"use strict";e(function(){e.support.transition=function(){var e=function(){var e=document.createElement("bootstrap"),t={WebkitTransition:"webkitTransitionEnd",MozTransition:"transitionend",OTransition:"oTransitionEnd otransitionend",transition:"transitionend"},n;for(n in t)if(e.style[n]!==undefined)return t[n]}();return e&&{end:e}}()})}(window.jQuery),!function(e){"use strict";var t='[data-dismiss="alert"]',n=function(n){e(n).on("click",t,this.close)};n.prototype.close=function(t){function s(){i.trigger("closed").remove()}var n=e(this),r=n.attr("data-target"),i;r||(r=n.attr("href"),r=r&&r.replace(/.*(?=#[^\s]*$)/,"")),i=e(r),t&&t.preventDefault(),i.length||(i=n.hasClass("alert")?n:n.parent()),i.trigger(t=e.Event("close"));if(t.isDefaultPrevented())return;i.removeClass("in"),e.support.transition&&i.hasClass("fade")?i.on(e.support.transition.end,s):s()};var r=e.fn.alert;e.fn.alert=function(t){return this.each(function(){var r=e(this),i=r.data("alert");i||r.data("alert",i=
 new n(this)),typeof t=="string"&&i[t].call(r)})},e.fn.alert.Constructor=n,e.fn.alert.noConflict=function(){return e.fn.alert=r,this},e(document).on("click.alert.data-api",t,n.prototype.close)}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.$element=e(t),this.options=e.extend({},e.fn.button.defaults,n)};t.prototype.setState=function(e){var t="disabled",n=this.$element,r=n.data(),i=n.is("input")?"val":"html";e+="Text",r.resetText||n.data("resetText",n[i]()),n[i](r[e]||this.options[e]),setTimeout(function(){e=="loadingText"?n.addClass(t).attr(t,t):n.removeClass(t).removeAttr(t)},0)},t.prototype.toggle=function(){var e=this.$element.closest('[data-toggle="buttons-radio"]');e&&e.find(".active").removeClass("active"),this.$element.toggleClass("active")};var n=e.fn.button;e.fn.button=function(n){return this.each(function(){var r=e(this),i=r.data("button"),s=typeof n=="object"&&n;i||r.data("button",i=new t(this,s)),n=="toggle"?i.toggle():n&&i.setState(n)})},e.fn.button.de
 faults={loadingText:"loading..."},e.fn.button.Constructor=t,e.fn.button.noConflict=function(){return e.fn.button=n,this},e(document).on("click.button.data-api","[data-toggle^=button]",function(t){var n=e(t.target);n.hasClass("btn")||(n=n.closest(".btn")),n.button("toggle")})}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.$element=e(t),this.$indicators=this.$element.find(".carousel-indicators"),this.options=n,this.options.pause=="hover"&&this.$element.on("mouseenter",e.proxy(this.pause,this)).on("mouseleave",e.proxy(this.cycle,this))};t.prototype={cycle:function(t){return t||(this.paused=!1),this.interval&&clearInterval(this.interval),this.options.interval&&!this.paused&&(this.interval=setInterval(e.proxy(this.next,this),this.options.interval)),this},getActiveIndex:function(){return this.$active=this.$element.find(".item.active"),this.$items=this.$active.parent().children(),this.$items.index(this.$active)},to:function(t){var n=this.getActiveIndex(),r=this;if(t>thi
 s.$items.length-1||t<0)return;return this.sliding?this.$element.one("slid",function(){r.to(t)}):n==t?this.pause().cycle():this.slide(t>n?"next":"prev",e(this.$items[t]))},pause:function(t){return t||(this.paused=!0),this.$element.find(".next, .prev").length&&e.support.transition.end&&(this.$element.trigger(e.support.transition.end),this.cycle(!0)),clearInterval(this.interval),this.interval=null,this},next:function(){if(this.sliding)return;return this.slide("next")},prev:function(){if(this.sliding)return;return this.slide("prev")},slide:function(t,n){var r=this.$element.find(".item.active"),i=n||r[t](),s=this.interval,o=t=="next"?"left":"right",u=t=="next"?"first":"last",a=this,f;this.sliding=!0,s&&this.pause(),i=i.length?i:this.$element.find(".item")[u](),f=e.Event("slide",{relatedTarget:i[0],direction:o});if(i.hasClass("active"))return;this.$indicators.length&&(this.$indicators.find(".active").removeClass("active"),this.$element.one("slid",function(){var t=e(a.$indicators.children(
 )[a.getActiveIndex()]);t&&t.addClass("active")}));if(e.support.transition&&this.$element.hasClass("slide")){this.$element.trigger(f);if(f.isDefaultPrevented())return;i.addClass(t),i[0].offsetWidth,r.addClass(o),i.addClass(o),this.$element.one(e.support.transition.end,function(){i.removeClass([t,o].join(" ")).addClass("active"),r.removeClass(["active",o].join(" ")),a.sliding=!1,setTimeout(function(){a.$element.trigger("slid")},0)})}else{this.$element.trigger(f);if(f.isDefaultPrevented())return;r.removeClass("active"),i.addClass("active"),this.sliding=!1,this.$element.trigger("slid")}return s&&this.cycle(),this}};var n=e.fn.carousel;e.fn.carousel=function(n){return this.each(function(){var r=e(this),i=r.data("carousel"),s=e.extend({},e.fn.carousel.defaults,typeof n=="object"&&n),o=typeof n=="string"?n:s.slide;i||r.data("carousel",i=new t(this,s)),typeof n=="number"?i.to(n):o?i[o]():s.interval&&i.pause().cycle()})},e.fn.carousel.defaults={interval:5e3,pause:"hover"},e.fn.carousel.Const
 ructor=t,e.fn.carousel.noConflict=function(){return e.fn.carousel=n,this},e(document).on("click.carousel.data-api","[data-slide], [data-slide-to]",function(t){var n=e(this),r,i=e(n.attr("data-target")||(r=n.attr("href"))&&r.replace(/.*(?=#[^\s]+$)/,"")),s=e.extend({},i.data(),n.data()),o;i.carousel(s),(o=n.attr("data-slide-to"))&&i.data("carousel").pause().to(o).cycle(),t.preventDefault()})}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.$element=e(t),this.options=e.extend({},e.fn.collapse.defaults,n),this.options.parent&&(this.$parent=e(this.options.parent)),this.options.toggle&&this.toggle()};t.prototype={constructor:t,dimension:function(){var e=this.$element.hasClass("width");return e?"width":"height"},show:function(){var t,n,r,i;if(this.transitioning||this.$element.hasClass("in"))return;t=this.dimension(),n=e.camelCase(["scroll",t].join("-")),r=this.$parent&&this.$parent.find("> .accordion-group > .in");if(r&&r.length){i=r.data("collapse");if(i&&i.transitionin
 g)return;r.collapse("hide"),i||r.data("collapse",null)}this.$element[t](0),this.transition("addClass",e.Event("show"),"shown"),e.support.transition&&this.$element[t](this.$element[0][n])},hide:function(){var t;if(this.transitioning||!this.$element.hasClass("in"))return;t=this.dimension(),this.reset(this.$element[t]()),this.transition("removeClass",e.Event("hide"),"hidden"),this.$element[t](0)},reset:function(e){var t=this.dimension();return this.$element.removeClass("collapse")[t](e||"auto")[0].offsetWidth,this.$element[e!==null?"addClass":"removeClass"]("collapse"),this},transition:function(t,n,r){var i=this,s=function(){n.type=="show"&&i.reset(),i.transitioning=0,i.$element.trigger(r)};this.$element.trigger(n);if(n.isDefaultPrevented())return;this.transitioning=1,this.$element[t]("in"),e.support.transition&&this.$element.hasClass("collapse")?this.$element.one(e.support.transition.end,s):s()},toggle:function(){this[this.$element.hasClass("in")?"hide":"show"]()}};var n=e.fn.collapse
 ;e.fn.collapse=function(n){return this.each(function(){var r=e(this),i=r.data("collapse"),s=e.extend({},e.fn.collapse.defaults,r.data(),typeof n=="object"&&n);i||r.data("collapse",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.collapse.defaults={toggle:!0},e.fn.collapse.Constructor=t,e.fn.collapse.noConflict=function(){return e.fn.collapse=n,this},e(document).on("click.collapse.data-api","[data-toggle=collapse]",function(t){var n=e(this),r,i=n.attr("data-target")||t.preventDefault()||(r=n.attr("href"))&&r.replace(/.*(?=#[^\s]+$)/,""),s=e(i).data("collapse")?"toggle":n.data();n[e(i).hasClass("in")?"addClass":"removeClass"]("collapsed"),e(i).collapse(s)})}(window.jQuery),!function(e){"use strict";function r(){e(t).each(function(){i(e(this)).removeClass("open")})}function i(t){var n=t.attr("data-target"),r;n||(n=t.attr("href"),n=n&&/#/.test(n)&&n.replace(/.*(?=#[^\s]*$)/,"")),r=n&&e(n);if(!r||!r.length)r=t.parent();return r}var t="[data-toggle=dropdown]",n=function(t){var n=e(t).o
 n("click.dropdown.data-api",this.toggle);e("html").on("click.dropdown.data-api",function(){n.parent().removeClass("open")})};n.prototype={constructor:n,toggle:function(t){var n=e(this),s,o;if(n.is(".disabled, :disabled"))return;return s=i(n),o=s.hasClass("open"),r(),o||s.toggleClass("open"),n.focus(),!1},keydown:function(n){var r,s,o,u,a,f;if(!/(38|40|27)/.test(n.keyCode))return;r=e(this),n.preventDefault(),n.stopPropagation();if(r.is(".disabled, :disabled"))return;u=i(r),a=u.hasClass("open");if(!a||a&&n.keyCode==27)return n.which==27&&u.find(t).focus(),r.click();s=e("[role=menu] li:not(.divider):visible a",u);if(!s.length)return;f=s.index(s.filter(":focus")),n.keyCode==38&&f>0&&f--,n.keyCode==40&&f<s.length-1&&f++,~f||(f=0),s.eq(f).focus()}};var s=e.fn.dropdown;e.fn.dropdown=function(t){return this.each(function(){var r=e(this),i=r.data("dropdown");i||r.data("dropdown",i=new n(this)),typeof t=="string"&&i[t].call(r)})},e.fn.dropdown.Constructor=n,e.fn.dropdown.noConflict=function()
 {return e.fn.dropdown=s,this},e(document).on("click.dropdown.data-api",r).on("click.dropdown.data-api",".dropdown form",function(e){e.stopPropagation()}).on("click.dropdown-menu",function(e){e.stopPropagation()}).on("click.dropdown.data-api",t,n.prototype.toggle).on("keydown.dropdown.data-api",t+", [role=menu]",n.prototype.keydown)}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.options=n,this.$element=e(t).delegate('[data-dismiss="modal"]',"click.dismiss.modal",e.proxy(this.hide,this)),this.options.remote&&this.$element.find(".modal-body").load(this.options.remote)};t.prototype={constructor:t,toggle:function(){return this[this.isShown?"hide":"show"]()},show:function(){var t=this,n=e.Event("show");this.$element.trigger(n);if(this.isShown||n.isDefaultPrevented())return;this.isShown=!0,this.escape(),this.backdrop(function(){var n=e.support.transition&&t.$element.hasClass("fade");t.$element.parent().length||t.$element.appendTo(document.body),t.$element.show(),n&&t.$e
 lement[0].offsetWidth,t.$element.addClass("in").attr("aria-hidden",!1),t.enforceFocus(),n?t.$element.one(e.support.transition.end,function(){t.$element.focus().trigger("shown")}):t.$element.focus().trigger("shown")})},hide:function(t){t&&t.preventDefault();var n=this;t=e.Event("hide"),this.$element.trigger(t);if(!this.isShown||t.isDefaultPrevented())return;this.isShown=!1,this.escape(),e(document).off("focusin.modal"),this.$element.removeClass("in").attr("aria-hidden",!0),e.support.transition&&this.$element.hasClass("fade")?this.hideWithTransition():this.hideModal()},enforceFocus:function(){var t=this;e(document).on("focusin.modal",function(e){t.$element[0]!==e.target&&!t.$element.has(e.target).length&&t.$element.focus()})},escape:function(){var e=this;this.isShown&&this.options.keyboard?this.$element.on("keyup.dismiss.modal",function(t){t.which==27&&e.hide()}):this.isShown||this.$element.off("keyup.dismiss.modal")},hideWithTransition:function(){var t=this,n=setTimeout(function(){t.
 $element.off(e.support.transition.end),t.hideModal()},500);this.$element.one(e.support.transition.end,function(){clearTimeout(n),t.hideModal()})},hideModal:function(){var e=this;this.$element.hide(),this.backdrop(function(){e.removeBackdrop(),e.$element.trigger("hidden")})},removeBackdrop:function(){this.$backdrop&&this.$backdrop.remove(),this.$backdrop=null},backdrop:function(t){var n=this,r=this.$element.hasClass("fade")?"fade":"";if(this.isShown&&this.options.backdrop){var i=e.support.transition&&r;this.$backdrop=e('<div class="modal-backdrop '+r+'" />').appendTo(document.body),this.$backdrop.click(this.options.backdrop=="static"?e.proxy(this.$element[0].focus,this.$element[0]):e.proxy(this.hide,this)),i&&this.$backdrop[0].offsetWidth,this.$backdrop.addClass("in");if(!t)return;i?this.$backdrop.one(e.support.transition.end,t):t()}else!this.isShown&&this.$backdrop?(this.$backdrop.removeClass("in"),e.support.transition&&this.$element.hasClass("fade")?this.$backdrop.one(e.support.tra
 nsition.end,t):t()):t&&t()}};var n=e.fn.modal;e.fn.modal=function(n){return this.each(function(){var r=e(this),i=r.data("modal"),s=e.extend({},e.fn.modal.defaults,r.data(),typeof n=="object"&&n);i||r.data("modal",i=new t(this,s)),typeof n=="string"?i[n]():s.show&&i.show()})},e.fn.modal.defaults={backdrop:!0,keyboard:!0,show:!0},e.fn.modal.Constructor=t,e.fn.modal.noConflict=function(){return e.fn.modal=n,this},e(document).on("click.modal.data-api",'[data-toggle="modal"]',function(t){var n=e(this),r=n.attr("href"),i=e(n.attr("data-target")||r&&r.replace(/.*(?=#[^\s]+$)/,"")),s=i.data("modal")?"toggle":e.extend({remote:!/#/.test(r)&&r},i.data(),n.data());t.preventDefault(),i.modal(s).one("hide",function(){n.focus()})})}(window.jQuery),!function(e){"use strict";var t=function(e,t){this.init("tooltip",e,t)};t.prototype={constructor:t,init:function(t,n,r){var i,s,o,u,a;this.type=t,this.$element=e(n),this.options=this.getOptions(r),this.enabled=!0,o=this.options.trigger.split(" ");for(a=o
 .length;a--;)u=o[a],u=="click"?this.$element.on("click."+this.type,this.options.selector,e.proxy(this.toggle,this)):u!="manual"&&(i=u=="hover"?"mouseenter":"focus",s=u=="hover"?"mouseleave":"blur",this.$element.on(i+"."+this.type,this.options.selector,e.proxy(this.enter,this)),this.$element.on(s+"."+this.type,this.options.selector,e.proxy(this.leave,this)));this.options.selector?this._options=e.extend({},this.options,{trigger:"manual",selector:""}):this.fixTitle()},getOptions:function(t){return t=e.extend({},e.fn[this.type].defaults,this.$element.data(),t),t.delay&&typeof t.delay=="number"&&(t.delay={show:t.delay,hide:t.delay}),t},enter:function(t){var n=e.fn[this.type].defaults,r={},i;this._options&&e.each(this._options,function(e,t){n[e]!=t&&(r[e]=t)},this),i=e(t.currentTarget)[this.type](r).data(this.type);if(!i.options.delay||!i.options.delay.show)return i.show();clearTimeout(this.timeout),i.hoverState="in",this.timeout=setTimeout(function(){i.hoverState=="in"&&i.show()},i.optio
 ns.delay.show)},leave:function(t){var n=e(t.currentTarget)[this.type](this._options).data(this.type);this.timeout&&clearTimeout(this.timeout);if(!n.options.delay||!n.options.delay.hide)return n.hide();n.hoverState="out",this.timeout=setTimeout(function(){n.hoverState=="out"&&n.hide()},n.options.delay.hide)},show:function(){var t,n,r,i,s,o,u=e.Event("show");if(this.hasContent()&&this.enabled){this.$element.trigger(u);if(u.isDefaultPrevented())return;t=this.tip(),this.setContent(),this.options.animation&&t.addClass("fade"),s=typeof this.options.placement=="function"?this.options.placement.call(this,t[0],this.$element[0]):this.options.placement,t.detach().css({top:0,left:0,display:"block"}),this.options.container?t.appendTo(this.options.container):t.insertAfter(this.$element),n=this.getPosition(),r=t[0].offsetWidth,i=t[0].offsetHeight;switch(s){case"bottom":o={top:n.top+n.height,left:n.left+n.width/2-r/2};break;case"top":o={top:n.top-i,left:n.left+n.width/2-r/2};break;case"left":o={top
 :n.top+n.height/2-i/2,left:n.left-r};break;case"right":o={top:n.top+n.height/2-i/2,left:n.left+n.width}}this.applyPlacement(o,s),this.$element.trigger("shown")}},applyPlacement:function(e,t){var n=this.tip(),r=n[0].offsetWidth,i=n[0].offsetHeight,s,o,u,a;n.offset(e).addClass(t).addClass("in"),s=n[0].offsetWidth,o=n[0].offsetHeight,t=="top"&&o!=i&&(e.top=e.top+i-o,a=!0),t=="bottom"||t=="top"?(u=0,e.left<0&&(u=e.left*-2,e.left=0,n.offset(e),s=n[0].offsetWidth,o=n[0].offsetHeight),this.replaceArrow(u-r+s,s,"left")):this.replaceArrow(o-i,o,"top"),a&&n.offset(e)},replaceArrow:function(e,t,n){this.arrow().css(n,e?50*(1-e/t)+"%":"")},setContent:function(){var e=this.tip(),t=this.getTitle();e.find(".tooltip-inner")[this.options.html?"html":"text"](t),e.removeClass("fade in top bottom left right")},hide:function(){function i(){var t=setTimeout(function(){n.off(e.support.transition.end).detach()},500);n.one(e.support.transition.end,function(){clearTimeout(t),n.detach()})}var t=this,n=this.tip
 (),r=e.Event("hide");this.$element.trigger(r);if(r.isDefaultPrevented())return;return n.removeClass("in"),e.support.transition&&this.$tip.hasClass("fade")?i():n.detach(),this.$element.trigger("hidden"),this},fixTitle:function(){var e=this.$element;(e.attr("title")||typeof e.attr("data-original-title")!="string")&&e.attr("data-original-title",e.attr("title")||"").attr("title","")},hasContent:function(){return this.getTitle()},getPosition:function(){var t=this.$element[0];return e.extend({},typeof t.getBoundingClientRect=="function"?t.getBoundingClientRect():{width:t.offsetWidth,height:t.offsetHeight},this.$element.offset())},getTitle:function(){var e,t=this.$element,n=this.options;return e=t.attr("data-original-title")||(typeof n.title=="function"?n.title.call(t[0]):n.title),e},tip:function(){return this.$tip=this.$tip||e(this.options.template)},arrow:function(){return this.$arrow=this.$arrow||this.tip().find(".tooltip-arrow")},validate:function(){this.$element[0].parentNode||(this.h
 ide(),this.$element=null,this.options=null)},enable:function(){this.enabled=!0},disable:function(){this.enabled=!1},toggleEnabled:function(){this.enabled=!this.enabled},toggle:function(t){var n=t?e(t.currentTarget)[this.type](this._options).data(this.type):this;n.tip().hasClass("in")?n.hide():n.show()},destroy:function(){this.hide().$element.off("."+this.type).removeData(this.type)}};var n=e.fn.tooltip;e.fn.tooltip=function(n){return this.each(function(){var r=e(this),i=r.data("tooltip"),s=typeof n=="object"&&n;i||r.data("tooltip",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.tooltip.Constructor=t,e.fn.tooltip.defaults={animation:!0,placement:"top",selector:!1,template:'<div class="tooltip"><div class="tooltip-arrow"></div><div class="tooltip-inner"></div></div>',trigger:"hover focus",title:"",delay:0,html:!1,container:!1},e.fn.tooltip.noConflict=function(){return e.fn.tooltip=n,this}}(window.jQuery),!function(e){"use strict";var t=function(e,t){this.init("popover",e,t)};t.pro
 totype=e.extend({},e.fn.tooltip.Constructor.prototype,{constructor:t,setContent:function(){var e=this.tip(),t=this.getTitle(),n=this.getContent();e.find(".popover-title")[this.options.html?"html":"text"](t),e.find(".popover-content")[this.options.html?"html":"text"](n),e.removeClass("fade top bottom left right in")},hasContent:function(){return this.getTitle()||this.getContent()},getContent:function(){var e,t=this.$element,n=this.options;return e=(typeof n.content=="function"?n.content.call(t[0]):n.content)||t.attr("data-content"),e},tip:function(){return this.$tip||(this.$tip=e(this.options.template)),this.$tip},destroy:function(){this.hide().$element.off("."+this.type).removeData(this.type)}});var n=e.fn.popover;e.fn.popover=function(n){return this.each(function(){var r=e(this),i=r.data("popover"),s=typeof n=="object"&&n;i||r.data("popover",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.popover.Constructor=t,e.fn.popover.defaults=e.extend({},e.fn.tooltip.defaults,{placement:"
 right",trigger:"click",content:"",template:'<div class="popover"><div class="arrow"></div><h3 class="popover-title"></h3><div class="popover-content"></div></div>'}),e.fn.popover.noConflict=function(){return e.fn.popover=n,this}}(window.jQuery),!function(e){"use strict";function t(t,n){var r=e.proxy(this.process,this),i=e(t).is("body")?e(window):e(t),s;this.options=e.extend({},e.fn.scrollspy.defaults,n),this.$scrollElement=i.on("scroll.scroll-spy.data-api",r),this.selector=(this.options.target||(s=e(t).attr("href"))&&s.replace(/.*(?=#[^\s]+$)/,"")||"")+" .nav li > a",this.$body=e("body"),this.refresh(),this.process()}t.prototype={constructor:t,refresh:function(){var t=this,n;this.offsets=e([]),this.targets=e([]),n=this.$body.find(this.selector).map(function(){var n=e(this),r=n.data("target")||n.attr("href"),i=/^#\w/.test(r)&&e(r);return i&&i.length&&[[i.position().top+(!e.isWindow(t.$scrollElement.get(0))&&t.$scrollElement.scrollTop()),r]]||null}).sort(function(e,t){return e[0]-t[0]
 }).each(function(){t.offsets.push(this[0]),t.targets.push(this[1])})},process:function(){var e=this.$scrollElement.scrollTop()+this.options.offset,t=this.$scrollElement[0].scrollHeight||this.$body[0].scrollHeight,n=t-this.$scrollElement.height(),r=this.offsets,i=this.targets,s=this.activeTarget,o;if(e>=n)return s!=(o=i.last()[0])&&this.activate(o);for(o=r.length;o--;)s!=i[o]&&e>=r[o]&&(!r[o+1]||e<=r[o+1])&&this.activate(i[o])},activate:function(t){var n,r;this.activeTarget=t,e(this.selector).parent(".active").removeClass("active"),r=this.selector+'[data-target="'+t+'"],'+this.selector+'[href="'+t+'"]',n=e(r).parent("li").addClass("active"),n.parent(".dropdown-menu").length&&(n=n.closest("li.dropdown").addClass("active")),n.trigger("activate")}};var n=e.fn.scrollspy;e.fn.scrollspy=function(n){return this.each(function(){var r=e(this),i=r.data("scrollspy"),s=typeof n=="object"&&n;i||r.data("scrollspy",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.scrollspy.Constructor=t,e.fn.scr
 ollspy.defaults={offset:10},e.fn.scrollspy.noConflict=function(){return e.fn.scrollspy=n,this},e(window).on("load",function(){e('[data-spy="scroll"]').each(function(){var t=e(this);t.scrollspy(t.data())})})}(window.jQuery),!function(e){"use strict";var t=function(t){this.element=e(t)};t.prototype={constructor:t,show:function(){var t=this.element,n=t.closest("ul:not(.dropdown-menu)"),r=t.attr("data-target"),i,s,o;r||(r=t.attr("href"),r=r&&r.replace(/.*(?=#[^\s]*$)/,""));if(t.parent("li").hasClass("active"))return;i=n.find(".active:last a")[0],o=e.Event("show",{relatedTarget:i}),t.trigger(o);if(o.isDefaultPrevented())return;s=e(r),this.activate(t.parent("li"),n),this.activate(s,s.parent(),function(){t.trigger({type:"shown",relatedTarget:i})})},activate:function(t,n,r){function o(){i.removeClass("active").find("> .dropdown-menu > .active").removeClass("active"),t.addClass("active"),s?(t[0].offsetWidth,t.addClass("in")):t.removeClass("fade"),t.parent(".dropdown-menu")&&t.closest("li.dro
 pdown").addClass("active"),r&&r()}var i=n.find("> .active"),s=r&&e.support.transition&&i.hasClass("fade");s?i.one(e.support.transition.end,o):o(),i.removeClass("in")}};var n=e.fn.tab;e.fn.tab=function(n){return this.each(function(){var r=e(this),i=r.data("tab");i||r.data("tab",i=new t(this)),typeof n=="string"&&i[n]()})},e.fn.tab.Constructor=t,e.fn.tab.noConflict=function(){return e.fn.tab=n,this},e(document).on("click.tab.data-api",'[data-toggle="tab"], [data-toggle="pill"]',function(t){t.preventDefault(),e(this).tab("show")})}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.$element=e(t),this.options=e.extend({},e.fn.typeahead.defaults,n),this.matcher=this.options.matcher||this.matcher,this.sorter=this.options.sorter||this.sorter,this.highlighter=this.options.highlighter||this.highlighter,this.updater=this.options.updater||this.updater,this.source=this.options.source,this.$menu=e(this.options.menu),this.shown=!1,this.listen()};t.prototype={constructor:t,select:fu
 nction(){var e=this.$menu.find(".active").attr("data-value");return this.$element.val(this.updater(e)).change(),this.hide()},updater:function(e){return e},show:function(){var t=e.extend({},this.$element.position(),{height:this.$element[0].offsetHeight});return this.$menu.insertAfter(this.$element).css({top:t.top+t.height,left:t.left}).show(),this.shown=!0,this},hide:function(){return this.$menu.hide(),this.shown=!1,this},lookup:function(t){var n;return this.query=this.$element.val(),!this.query||this.query.length<this.options.minLength?this.shown?this.hide():this:(n=e.isFunction(this.source)?this.source(this.query,e.proxy(this.process,this)):this.source,n?this.process(n):this)},process:function(t){var n=this;return t=e.grep(t,function(e){return n.matcher(e)}),t=this.sorter(t),t.length?this.render(t.slice(0,this.options.items)).show():this.shown?this.hide():this},matcher:function(e){return~e.toLowerCase().indexOf(this.query.toLowerCase())},sorter:function(e){var t=[],n=[],r=[],i;whil
 e(i=e.shift())i.toLowerCase().indexOf(this.query.toLowerCase())?~i.indexOf(this.query)?n.push(i):r.push(i):t.push(i);return t.concat(n,r)},highlighter:function(e){var t=this.query.replace(/[\-\[\]{}()*+?.,\\\^$|#\s]/g,"\\$&");return e.replace(new RegExp("("+t+")","ig"),function(e,t){return"<strong>"+t+"</strong>"})},render:function(t){var n=this;return t=e(t).map(function(t,r){return t=e(n.options.item).attr("data-value",r),t.find("a").html(n.highlighter(r)),t[0]}),t.first().addClass("active"),this.$menu.html(t),this},next:function(t){var n=this.$menu.find(".active").removeClass("active"),r=n.next();r.length||(r=e(this.$menu.find("li")[0])),r.addClass("active")},prev:function(e){var t=this.$menu.find(".active").removeClass("active"),n=t.prev();n.length||(n=this.$menu.find("li").last()),n.addClass("active")},listen:function(){this.$element.on("focus",e.proxy(this.focus,this)).on("blur",e.proxy(this.blur,this)).on("keypress",e.proxy(this.keypress,this)).on("keyup",e.proxy(this.keyup,t
 his)),this.eventSupported("keydown")&&this.$element.on("keydown",e.proxy(this.keydown,this)),this.$menu.on("click",e.proxy(this.click,this)).on("mouseenter","li",e.proxy(this.mouseenter,this)).on("mouseleave","li",e.proxy(this.mouseleave,this))},eventSupported:function(e){var t=e in this.$element;return t||(this.$element.setAttribute(e,"return;"),t=typeof this.$element[e]=="function"),t},move:function(e){if(!this.shown)return;switch(e.keyCode){case 9:case 13:case 27:e.preventDefault();break;case 38:e.preventDefault(),this.prev();break;case 40:e.preventDefault(),this.next()}e.stopPropagation()},keydown:function(t){this.suppressKeyPressRepeat=~e.inArray(t.keyCode,[40,38,9,13,27]),this.move(t)},keypress:function(e){if(this.suppressKeyPressRepeat)return;this.move(e)},keyup:function(e){switch(e.keyCode){case 40:case 38:case 16:case 17:case 18:break;case 9:case 13:if(!this.shown)return;this.select();break;case 27:if(!this.shown)return;this.hide();break;default:this.lookup()}e.stopPropagat
 ion(),e.preventDefault()},focus:function(e){this.focused=!0},blur:function(e){this.focused=!1,!this.mousedover&&this.shown&&this.hide()},click:function(e){e.stopPropagation(),e.preventDefault(),this.select(),this.$element.focus()},mouseenter:function(t){this.mousedover=!0,this.$menu.find(".active").removeClass("active"),e(t.currentTarget).addClass("active")},mouseleave:function(e){this.mousedover=!1,!this.focused&&this.shown&&this.hide()}};var n=e.fn.typeahead;e.fn.typeahead=function(n){return this.each(function(){var r=e(this),i=r.data("typeahead"),s=typeof n=="object"&&n;i||r.data("typeahead",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.typeahead.defaults={source:[],items:8,menu:'<ul class="typeahead dropdown-menu"></ul>',item:'<li><a href="#"></a></li>',minLength:1},e.fn.typeahead.Constructor=t,e.fn.typeahead.noConflict=function(){return e.fn.typeahead=n,this},e(document).on("focus.typeahead.data-api",'[data-provide="typeahead"]',function(t){var n=e(this);if(n.data("typeah
 ead"))return;n.typeahead(n.data())})}(window.jQuery),!function(e){"use strict";var t=function(t,n){this.options=e.extend({},e.fn.affix.defaults,n),this.$window=e(window).on("scroll.affix.data-api",e.proxy(this.checkPosition,this)).on("click.affix.data-api",e.proxy(function(){setTimeout(e.proxy(this.checkPosition,this),1)},this)),this.$element=e(t),this.checkPosition()};t.prototype.checkPosition=function(){if(!this.$element.is(":visible"))return;var t=e(document).height(),n=this.$window.scrollTop(),r=this.$element.offset(),i=this.options.offset,s=i.bottom,o=i.top,u="affix affix-top affix-bottom",a;typeof i!="object"&&(s=o=i),typeof o=="function"&&(o=i.top()),typeof s=="function"&&(s=i.bottom()),a=this.unpin!=null&&n+this.unpin<=r.top?!1:s!=null&&r.top+this.$element.height()>=t-s?"bottom":o!=null&&n<=o?"top":!1;if(this.affixed===a)return;this.affixed=a,this.unpin=a=="bottom"?r.top-n:null,this.$element.removeClass(u).addClass("affix"+(a?"-"+a:""))};var n=e.fn.affix;e.fn.affix=function(
 n){return this.each(function(){var r=e(this),i=r.data("affix"),s=typeof n=="object"&&n;i||r.data("affix",i=new t(this,s)),typeof n=="string"&&i[n]()})},e.fn.affix.Constructor=t,e.fn.affix.defaults={offset:0},e.fn.affix.noConflict=function(){return e.fn.affix=n,this},e(window).on("load",function(){e('[data-spy="affix"]').each(function(){var t=e(this),n=t.data();n.offset=n.offset||{},n.offsetBottom&&(n.offset.bottom=n.offsetBottom),n.offsetTop&&(n.offset.top=n.offsetTop),t.affix(n)})})}(window.jQuery);

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
new file mode 100644
index 0000000..4779edb
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+object YarnConfig {
+  // environment variables
+  val ENV_CONFIG = "STREAMING_CONFIG"
+  val ENV_PARTITION_ID = "PARTITION_ID"
+  val ENV_CONTAINER_NAME = "CONTAINER_NAME"
+
+  // yarn job config
+  val PACKAGE_PATH = "yarn.package.path"
+  val CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb"
+  val CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"
+  val CONTAINER_RETRY_COUNT = "yarn.countainer.retry.count"
+  val CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"
+  val TASK_COUNT = "yarn.container.count"
+  val AM_JVM_OPTIONS = "yarn.am.opts"
+  val AM_JMX_ENABLED = "yarn.am.jmx.enabled"
+  val AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb"
+
+  implicit def Config2Yarn(config: Config) = new YarnConfig(config)
+}
+
+class YarnConfig(config: Config) extends ScalaMapConfig(config) {
+  def getContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_MEMORY_MB) match {
+    case Some(mem) => Some(mem.toInt)
+    case _ => None
+  }
+
+  def getContainerMaxCpuCores: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_CPU_CORES) match {
+    case Some(cpu) => Some(cpu.toInt)
+    case _ => None
+  }
+
+  def getContainerRetryCount: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_COUNT) match {
+    case Some(count) => Some(count.toInt)
+    case _ => None
+  }
+
+  def getContainerRetryWindowMs: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_WINDOW_MS) match {
+    case Some(retryWindowMs) => Some(retryWindowMs.toInt)
+    case _ => None
+  }
+
+  def getPackagePath = getOption(YarnConfig.PACKAGE_PATH)
+
+  def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT) match {
+    case Some(tc) => Some(tc.toInt)
+    case _ => None
+  }
+
+  def getAmOpts = getOption(YarnConfig.AM_JVM_OPTIONS)
+  
+  def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB) match {
+    case Some(mem) => Some(mem.toInt)
+    case _ => None
+  }
+
+  def getJmxServerEnabled = getBoolean(YarnConfig.AM_JMX_ENABLED, true)
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
new file mode 100644
index 0000000..b2b529e
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import java.util.Collections
+
+import scala.collection.JavaConversions._
+import scala.collection.Map
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.api.records.ApplicationReport
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
+import org.apache.hadoop.yarn.api.ClientRMProtocol
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.util.Records
+
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.ApplicationStatus.New
+import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
+import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
+
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+
+/**
+ * Client helper class required to submit an application master start script to the resource manager. Also
+ * allows us to forcefully shut-down the application master which in-turn will shut-down the corresponding
+ * container and its processes.
+ */
+class ClientHelper(conf: Configuration) extends Logging {
+  val rpc = YarnRPC.create(conf)
+  val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
+  info("trying to connect to RM %s" format rmAddress)
+  val applicationsManager = rpc.getProxy(classOf[ClientRMProtocol], rmAddress, conf).asInstanceOf[ClientRMProtocol]
+  var appId: Option[ApplicationId] = None
+
+  /**
+   * Generate an application and submit it to the resource manager to start an application master
+   */
+  def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, user: UserGroupInformation, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
+    val newAppRequest = Records.newRecord(classOf[GetNewApplicationRequest])
+    val newAppResponse = applicationsManager.getNewApplication(newAppRequest)
+    var mem = memoryMb
+    var cpu = cpuCore
+
+    // If we are asking for memory less than the minimum required, bump it
+    if (mem < newAppResponse.getMinimumResourceCapability().getMemory()) {
+      val min = newAppResponse.getMinimumResourceCapability().getMemory()
+      warn("requesting %s megs of memory, which is less than minimum capability of %s, so using minimum" format (mem, min))
+      mem = min
+    }
+
+    // If we are asking for memory more than the max allowed, shout out
+    if (mem > newAppResponse.getMaximumResourceCapability().getMemory()) {
+      throw new SamzaException("You're asking for more memory (%s) than is allowed by YARN: %s" format
+        (mem, newAppResponse.getMaximumResourceCapability().getMemory()))
+    }
+
+    // if we are asking for cpu less than the minimum required, bump it
+    if (cpu < newAppResponse.getMinimumResourceCapability().getVirtualCores()) {
+      val min = newAppResponse.getMinimumResourceCapability.getVirtualCores()
+      warn("requesting %s virtual cores of cpu, which is less than minimum capability of %s, so using minimum" format (cpu, min))
+      cpu = min
+    }
+
+    // If we are asking for cpu more than the max allowed, shout out
+    if (cpu > newAppResponse.getMaximumResourceCapability().getVirtualCores()) {
+      throw new SamzaException("You're asking for more CPU (%s) than is allowed by YARN: %s" format
+        (cpu, newAppResponse.getMaximumResourceCapability().getVirtualCores()))
+    }
+
+    appId = Some(newAppResponse.getApplicationId)
+
+    info("preparing to request resources for app id %s" format appId.get)
+
+    val appCtx = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val containerCtx = Records.newRecord(classOf[ContainerLaunchContext])
+    val resource = Records.newRecord(classOf[Resource])
+    val submitAppRequest = Records.newRecord(classOf[SubmitApplicationRequest])
+    val packageResource = Records.newRecord(classOf[LocalResource])
+
+    name match {
+      case Some(name) => { appCtx.setApplicationName(name) }
+      case None => { appCtx.setApplicationName(appId.toString) }
+    }
+
+    env match {
+      case Some(env) => {
+        containerCtx.setEnvironment(env)
+        info("set environment variables to %s for %s" format (env, appId.get))
+      }
+      case None => None
+    }
+
+    // set the local package so that the containers and app master are provisioned with it
+    val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
+    val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath)
+
+    packageResource.setResource(packageUrl)
+    info("set package url to %s for %s" format (packageUrl, appId.get))
+    packageResource.setSize(fileStatus.getLen)
+    info("set package size to %s for %s" format (fileStatus.getLen, appId.get))
+    packageResource.setTimestamp(fileStatus.getModificationTime)
+    packageResource.setType(LocalResourceType.ARCHIVE)
+    packageResource.setVisibility(LocalResourceVisibility.APPLICATION)
+
+    resource.setMemory(mem)
+    info("set memory request to %s for %s" format (mem, appId.get))
+    resource.setVirtualCores(cpu)
+    info("set cpu core request to %s for %s" format (cpu, appId.get))
+    containerCtx.setResource(resource)
+    containerCtx.setCommands(cmds.toList)
+    info("set command to %s for %s" format (cmds, appId.get))
+    containerCtx.setLocalResources(Collections.singletonMap("__package", packageResource))
+    appCtx.setApplicationId(appId.get)
+    info("set app ID to %s" format (user, appId.get))
+    appCtx.setUser(user.getShortUserName)
+    info("set user to %s for %s" format (user, appId.get))
+    appCtx.setAMContainerSpec(containerCtx)
+    submitAppRequest.setApplicationSubmissionContext(appCtx)
+    info("submitting application request for %s" format appId.get)
+    applicationsManager.submitApplication(submitAppRequest)
+    appId
+  }
+
+  def status(appId: ApplicationId): Option[ApplicationStatus] = {
+    val statusRequest = Records.newRecord(classOf[GetApplicationReportRequest])
+    statusRequest.setApplicationId(appId)
+    val statusResponse = applicationsManager.getApplicationReport(statusRequest)
+    convertState(statusResponse.getApplicationReport)
+  }
+
+  def kill(appId: ApplicationId) {
+    val killRequest = Records.newRecord(classOf[KillApplicationRequest])
+    killRequest.setApplicationId(appId)
+    applicationsManager.forceKillApplication(killRequest)
+  }
+
+  def getApplicationMaster(appId: ApplicationId): Option[ApplicationReport] = {
+    val getAppsReq = Records.newRecord(classOf[GetAllApplicationsRequest])
+    val getAppsRsp = applicationsManager.getAllApplications(getAppsReq)
+
+    getAppsRsp.getApplicationList.filter(appRep => appId.equals(appRep.getApplicationId())).headOption
+  }
+
+  def getApplicationMasters(status: Option[ApplicationStatus]): List[ApplicationReport] = {
+    val getAppsReq = Records.newRecord(classOf[GetAllApplicationsRequest])
+    val getAppsRsp = applicationsManager.getAllApplications(getAppsReq)
+
+    status match {
+      case Some(status) => getAppsRsp.getApplicationList
+        .filter(appRep => status.equals(convertState(appRep).get)).toList
+      case None => getAppsRsp.getApplicationList.toList
+    }
+  }
+
+  private def convertState(appReport: ApplicationReport): Option[ApplicationStatus] = {
+    (appReport.getYarnApplicationState(), appReport.getFinalApplicationStatus()) match {
+      case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
+      case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) => Some(UnsuccessfulFinish)
+      case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)
+      case _ => Some(Running)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
new file mode 100644
index 0000000..7f830f2
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.samza.config.MapConfig
+import org.apache.samza.config.YarnConfig
+import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import scala.collection.JavaConversions._
+import org.apache.samza.metrics.{ JmxServer, MetricsRegistryMap }
+import grizzled.slf4j.Logging
+import org.apache.hadoop.yarn.client.AMRMClientImpl
+import org.apache.samza.config.YarnConfig._
+import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
+
+/**
+ * When YARN executes an application master, it needs a bash command to
+ * execute. For Samza, YARN will execute this main method when starting Samza's
+ * YARN application master.
+ *
+ * <br/><br/>
+ *
+ * The main method gets all of the environment variables (passed by Samza's
+ * YARN client, and YARN itself), and wires up everything to run Samza's
+ * application master.
+ */
+object SamzaAppMaster extends Logging {
+  def main(args: Array[String]) {
+    val containerIdStr = System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV)
+    info("got container id: %s" format containerIdStr)
+    val containerId = ConverterUtils.toContainerId(containerIdStr)
+    val applicationAttemptId = containerId.getApplicationAttemptId
+    info("got app attempt id: %s" format applicationAttemptId)
+    val nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV)
+    info("got node manager host: %s" format nodeHostString)
+    val nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV)
+    info("got node manager port: %s" format nodePortString)
+    val nodeHttpPortString = System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV)
+    info("got node manager http port: %s" format nodeHttpPortString)
+    val config = new MapConfig(JsonConfigSerializer.fromJson(System.getenv(YarnConfig.ENV_CONFIG)))
+    info("got config: %s" format config)
+    val hConfig = new YarnConfiguration
+    hConfig.set("fs.http.impl", "samza.util.hadoop.HttpFileSystem")
+    val amClient = new AMRMClientImpl(applicationAttemptId)
+    val clientHelper = new ClientHelper(hConfig)
+    val registry = new MetricsRegistryMap
+    val containerMem = config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM)
+    val containerCpu = config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES)
+    val jmxServer = if (new YarnConfig(config).getJmxServerEnabled) Some(new JmxServer()) else None
+
+    try {
+      // wire up all of the yarn event listeners
+      val state = new SamzaAppMasterState(-1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
+      val service = new SamzaAppMasterService(config, state, registry, clientHelper)
+      val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient, hConfig)
+      val metrics = new SamzaAppMasterMetrics(config, state, registry)
+      val am = new SamzaAppMasterTaskManager({ System.currentTimeMillis }, config, state, amClient, hConfig)
+
+      // run the app master
+      new YarnAppMaster(List(state, service, lifecycle, metrics, am), amClient).run
+    } finally {
+      // jmxServer has to be stopped or will prevent process from exiting.
+      if (jmxServer.isDefined) {
+        jmxServer.get.stop
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
new file mode 100644
index 0000000..95a6f05
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.hadoop.yarn.client.AMRMClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+
+/**
+ * Responsible for managing the lifecycle of the application master. Mostly,
+ * this means registering and unregistering with the RM, and shutting down
+ * when the RM tells us to Reboot.
+ */
+class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient, conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+  var validResourceRequest = true
+  var shutdownMessage: String = null
+
+  override def onInit() {
+    val host = state.nodeHost
+
+    amClient.init(conf);
+    amClient.start
+
+    val response = amClient.registerApplicationMaster(host, state.rpcPort, "%s:%d" format (host, state.trackingPort))
+
+    // validate that the YARN cluster can handle our container resource requirements
+    val maxCapability = response.getMaximumResourceCapability
+    val minCapability = response.getMinimumResourceCapability
+    val maxMem = maxCapability.getMemory
+    val minMem = minCapability.getMemory
+    val maxCpu = maxCapability.getVirtualCores
+    val minCpu = minCapability.getVirtualCores
+
+    info("Got AM register response. The YARN RM supports container requests with max-mem: %s, min-mem: %s, max-cpu: %s, min-cpu: %s" format (maxMem, minMem, maxCpu, minCpu))
+
+    if (containerMem < minMem || containerMem > maxMem || containerCpu < minCpu || containerCpu > maxCpu) {
+      shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." format (containerMem, containerCpu)
+      error(shutdownMessage)
+      validResourceRequest = false
+      state.status = FinalApplicationStatus.FAILED
+    }
+  }
+
+  override def onReboot() {
+    throw new SamzaException("Received a reboot signal from the RM, so throwing an exception to reboot the AM.")
+  }
+
+  override def onShutdown() {
+    info("Shutting down.")
+    amClient.unregisterApplicationMaster(state.status, shutdownMessage, null)
+    amClient.stop
+  }
+
+  override def shouldShutdown = !validResourceRequest
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
new file mode 100644
index 0000000..4f6edfb
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.JvmMetrics
+import org.apache.samza.config.Config
+import org.apache.samza.task.TaskContext
+import org.apache.samza.Partition
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.metrics.MetricsReporterFactory
+import org.apache.samza.util.Util
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import java.util.Timer
+import java.util.TimerTask
+import org.apache.samza.task.ReadableCollector
+
+object SamzaAppMasterMetrics {
+  val metricsGroup = "samza.yarn.am"
+
+  val sourceName = "ApplicationMaster"
+}
+
+/**
+ * Responsible for wiring up Samza's metrics. Given that Samza has a metric
+ * registry, we might as well use it. This class takes Samza's application
+ * master state, and converts it to metrics.
+ */
+class SamzaAppMasterMetrics(config: Config, state: SamzaAppMasterState, registry: ReadableMetricsRegistry) extends YarnAppMasterListener with Logging {
+  import SamzaAppMasterMetrics._
+
+  val jvm = new JvmMetrics(registry)
+  val mEventLoops = registry.newCounter(metricsGroup, "EventLoops")
+  val reporters = config.getMetricReporterNames.map(reporterName => {
+    val metricsFactoryClassName = config
+      .getMetricsFactoryClass(reporterName)
+      .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName))
+
+    val reporter =
+      Util
+        .getObj[MetricsReporterFactory](metricsFactoryClassName)
+        .getMetricsReporter(reporterName, SamzaAppMasterMetrics.sourceName, config)
+
+    reporter.register(SamzaAppMasterMetrics.sourceName, registry)
+    (reporterName, reporter)
+  }).toMap
+
+  override def onInit() {
+    val mRunningContainers = registry.newGauge(metricsGroup, "RunningContainers", { state.runningTasks.size })
+    val mNeededContainers = registry.newGauge(metricsGroup, "NeededContainers", { state.neededContainers })
+    val mCompletedContainers = registry.newGauge(metricsGroup, "CompletedContainers", { state.completedTasks })
+    val mFailedContainers = registry.newGauge(metricsGroup, "FailedContainers", { state.failedContainers })
+    val mReleasedContainers = registry.newGauge(metricsGroup, "ReleasedContainers", { state.releasedContainers })
+    val mTasks = registry.newGauge(metricsGroup, "TaskCount", { state.taskCount })
+    val mHost = registry.newGauge(metricsGroup, "HttpHost", { state.nodeHost })
+    val mTrackingPort = registry.newGauge(metricsGroup, "HttpPort", { state.trackingPort })
+    val mRpcPort = registry.newGauge(metricsGroup, "RpcPort", { state.rpcPort })
+    val mAppAttemptId = registry.newGauge(metricsGroup, "AppAttemptId", { state.appAttemptId.toString })
+
+    reporters.values.foreach(_.start)
+  }
+
+  override def onEventLoop() {
+    mEventLoops.inc
+  }
+
+  override def onShutdown() {
+    reporters.values.foreach(_.stop)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
new file mode 100644
index 0000000..ce3fcc3
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import org.apache.samza.util.Util
+import grizzled.slf4j.Logging
+import org.apache.samza.webapp._
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.ReadableMetricsRegistry
+import org.apache.samza.SamzaException
+
+/**
+ * Samza's application master runs a very basic HTTP/JSON service to allow
+ * dashboards to check on the status of a job. SamzaAppMasterService starts
+ * up the web service when initialized.
+ */
+class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry: ReadableMetricsRegistry, clientHelper: ClientHelper) extends YarnAppMasterListener with Logging {
+  override def onInit() {
+    // try starting the samza AM dashboard. try ten times, just in case we 
+    // pick a port that's already in use.
+    for (i <- 0 until 10) {
+      val rpcPort = Util.randomBetween(10000, 50000)
+      val trackingPort = Util.randomBetween(10000, 50000)
+      info("Starting webapp at rpc %d, tracking port %d" format (rpcPort, trackingPort))
+
+      try {
+        val rpcapp = new WebAppServer("/", rpcPort)
+        rpcapp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
+        rpcapp.start
+
+        val webapp = new WebAppServer("/", trackingPort)
+        webapp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
+        webapp.start
+
+        state.rpcPort = rpcPort
+        state.trackingPort = trackingPort
+        return
+      } catch {
+        case e: Exception => {
+          warn("Unable to start webapp on rpc port %d, tracking port %d .. retrying" format (rpcPort, trackingPort))
+        }
+      }
+    }
+
+    if (state.rpcPort == 0 || state.trackingPort == 0) {
+      throw new SamzaException("Giving up trying to start the webapp, since we keep getting ports that are already in use")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
new file mode 100644
index 0000000..fa1642b
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+import org.apache.hadoop.yarn.api.records.Container
+import org.apache.samza.config.Config
+import grizzled.slf4j.Logging
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.samza.Partition
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.api.records.ContainerId
+
+/**
+ * Samza's application master has state that is usually manipulated based on
+ * responses from YARN's resource manager (via SamzaAppMasterTaskManager). This
+ * class holds the current state of the application master.
+ */
+class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nodeHost: String, val nodePort: Int, val nodeHttpPort: Int) extends YarnAppMasterListener with Logging {
+  // controlled by the AM
+  var completedTasks = 0
+  var neededContainers = 0
+  var failedContainers = 0
+  var releasedContainers = 0
+  var taskCount = 0
+  var unclaimedTasks = Set[Int]()
+  var finishedTasks = Set[Int]()
+  var runningTasks = Map[Int, Container]()
+  var taskPartitions = Map[Int, Set[Partition]]()
+  var status = FinalApplicationStatus.UNDEFINED
+
+  // controlled by the service
+  var trackingPort = 0
+  var rpcPort = 0
+
+  // controlled on startup
+  var appAttemptId = containerId.getApplicationAttemptId
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
new file mode 100644
index 0000000..1a13ee5
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+import org.apache.hadoop.yarn.api.records.Container
+import org.apache.samza.config.Config
+import org.apache.samza.Partition
+import grizzled.slf4j.Logging
+import org.apache.samza.config.YarnConfig.Config2Yarn
+import org.apache.samza.config.YarnConfig
+import org.apache.samza.job.CommandBuilder
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.fs.Path
+import org.apache.samza.task.TaskContext
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.TaskConfig.Config2Task
+import scala.collection.JavaConversions._
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.samza.util.Util
+import scala.collection.JavaConversions._
+import org.apache.samza.SamzaException
+import org.apache.hadoop.yarn.client.AMRMClient
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.api.records.Priority
+import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.util.Records
+import org.apache.hadoop.security.token.Token
+import org.apache.hadoop.yarn.api.records.LocalResource
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.api.ContainerManager
+import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.net.NetUtils
+import java.util.Collections
+import java.security.PrivilegedAction
+import org.apache.samza.job.ShellCommandBuilder
+
+object SamzaAppMasterTaskManager {
+  val DEFAULT_CONTAINER_MEM = 256
+  val DEFAULT_CPU_CORES = 1
+  val DEFAULT_CONTAINER_RETRY_COUNT = 8
+  val DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000
+
+  def getPartitionsForTask(taskId: Int, taskCount: Int, partitions: Set[Partition]) = {
+    partitions.filter(_.getPartitionId % taskCount == taskId).toSet
+  }
+}
+
+case class TaskFailure(val count: Int, val lastFailure: Long)
+
+/**
+ * Samza's application master is mostly interested in requesting containers to
+ * run Samza jobs. SamzaAppMasterTaskManager is responsible for requesting new
+ * containers, handling failures, and notifying the application master that the
+ * job is done.
+ */
+class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient, conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+  import SamzaAppMasterTaskManager._
+
+  state.taskCount = config.getTaskCount match {
+    case Some(count) => count
+    case None =>
+      info("No %s specified. Defaulting to one container." format YarnConfig.TASK_COUNT)
+      1
+  }
+
+  val partitions = Util.getMaxInputStreamPartitions(config)
+  var taskFailures = Map[Int, TaskFailure]()
+  var tooManyFailedContainers = false
+
+  override def shouldShutdown = state.completedTasks == state.taskCount || tooManyFailedContainers
+
+  override def onInit() {
+    state.neededContainers = state.taskCount
+    state.unclaimedTasks = (0 until state.taskCount).toSet
+
+    info("Requesting %s containers" format state.taskCount)
+
+    requestContainers(config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM), config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES), state.neededContainers)
+  }
+
+  override def onContainerAllocated(container: Container) {
+    val containerIdStr = ConverterUtils.toString(container.getId)
+
+    info("Got a container from YARN ResourceManager: %s" format container)
+
+    state.unclaimedTasks.headOption match {
+      case Some(taskId) => {
+        info("Got available task id (%d) for container: %s" format (taskId, container))
+        val partitionsForTask = getPartitionsForTask(taskId, state.taskCount, partitions)
+        info("Claimed partitions %s for task ID %s" format (partitionsForTask, taskId))
+        val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName)
+        val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
+          .setConfig(config)
+          .setName("SamzaContainer-%s" format taskId)
+          .setPartitions(partitionsForTask)
+          .setTotalPartitions(partitions.size)
+        val command = cmdBuilder.buildCommand
+        info("Task ID %s using command %s" format (taskId, command))
+        val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
+        info("Task ID %s using env %s" format (taskId, env))
+        val user = UserGroupInformation.getCurrentUser
+        info("Task ID %s using user %s" format (taskId, user))
+        val path = new Path(config.getPackagePath.get)
+        info("Starting task ID %s using package path %s" format (taskId, path))
+
+        startContainer(
+          path,
+          container,
+          user,
+          env.toMap,
+          "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/%s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
+
+        state.neededContainers -= 1
+        state.runningTasks += taskId -> container
+        state.unclaimedTasks -= taskId
+        state.taskPartitions += taskId -> partitionsForTask
+
+        info("Claimed task ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
+
+        info("Started task ID %s" format taskId)
+      }
+      case _ => {
+        // there are no more tasks to run, so release the container
+        info("Got an extra container from YARN ResourceManager: %s" format (container))
+
+        amClient.releaseAssignedContainer(container.getId)
+      }
+    }
+  }
+
+  override def onContainerCompleted(containerStatus: ContainerStatus) {
+    val containerIdStr = ConverterUtils.toString(containerStatus.getContainerId)
+    val taskId = state.runningTasks.filter { case (_, container) => container.getId().equals(containerStatus.getContainerId()) }.keys.headOption
+
+    taskId match {
+      case Some(taskId) => {
+        state.runningTasks -= taskId
+        state.taskPartitions -= taskId
+      }
+      case _ => None
+    }
+
+    containerStatus.getExitStatus match {
+      case 0 => {
+        info("Container %s completed successfully." format containerIdStr)
+
+        state.completedTasks += 1
+
+        if (taskId.isDefined) {
+          state.finishedTasks += taskId.get
+          taskFailures -= taskId.get
+        }
+
+        if (state.completedTasks == state.taskCount) {
+          info("Setting job status to SUCCEEDED, since all tasks have been marked as completed.")
+          state.status = FinalApplicationStatus.SUCCEEDED
+        }
+      }
+      case -100 => {
+        info("Got an exit code of -100. This means that container %s was "
+          + "killed by YARN, either due to being released by the application "
+          + "master or being 'lost' due to node failures etc." format containerIdStr)
+
+        state.releasedContainers += 1
+
+        // If this container was assigned some partitions (a taskId), then 
+        // clean up, and request a new container for the tasks. This only 
+        // should happen if the container was 'lost' due to node failure, not 
+        // if the AM released the container.
+        if (taskId.isDefined) {
+          info("Released container %s was assigned task ID %s. Requesting a new container for the task." format (containerIdStr, taskId.get))
+
+          state.neededContainers += 1
+          state.unclaimedTasks += taskId.get
+
+          // request a new container
+          requestContainers(config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM), config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES), 1)
+        }
+      }
+      case _ => {
+        info("Container %s failed with exit code %d - %s." format (containerIdStr, containerStatus.getExitStatus, containerStatus.getDiagnostics))
+
+        state.failedContainers += 1
+
+        taskId match {
+          case Some(taskId) =>
+            info("Failed container %s owned task id %s." format (containerIdStr, taskId))
+
+            state.unclaimedTasks += taskId
+            state.neededContainers += 1
+
+            // A container failed for an unknown reason. Let's check to see if 
+            // we need to shutdown the whole app master if too many container 
+            // failures have happened. The rules for failing are that the failure 
+            // count for a task id must be > the configured retry count, and the 
+            // last failure (the one prior to this one) must have happened less 
+            // than retry window ms ago. If retry count is set to 0, the app 
+            // master will fail on any container failure. If the retry count is 
+            // set to a number < 0, a container failure will never trigger an 
+            // app master failure.
+            val retryCount = config.getContainerRetryCount.getOrElse(DEFAULT_CONTAINER_RETRY_COUNT)
+            val retryWindowMs = config.getContainerRetryWindowMs.getOrElse(DEFAULT_CONTAINER_RETRY_WINDOW_MS)
+
+            if (retryCount == 0) {
+              error("Task id %s (%s) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed."
+                format (taskId, containerIdStr))
+
+              tooManyFailedContainers = true
+            } else if (retryCount > 0) {
+              val (currentFailCount, lastFailureTime) = taskFailures.get(taskId) match {
+                case Some(TaskFailure(count, lastFailure)) => (count + 1, lastFailure)
+                case _ => (1, 0L)
+              }
+
+              if (currentFailCount > retryCount) {
+                val lastFailureMsDiff = clock() - lastFailureTime
+
+                if (lastFailureMsDiff < retryWindowMs) {
+                  error("Task id %s (%s) has failed %s times, with last failure %sms ago. This is greater than retry count of %s and window of %s, so shutting down the application master, and marking the job as failed."
+                    format (taskId, containerIdStr, currentFailCount, lastFailureMsDiff, retryCount, retryWindowMs))
+
+                  // We have too many failures, and we're within the window 
+                  // boundary, so reset shut down the app master.
+                  tooManyFailedContainers = true
+                  state.status = FinalApplicationStatus.FAILED
+                } else {
+                  info("Resetting fail count for task id %s back to 1, since last container failure (%s) for this task id was outside the bounds of the retry window."
+                    format (taskId, containerIdStr))
+
+                  // Reset counter back to 1, since the last failure for this 
+                  // container happened outside the window boundary.
+                  taskFailures += taskId -> TaskFailure(1, clock())
+                }
+              } else {
+                info("Current fail count for task id %s is %s." format (taskId, currentFailCount))
+                taskFailures += taskId -> TaskFailure(currentFailCount, clock())
+              }
+            }
+
+            if (!tooManyFailedContainers) {
+              // Request a new container
+              requestContainers(config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM), config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES), 1)
+            }
+          case _ => None
+        }
+      }
+    }
+  }
+
+  protected def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+    info("starting container %s %s %s %s %s" format (packagePath, container, ugi, env, cmds))
+    // connect to container manager (based on similar code in the ContainerLauncher in Hadoop MapReduce)
+    val contToken = container.getContainerToken
+    val address = container.getNodeId.getHost + ":" + container.getNodeId.getPort
+    var user = ugi
+
+    if (UserGroupInformation.isSecurityEnabled) {
+      debug("security is enabled")
+      val hadoopToken = new Token[ContainerTokenIdentifier](contToken.getIdentifier.array, contToken.getPassword.array, new Text(contToken.getKind), new Text(contToken.getService))
+      user = UserGroupInformation.createRemoteUser(address)
+      user.addToken(hadoopToken)
+      debug("changed user to %s" format user)
+    }
+
+    val containerManager = user.doAs(new PrivilegedAction[ContainerManager] {
+      def run(): ContainerManager = {
+        return YarnRPC.create(conf).getProxy(classOf[ContainerManager], NetUtils.createSocketAddr(address), conf).asInstanceOf[ContainerManager]
+      }
+    })
+
+    // set the local package so that the containers and app master are provisioned with it
+    val packageResource = Records.newRecord(classOf[LocalResource])
+    val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
+    val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath)
+
+    packageResource.setResource(packageUrl)
+    packageResource.setSize(fileStatus.getLen)
+    packageResource.setTimestamp(fileStatus.getModificationTime)
+    packageResource.setType(LocalResourceType.ARCHIVE)
+    packageResource.setVisibility(LocalResourceVisibility.APPLICATION)
+
+    // start the container
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+    ctx.setEnvironment(env)
+    ctx.setContainerId(container.getId())
+    ctx.setResource(container.getResource())
+    ctx.setUser(user.getShortUserName())
+    ctx.setCommands(cmds.toList)
+    ctx.setLocalResources(Collections.singletonMap("__package", packageResource))
+
+    debug("setting package to %s" format packageResource)
+    debug("setting context to %s" format ctx)
+
+    val startContainerRequest = Records.newRecord(classOf[StartContainerRequest])
+    startContainerRequest.setContainerLaunchContext(ctx)
+    containerManager.startContainer(startContainerRequest)
+  }
+
+  protected def requestContainers(memMb: Int, cpuCores: Int, containers: Int) {
+    info("Requesting %d container(s) with %dmb of memory" format (containers, memMb))
+    val capability = Records.newRecord(classOf[Resource])
+    val priority = Records.newRecord(classOf[Priority])
+    priority.setPriority(0)
+    capability.setMemory(memMb)
+    capability.setVirtualCores(cpuCores)
+    amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority, containers))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
new file mode 100644
index 0000000..14e3865
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import scala.collection.JavaConversions._
+import grizzled.slf4j.Logging
+import org.apache.hadoop.yarn.client.AMRMClient
+
+/**
+ * YARN's API is somewhat clunky. Most implementations just sit in a loop, and
+ * poll the resource manager every N seconds (see the distributed shell
+ * example). To make life slightly better, Samza separates the polling logic
+ * from the application master logic, and we convert synchronous polling calls
+ * to callbacks, which are more intuitive when dealing with event based
+ * paradigms like YARN.
+ *
+ * <br/><br/>
+ *
+ * SamzaAppMaster uses this class to wire up all of Samza's application master
+ * listeners.
+ */
+class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient) extends Logging {
+  var isShutdown = false
+
+  def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient) = this(1000, listeners, amClient)
+
+  def run {
+    try {
+      listeners.foreach(_.onInit)
+
+      while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _)) {
+        val response = amClient.allocate(0).getAMResponse
+
+        if (response.getReboot) {
+          listeners.foreach(_.onReboot)
+        }
+
+        listeners.foreach(_.onEventLoop)
+        response.getCompletedContainersStatuses.foreach(containerStatus => listeners.foreach(_.onContainerCompleted(containerStatus)))
+        response.getAllocatedContainers.foreach(container => listeners.foreach(_.onContainerAllocated(container)))
+
+        try {
+          Thread.sleep(pollIntervalMs)
+        } catch {
+          case e: InterruptedException => {
+            isShutdown = true
+            info("got interrupt in app master thread, so shutting down")
+          }
+        }
+      }
+    } finally {
+      listeners.foreach(listener => try {
+        listener.onShutdown
+      } catch {
+        case e: Throwable => warn("Listener %s failed to shutdown." format listener, e)
+      })
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
new file mode 100644
index 0000000..1353e86
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.hadoop.yarn.api.records.Container
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+
+/**
+ * Classes that wish to listen to callback events from YarnAppMaster must
+ * implement this trait.
+ */
+trait YarnAppMasterListener {
+  /**
+   * If true, YarnAppMaster will cease to poll the RM, and call onShutdown for
+   * all listeners.
+   */
+  def shouldShutdown: Boolean = false
+
+  /**
+   * Invoked by YarnAppMaster once per listener, before entering the RM polling
+   * event loop.
+   */
+  def onInit() {}
+
+  /**
+   * Invoked whenever the RM responds with a reboot request. Usually, reboots
+   * are triggered by the YARN RM when its state gets out of sync with the
+   * application master (usually the result of restarting the RM).
+   * YarnAppMaster does not actually restart anything. It is up to one or more
+   * listeners to trigger a failure, or shutdown.
+   */
+  def onReboot() {}
+
+  /**
+   * Signifies that the YarnAppMaster has exited the RM polling event loop, and
+   * is about to exit.
+   */
+  def onShutdown() {}
+
+  /**
+   * Whenever the RM allocates a container for the application master, this
+   * callback is invoked (once per container).
+   */
+  def onContainerAllocated(container: Container) {}
+
+  /**
+   * Whenever a container completes (either failure, or success), this callback
+   * will be invoked.
+   */
+  def onContainerCompleted(containerStatus: ContainerStatus) {}
+
+  /**
+   * Invoked by YarnAppMaster once per listener, every time it loops around to
+   * poll the RM again.
+   */
+  def onEventLoop() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
new file mode 100644
index 0000000..bde38e1
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.samza.config.Config
+import org.apache.samza.util.Util
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.job.StreamJob
+import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
+import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
+import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.samza.config.YarnConfig.Config2Yarn
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.YarnConfig
+import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.SamzaException
+
+/**
+ * Starts the application manager
+ */
+class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
+  val client = new ClientHelper(hadoopConfig)
+  var appId: Option[ApplicationId] = None
+
+  def submit: YarnJob = {
+    appId = client.submitApplication(
+      new Path(config.getPackagePath.getOrElse(throw new SamzaException("No YARN package path defined in config."))),
+      config.getAMContainerMaxMemoryMb.getOrElse(512),
+      1,
+      UserGroupInformation.getCurrentUser,
+      List(
+        "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s"
+          format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),
+      Some(Map(
+        YarnConfig.ENV_CONFIG -> Util.envVarEscape(JsonConfigSerializer.toJson(config)),
+        ShellCommandConfig.ENV_SAMZA_OPTS -> Util.envVarEscape(config.getAmOpts.getOrElse("")))),
+      Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))))
+
+    this
+  }
+
+  def waitForFinish(timeoutMs: Long): ApplicationStatus = {
+    val startTimeMs = System.currentTimeMillis()
+
+    while (System.currentTimeMillis() - startTimeMs < timeoutMs) {
+      Option(getStatus) match {
+        case Some(s) => if (SuccessfulFinish.equals(s) || UnsuccessfulFinish.equals(s)) return s
+        case None => null
+      }
+
+      Thread.sleep(1000)
+    }
+
+    Running
+  }
+
+  def waitForStatus(status: ApplicationStatus, timeoutMs: Long): ApplicationStatus = {
+    val startTimeMs = System.currentTimeMillis()
+
+    while (System.currentTimeMillis() - startTimeMs < timeoutMs) {
+      Option(getStatus) match {
+        case Some(s) => if (status.equals(s)) return status
+        case None => null
+      }
+
+      Thread.sleep(1000)
+    }
+
+    Running
+  }
+
+  def getStatus: ApplicationStatus = {
+    appId match {
+      case Some(appId) => client.status(appId).getOrElse(null)
+      case None => null
+    }
+  }
+
+  def kill: YarnJob = {
+    appId match {
+      case Some(appId) => client.kill(appId)
+      case None => None
+    }
+    this
+  }
+}