You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by sm...@apache.org on 2014/05/20 05:09:32 UTC
svn commit: r1596105 [1/2] - in /incubator/slider/trunk:
app-packages/hbase-v0_96/ app-packages/storm-v0_91/
slider-agent/src/main/python/agent/ slider-agent/src/test/python/agent/
slider-core/src/main/java/org/apache/slider/providers/agent/ slider-cor...
Author: smohanty
Date: Tue May 20 03:09:30 2014
New Revision: 1596105
URL: http://svn.apache.org/r1596105
Log:
SLIDER-57. Add basic component command order support
Added:
incubator/slider/trunk/slider-agent/src/main/python/agent/Constants.py
incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentCommandOrder.java
incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/CommandOrder.java
incubator/slider/trunk/slider-core/src/test/java/org/apache/slider/providers/agent/TestComponentCommandOrder.java
Modified:
incubator/slider/trunk/app-packages/hbase-v0_96/appConfig.json
incubator/slider/trunk/app-packages/hbase-v0_96/metainfo.xml
incubator/slider/trunk/app-packages/storm-v0_91/appConfig.json
incubator/slider/trunk/app-packages/storm-v0_91/metainfo.xml
incubator/slider/trunk/slider-agent/src/main/python/agent/ActionQueue.py
incubator/slider/trunk/slider-agent/src/main/python/agent/CommandStatusDict.py
incubator/slider/trunk/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
incubator/slider/trunk/slider-agent/src/main/python/agent/PythonExecutor.py
incubator/slider/trunk/slider-agent/src/main/python/agent/shell.py
incubator/slider/trunk/slider-agent/src/test/python/agent/TestActionQueue.py
incubator/slider/trunk/slider-agent/src/test/python/agent/TestController.py
incubator/slider/trunk/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
incubator/slider/trunk/slider-agent/src/test/python/agent/TestHeartbeat.py
incubator/slider/trunk/slider-agent/src/test/python/agent/TestShell.py
incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Service.java
incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/CommandReport.java
incubator/slider/trunk/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
incubator/slider/trunk/src/site/markdown/slider_specs/application_definition.md
Modified: incubator/slider/trunk/app-packages/hbase-v0_96/appConfig.json
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/app-packages/hbase-v0_96/appConfig.json?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/app-packages/hbase-v0_96/appConfig.json (original)
+++ incubator/slider/trunk/app-packages/hbase-v0_96/appConfig.json Tue May 20 03:09:30 2014
@@ -54,13 +54,11 @@
},
"components": {
"HBASE_MASTER": {
- "wait.heartbeat": "5"
},
"slider-appmaster": {
"jvm.heapsize": "256M"
},
"HBASE_REGIONSERVER": {
- "wait.heartbeat": "3"
}
}
}
Modified: incubator/slider/trunk/app-packages/hbase-v0_96/metainfo.xml
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/app-packages/hbase-v0_96/metainfo.xml?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/app-packages/hbase-v0_96/metainfo.xml (original)
+++ incubator/slider/trunk/app-packages/hbase-v0_96/metainfo.xml Tue May 20 03:09:30 2014
@@ -42,6 +42,12 @@
</exports>
</exportGroup>
</exportGroups>
+ <commandOrders>
+ <commandOrder>
+ <command>HBASE_REGIONSERVER-START</command>
+ <requires>HBASE_MASTER-STARTED</requires>
+ </commandOrder>
+ </commandOrders>
<components>
<component>
<name>HBASE_MASTER</name>
Modified: incubator/slider/trunk/app-packages/storm-v0_91/appConfig.json
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/app-packages/storm-v0_91/appConfig.json?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/app-packages/storm-v0_91/appConfig.json (original)
+++ incubator/slider/trunk/app-packages/storm-v0_91/appConfig.json Tue May 20 03:09:30 2014
@@ -19,7 +19,7 @@
"site.storm-site.supervisor.heartbeat.frequency.secs": "5",
"site.storm-site.topology.executor.send.buffer.size": "1024",
"site.storm-site.drpc.childopts": "-Xmx768m",
- "site.storm-site.nimbus.thrift.port": "6627",
+ "site.storm-site.nimbus.thrift.port": "${NIMBUS.ALLOCATED_PORT}",
"site.storm-site.storm.zookeeper.retry.intervalceiling.millis": "30000",
"site.storm-site.storm.local.dir": "${AGENT_WORK_ROOT}/app/tmp/storm",
"site.storm-site.topology.receiver.buffer.size": "8",
@@ -45,7 +45,7 @@
"site.storm-site.nimbus.monitor.freq.secs": "10",
"site.storm-site.storm.cluster.mode": "distributed",
"site.storm-site.dev.zookeeper.path": "${AGENT_WORK_ROOT}/app/tmp/dev-storm-zookeeper",
- "site.storm-site.drpc.invocations.port": "3773",
+ "site.storm-site.drpc.invocations.port": "${DRPC_SERVER.ALLOCATED_PORT}",
"site.storm-site.storm.zookeeper.root": "/storm",
"site.storm-site.logviewer.childopts": "-Xmx128m",
"site.storm-site.transactional.zookeeper.port": "null",
@@ -61,7 +61,7 @@
"site.storm-site.storm.messaging.transport": "backtype.storm.messaging.netty.Context",
"site.storm-site.logviewer.appender.name": "A1",
"site.storm-site.nimbus.host": "${NIMBUS_HOST}",
- "site.storm-site.ui.port": "8744",
+ "site.storm-site.ui.port": "${STORM_UI_SERVER.ALLOCATED_PORT}",
"site.storm-site.supervisor.slots.ports": "[6700, 6701]",
"site.storm-site.nimbus.file.copy.expiration.secs": "600",
"site.storm-site.supervisor.monitor.frequency.secs": "3",
@@ -109,16 +109,12 @@
"NIMBUS": {
},
"STORM_REST_API": {
- "wait.heartbeat": "3"
},
"STORM_UI_SERVER": {
- "wait.heartbeat": "3"
},
"DRPC_SERVER": {
- "wait.heartbeat": "7"
},
"SUPERVISOR": {
- "wait.heartbeat": "10"
}
}
}
Modified: incubator/slider/trunk/app-packages/storm-v0_91/metainfo.xml
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/app-packages/storm-v0_91/metainfo.xml?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/app-packages/storm-v0_91/metainfo.xml (original)
+++ incubator/slider/trunk/app-packages/storm-v0_91/metainfo.xml Tue May 20 03:09:30 2014
@@ -23,6 +23,26 @@
<name>STORM</name>
<comment>Apache Hadoop Stream processing framework</comment>
<version>0.9.1.2.1</version>
+
+ <commandOrders>
+ <commandOrder>
+ <command>SUPERVISOR-START</command>
+ <requires>NIMBUS-STARTED</requires>
+ </commandOrder>
+ <commandOrder>
+ <command>DRPC_SERVER-START</command>
+ <requires>NIMBUS-STARTED</requires>
+ </commandOrder>
+ <commandOrder>
+ <command>STORM_REST_API-START</command>
+ <requires>NIMBUS-STARTED,DRPC_SERVER-STARTED,STORM_UI_SERVER-STARTED</requires>
+ </commandOrder>
+ <commandOrder>
+ <command>STORM_UI_SERVER-START</command>
+ <requires>NIMBUS-STARTED</requires>
+ </commandOrder>
+ </commandOrders>
+
<components>
<component>
Modified: incubator/slider/trunk/slider-agent/src/main/python/agent/ActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/main/python/agent/ActionQueue.py?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-agent/src/main/python/agent/ActionQueue.py (original)
+++ incubator/slider/trunk/slider-agent/src/main/python/agent/ActionQueue.py Tue May 20 03:09:30 2014
@@ -26,10 +26,10 @@ import pprint
import os
import time
-from shell import shellRunner
from AgentConfig import AgentConfig
from CommandStatusDict import CommandStatusDict
from CustomServiceOrchestrator import CustomServiceOrchestrator
+import Constants
logger = logging.getLogger()
@@ -57,7 +57,6 @@ class ActionQueue(threading.Thread):
self.status_update_callback)
self.config = config
self.controller = controller
- self.sh = shellRunner()
self._stop = threading.Event()
self.tmpdir = config.getResolvedPath(AgentConfig.APP_TASK_DIR)
self.customServiceOrchestrator = CustomServiceOrchestrator(config,
@@ -145,13 +144,13 @@ class ActionQueue(threading.Thread):
store_config)
# dumping results
status = self.COMPLETED_STATUS
- if commandresult['exitcode'] != 0:
+ if commandresult[Constants.EXIT_CODE] != 0:
status = self.FAILED_STATUS
roleResult = self.commandStatuses.generate_report_template(command)
roleResult.update({
'stdout': commandresult['stdout'],
'stderr': commandresult['stderr'],
- 'exitCode': commandresult['exitcode'],
+ Constants.EXIT_CODE: commandresult[Constants.EXIT_CODE],
'status': status,
})
if roleResult['stdout'] == '':
@@ -165,8 +164,10 @@ class ActionQueue(threading.Thread):
roleResult['structuredOut'] = ''
# let server know that configuration tags were applied
if status == self.COMPLETED_STATUS:
- if command.has_key('configurationTags'):
+ if 'configurationTags' in command:
roleResult['configurationTags'] = command['configurationTags']
+ if Constants.ALLOCATED_PORTS in commandresult:
+ roleResult['allocatedPorts'] = commandresult[Constants.ALLOCATED_PORTS]
self.commandStatuses.put_command_status(command, roleResult)
# Store action result to agent response queue
@@ -197,8 +198,8 @@ class ActionQueue(threading.Thread):
if 'configurations' in component_status:
result['configurations'] = component_status['configurations']
- if 'exitcode' in component_status:
- result['status'] = component_status['exitcode']
+ if Constants.EXIT_CODE in component_status:
+ result['status'] = component_status[Constants.EXIT_CODE]
logger.debug("Got live status for component " + component + \
" of service " + str(service) + \
" of cluster " + str(cluster))
Modified: incubator/slider/trunk/slider-agent/src/main/python/agent/CommandStatusDict.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/main/python/agent/CommandStatusDict.py?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-agent/src/main/python/agent/CommandStatusDict.py (original)
+++ incubator/slider/trunk/slider-agent/src/main/python/agent/CommandStatusDict.py Tue May 20 03:09:30 2014
@@ -22,6 +22,7 @@ import json
import logging
import threading
from Grep import Grep
+import Constants
logger = logging.getLogger()
@@ -117,7 +118,7 @@ class CommandStatusDict():
'stdout': grep.filterMarkup(output),
'stderr': tmperr,
'structuredOut': tmpstructuredout,
- 'exitCode': 777,
+ Constants.EXIT_CODE: 777,
'status': ActionQueue.IN_PROGRESS_STATUS,
})
return inprogress
Added: incubator/slider/trunk/slider-agent/src/main/python/agent/Constants.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/main/python/agent/Constants.py?rev=1596105&view=auto
==============================================================================
--- incubator/slider/trunk/slider-agent/src/main/python/agent/Constants.py (added)
+++ incubator/slider/trunk/slider-agent/src/main/python/agent/Constants.py Tue May 20 03:09:30 2014
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+"""
+Constants used by Slider Agent
+"""
+
+EXIT_CODE = "exitcode"
+ALLOCATED_PORTS = "allocated_ports"
Modified: incubator/slider/trunk/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py (original)
+++ incubator/slider/trunk/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py Tue May 20 03:09:30 2014
@@ -28,6 +28,7 @@ from AgentConfig import AgentConfig
from AgentException import AgentException
from PythonExecutor import PythonExecutor
import hostname
+import Constants
logger = logging.getLogger()
@@ -66,6 +67,7 @@ class CustomServiceOrchestrator():
def runCommand(self, command, tmpoutfile, tmperrfile,
override_output_files=True, store_config=False):
+ allocated_port = {}
try:
script_type = command['commandParams']['script_type']
script = command['commandParams']['script']
@@ -82,8 +84,7 @@ class CustomServiceOrchestrator():
# We don't support anything else yet
message = "Unknown script type {0}".format(script_type)
raise AgentException(message)
- # Execute command using proper interpreter
- json_path = self.dump_command_to_json(command, store_config)
+ json_path = self.dump_command_to_json(command, allocated_port, store_config)
py_file_list = [script_tuple]
# filter None values
filtered_py_file_list = [i for i in py_file_list if i]
@@ -104,7 +105,7 @@ class CustomServiceOrchestrator():
environment_vars)
# Next run_file() invocations should always append to current output
override_output_files = False
- if ret['exitcode'] != 0:
+ if ret[Constants.EXIT_CODE] != 0:
break
if not ret: # Something went wrong
@@ -119,9 +120,11 @@ class CustomServiceOrchestrator():
'stdout': message,
'stderr': message,
'structuredOut': '{}',
- 'exitcode': 1,
+ Constants.EXIT_CODE: 1,
}
+ if Constants.EXIT_CODE in ret and ret[Constants.EXIT_CODE] == 0:
+ ret[Constants.ALLOCATED_PORTS] = allocated_port
return ret
@@ -172,15 +175,15 @@ class CustomServiceOrchestrator():
res = self.runCommand(command, self.status_commands_stdout,
self.status_commands_stderr,
override_output_files=override_output_files)
- if res['exitcode'] == 0:
- res['exitcode'] = CustomServiceOrchestrator.LIVE_STATUS
+ if res[Constants.EXIT_CODE] == 0:
+ res[Constants.EXIT_CODE] = CustomServiceOrchestrator.LIVE_STATUS
else:
- res['exitcode'] = CustomServiceOrchestrator.DEAD_STATUS
+ res[Constants.EXIT_CODE] = CustomServiceOrchestrator.DEAD_STATUS
return res
pass
- def dump_command_to_json(self, command, store_config=False):
+ def dump_command_to_json(self, command, allocated_ports, store_config=False):
"""
Converts command to json file and returns file path
"""
@@ -203,7 +206,7 @@ class CustomServiceOrchestrator():
if os.path.isfile(file_path):
os.unlink(file_path)
- self.finalize_command(command, store_config)
+ self.finalize_command(command, store_config, allocated_ports)
with os.fdopen(os.open(file_path, os.O_WRONLY | os.O_CREAT,
0600), 'w') as f:
@@ -217,7 +220,7 @@ class CustomServiceOrchestrator():
${AGENT_LOG_ROOT} -> AgentConfig.getLogPath()
"""
- def finalize_command(self, command, store_config):
+ def finalize_command(self, command, store_config, allocated_ports):
component = command['componentName']
allocated_port_format = "${{{0}.ALLOCATED_PORT}}"
port_allocation_req = allocated_port_format.format(component)
@@ -234,6 +237,7 @@ class CustomServiceOrchestrator():
port = self.allocate_port()
value = value.replace(port_allocation_req, str(port))
logger.info("Allocated port " + str(port) + " for " + port_allocation_req)
+ allocated_ports[k] = value
command['configurations'][key][k] = value
pass
pass
Modified: incubator/slider/trunk/slider-agent/src/main/python/agent/PythonExecutor.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/main/python/agent/PythonExecutor.py?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-agent/src/main/python/agent/PythonExecutor.py (original)
+++ incubator/slider/trunk/slider-agent/src/main/python/agent/PythonExecutor.py Tue May 20 03:09:30 2014
@@ -28,6 +28,7 @@ from threading import Thread
from Grep import Grep
import shell
import sys
+import Constants
logger = logging.getLogger()
@@ -137,7 +138,7 @@ class PythonExecutor:
grep = self.grep
result = {
- "exitcode": retcode,
+ Constants.EXIT_CODE: retcode,
"stdout": grep.tail(stdout,
log_lines_count) if log_lines_count else stdout,
"stderr": grep.tail(stderr,
Modified: incubator/slider/trunk/slider-agent/src/main/python/agent/shell.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/main/python/agent/shell.py?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-agent/src/main/python/agent/shell.py (original)
+++ incubator/slider/trunk/slider-agent/src/main/python/agent/shell.py Tue May 20 03:09:30 2014
@@ -84,28 +84,4 @@ def changeUid():
try:
os.setuid(threadLocal.uid)
except Exception:
- logger.warn("can not switch user for running command.")
-
-class shellRunner:
- # Run any command
- def run(self, script, user=None):
- try:
- if user!=None:
- user = pwd.getpwnam(user)[2]
- else:
- user = os.getuid()
- threadLocal.uid = user
- except Exception:
- logger.warn("can not switch user for RUN_COMMAND.")
- code = 0
- cmd = " "
- cmd = cmd.join(script)
- p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE, shell=True, close_fds=True)
- out, err = p.communicate()
- code = p.wait()
- logger.debug("Exitcode for %s is %d" % (cmd,code))
- return {'exitCode': code, 'output': out, 'error': err}
-
- def getServerTracker(self):
- return serverTracker
\ No newline at end of file
+ logger.warn("can not switch user for running command.")
\ No newline at end of file
Modified: incubator/slider/trunk/slider-agent/src/test/python/agent/TestActionQueue.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/test/python/agent/TestActionQueue.py?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-agent/src/test/python/agent/TestActionQueue.py (original)
+++ incubator/slider/trunk/slider-agent/src/test/python/agent/TestActionQueue.py Tue May 20 03:09:30 2014
@@ -21,8 +21,8 @@ limitations under the License.
from Queue import Queue
from unittest import TestCase
import unittest
-from agent.ActionQueue import ActionQueue
-from agent.AgentConfig import AgentConfig
+from ActionQueue import ActionQueue
+from AgentConfig import AgentConfig
import os
import errno
import time
@@ -34,9 +34,9 @@ import sys
import logging
from threading import Thread
from mock.mock import patch, MagicMock, call
-from agent.CustomServiceOrchestrator import CustomServiceOrchestrator
-from agent.PythonExecutor import PythonExecutor
-from agent.CommandStatusDict import CommandStatusDict
+from CustomServiceOrchestrator import CustomServiceOrchestrator
+from PythonExecutor import PythonExecutor
+from CommandStatusDict import CommandStatusDict
class TestActionQueue(TestCase):
@@ -341,7 +341,7 @@ class TestActionQueue(TestCase):
'role': u'HBASE_MASTER',
'actionId': '1-1',
'taskId': 3,
- 'exitCode': 777}
+ 'exitcode': 777}
self.assertEqual(report['reports'][0], expected)
# Continue command execution
unfreeze_flag.set()
@@ -362,7 +362,8 @@ class TestActionQueue(TestCase):
'actionId': '1-1',
'taskId': 3,
'structuredOut': '',
- 'exitCode': 0}
+ 'exitcode': 0,
+ 'allocatedPorts': {}}
self.assertEqual(len(report['reports']), 1)
self.assertEqual(report['reports'][0], expected)
self.assertTrue(os.path.isfile(configname))
@@ -400,7 +401,7 @@ class TestActionQueue(TestCase):
'actionId': '1-1',
'taskId': 3,
'structuredOut': '',
- 'exitCode': 13}
+ 'exitcode': 13}
self.assertEqual(len(report['reports']), 1)
self.assertEqual(report['reports'][0], expected)
Modified: incubator/slider/trunk/slider-agent/src/test/python/agent/TestController.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/test/python/agent/TestController.py?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-agent/src/test/python/agent/TestController.py (original)
+++ incubator/slider/trunk/slider-agent/src/test/python/agent/TestController.py Tue May 20 03:09:30 2014
@@ -25,9 +25,9 @@ import unittest, threading
from agent import Controller, ActionQueue
from agent import hostname
import sys
-from agent.Controller import AGENT_AUTO_RESTART_EXIT_CODE
-from agent.Controller import State
-from agent.AgentConfig import AgentConfig
+from Controller import AGENT_AUTO_RESTART_EXIT_CODE
+from Controller import State
+from AgentConfig import AgentConfig
from mock.mock import patch, MagicMock, call, Mock
import logging
from threading import Event
Modified: incubator/slider/trunk/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py (original)
+++ incubator/slider/trunk/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py Tue May 20 03:09:30 2014
@@ -79,7 +79,7 @@ class TestCustomServiceOrchestrator(Test
orchestrator = CustomServiceOrchestrator(config, dummy_controller)
isfile_mock.return_value = True
# Test dumping EXECUTION_COMMAND
- json_file = orchestrator.dump_command_to_json(command)
+ json_file = orchestrator.dump_command_to_json(command, {})
self.assertTrue(os.path.exists(json_file))
self.assertTrue(os.path.getsize(json_file) > 0)
self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600')
@@ -87,7 +87,7 @@ class TestCustomServiceOrchestrator(Test
os.unlink(json_file)
# Test dumping STATUS_COMMAND
command['commandType'] = 'STATUS_COMMAND'
- json_file = orchestrator.dump_command_to_json(command)
+ json_file = orchestrator.dump_command_to_json(command, {})
self.assertTrue(os.path.exists(json_file))
self.assertTrue(os.path.getsize(json_file) > 0)
self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600')
@@ -135,7 +135,7 @@ class TestCustomServiceOrchestrator(Test
run_file_mock.return_value = {
'stdout': 'sss',
'stderr': 'eee',
- 'exitcode': 0,
+ 'exitcode': 0
}
ret = orchestrator.runCommand(command, "out.txt", "err.txt")
self.assertEqual(ret['exitcode'], 0)
@@ -155,6 +155,16 @@ class TestCustomServiceOrchestrator(Test
self.assertEquals(run_file_mock.call_args_list[0][0][6], True)
run_file_mock.reset_mock()
+ # Case when the command fails (non-zero exit code): allocated ports must not be reported
+ run_file_mock.return_value = {
+ 'stdout': 'sss',
+ 'stderr': 'eee',
+ 'exitcode': 1
+ }
+ ret = orchestrator.runCommand(command, "out.txt", "err.txt")
+ self.assertFalse('allocated_ports' in ret)
+
+ run_file_mock.reset_mock()
# unknown script type case
command['commandParams']['script_type'] = "PUPPET"
@@ -168,6 +178,58 @@ class TestCustomServiceOrchestrator(Test
pass
+ @patch.object(CustomServiceOrchestrator, "allocate_port")
+ @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+ @patch.object(PythonExecutor, "run_file")
+ def test_runCommand_get_port(self,
+ run_file_mock,
+ resolve_script_path_mock,
+ allocate_port_mock):
+ command = {
+ 'role': 'HBASE_REGIONSERVER',
+ 'hostLevelParams': {
+ 'stack_name': 'HDP',
+ 'stack_version': '2.0.7',
+ 'jdk_location': 'some_location'
+ },
+ 'commandParams': {
+ 'script_type': 'PYTHON',
+ 'script': 'scripts/hbase_regionserver.py',
+ 'command_timeout': '600',
+ 'service_package_folder': 'HBASE'
+ },
+ 'taskId': '3',
+ 'roleCommand': 'INSTALL',
+ 'commandType': 'EXECUTE',
+ 'componentName': 'HBASE_REGIONSERVER',
+ 'configurations': {'a': {'a.port': '${HBASE_REGIONSERVER.ALLOCATED_PORT}'}}
+ }
+
+ tempdir = tempfile.gettempdir()
+ config = MagicMock()
+ config.get.return_value = "something"
+ config.getResolvedPath.return_value = tempdir
+ config.getWorkRootPath.return_value = tempdir
+ config.getLogPath.return_value = tempdir
+
+ allocate_port_mock.return_value = 10233
+
+ resolve_script_path_mock.return_value = "/basedir/scriptpath"
+ dummy_controller = MagicMock()
+ orchestrator = CustomServiceOrchestrator(config, dummy_controller)
+ # normal run case
+ run_file_mock.return_value = {
+ 'stdout': 'sss',
+ 'stderr': 'eee',
+ 'exitcode': 0
+ }
+ ret = orchestrator.runCommand(command, "out.txt", "err.txt")
+ self.assertEqual(ret['exitcode'], 0)
+ self.assertEqual(ret['allocated_ports'], {'a.port': '10233'})
+ self.assertTrue(run_file_mock.called)
+ self.assertEqual(run_file_mock.call_count, 1)
+
+
@patch("hostname.public_hostname")
@patch("os.path.isfile")
@patch("os.unlink")
@@ -243,7 +305,7 @@ class TestCustomServiceOrchestrator(Test
expected_specific = {
'hbase-site': {
'hbase.log': tempdir, 'hbase.number': '10485760'},
- }
+ }
ret = orchestrator.runCommand(command, "out.txt", "err.txt", True, True)
self.assertEqual(ret['exitcode'], 0)
@@ -316,11 +378,15 @@ class TestCustomServiceOrchestrator(Test
command['configurations']['oozie-site']['log_root'] = "${AGENT_LOG_ROOT}"
command['configurations']['oozie-site']['a_port'] = "${HBASE_MASTER.ALLOCATED_PORT}"
- orchestrator.finalize_command(command, False)
+ allocated_ports = {}
+ orchestrator.finalize_command(command, False, allocated_ports)
self.assertEqual(command['configurations']['hbase-site']['work_root'], tempWorkDir)
self.assertEqual(command['configurations']['oozie-site']['log_root'], tempdir)
self.assertEqual(command['configurations']['oozie-site']['a_port'], "10023")
self.assertEqual(orchestrator.applied_configs, {})
+ self.assertEqual(len(allocated_ports), 1)
+ self.assertTrue('a_port' in allocated_ports)
+ self.assertEqual(allocated_ports['a_port'], '10023')
command['configurations']['hbase-site']['work_root'] = "${AGENT_WORK_ROOT}"
command['configurations']['hbase-site']['log_root'] = "${AGENT_LOG_ROOT}/log"
@@ -328,7 +394,7 @@ class TestCustomServiceOrchestrator(Test
command['configurations']['oozie-site']['log_root'] = "${AGENT_LOG_ROOT}"
command['configurations']['oozie-site']['b_port'] = "${HBASE_REGIONSERVER.ALLOCATED_PORT}"
- orchestrator.finalize_command(command, True)
+ orchestrator.finalize_command(command, True, {})
self.assertEqual(command['configurations']['hbase-site']['log_root'], tempdir + "/log")
self.assertEqual(command['configurations']['hbase-site']['blog_root'], "/b/" + tempdir + "/log")
self.assertEqual(command['configurations']['oozie-site']['b_port'], "${HBASE_REGIONSERVER.ALLOCATED_PORT}")
Modified: incubator/slider/trunk/slider-agent/src/test/python/agent/TestHeartbeat.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/test/python/agent/TestHeartbeat.py?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-agent/src/test/python/agent/TestHeartbeat.py (original)
+++ incubator/slider/trunk/slider-agent/src/test/python/agent/TestHeartbeat.py Tue May 20 03:09:30 2014
@@ -20,9 +20,9 @@ limitations under the License.
from unittest import TestCase
import unittest
-from agent.Heartbeat import Heartbeat
-from agent.ActionQueue import ActionQueue
-from agent.AgentConfig import AgentConfig
+from Heartbeat import Heartbeat
+from ActionQueue import ActionQueue
+from AgentConfig import AgentConfig
import socket
import os
import time
@@ -85,7 +85,7 @@ class TestHeartbeat(TestCase):
'role': u'DATANODE',
'actionId': '1-1',
'taskId': 3,
- 'exitCode': 777},
+ 'exitcode': 777},
{'status': 'COMPLETED',
'stderr': 'stderr',
@@ -96,7 +96,7 @@ class TestHeartbeat(TestCase):
'role': 'role',
'actionId': 17,
'taskId': 'taskId',
- 'exitCode': 0},
+ 'exitcode': 0},
{'status': 'FAILED',
'stderr': 'stderr',
@@ -107,7 +107,7 @@ class TestHeartbeat(TestCase):
'role': u'DATANODE',
'actionId': '1-1',
'taskId': 3,
- 'exitCode': 13},
+ 'exitcode': 13},
{'status': 'COMPLETED',
'stderr': 'stderr',
@@ -119,7 +119,7 @@ class TestHeartbeat(TestCase):
'role': u'DATANODE',
'actionId': '1-1',
'taskId': 3,
- 'exitCode': 0}
+ 'exitcode': 0}
],
'componentStatus': [
@@ -141,17 +141,17 @@ class TestHeartbeat(TestCase):
'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1',
'stderr': 'Read from /tmp/errors-3.txt',
'stdout': 'Read from /tmp/output-3.txt', 'clusterName': u'cc',
- 'taskId': 3, 'exitCode': 777},
+ 'taskId': 3, 'exitcode': 777},
{'status': 'COMPLETED', 'roleCommand': 'UPGRADE',
'serviceName': 'serviceName', 'role': 'role', 'actionId': 17,
'stderr': 'stderr', 'stdout': 'out', 'clusterName': 'clusterName',
- 'taskId': 'taskId', 'exitCode': 0},
+ 'taskId': 'taskId', 'exitcode': 0},
{'status': 'FAILED', 'roleCommand': u'INSTALL', 'serviceName': u'HDFS',
'role': u'DATANODE', 'actionId': '1-1', 'stderr': 'stderr',
- 'stdout': 'out', 'clusterName': u'cc', 'taskId': 3, 'exitCode': 13},
+ 'stdout': 'out', 'clusterName': u'cc', 'taskId': 3, 'exitcode': 13},
{'status': 'COMPLETED', 'stdout': 'out',
'configurationTags': {'global': {'tag': 'v1'}}, 'taskId': 3,
- 'exitCode': 0, 'roleCommand': u'INSTALL', 'clusterName': u'cc',
+ 'exitcode': 0, 'roleCommand': u'INSTALL', 'clusterName': u'cc',
'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1',
'stderr': 'stderr'}], 'componentStatus': [
{'status': 'HEALTHY', 'componentName': 'DATANODE'},
Modified: incubator/slider/trunk/slider-agent/src/test/python/agent/TestShell.py
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-agent/src/test/python/agent/TestShell.py?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-agent/src/test/python/agent/TestShell.py (original)
+++ incubator/slider/trunk/slider-agent/src/test/python/agent/TestShell.py Tue May 20 03:09:30 2014
@@ -24,7 +24,6 @@ import unittest
import tempfile
from mock.mock import patch, MagicMock, call
from agent import shell
-from shell import shellRunner
from sys import platform as _platform
import subprocess, time
@@ -38,18 +37,6 @@ class TestShell(unittest.TestCase):
self.assertTrue(os_setUIDMock.called)
- @patch("pwd.getpwnam")
- def test_shellRunner_run(self, getpwnamMock):
- sh = shellRunner()
- result = sh.run(['echo'])
- self.assertEquals(result['exitCode'], 0)
- self.assertEquals(result['error'], '')
-
- getpwnamMock.return_value = [os.getuid(), os.getuid(), os.getuid()]
- result = sh.run(['echo'], 'non_exist_user_name')
- self.assertEquals(result['exitCode'], 0)
- self.assertEquals(result['error'], '')
-
def test_kill_process_with_children(self):
if _platform == "linux" or _platform == "linux2": # Test is Linux-specific
gracefull_kill_delay_old = shell.gracefull_kill_delay
Modified: incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java (original)
+++ incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java Tue May 20 03:09:30 2014
@@ -18,6 +18,7 @@
package org.apache.slider.providers.agent;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
@@ -84,6 +85,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_AGENTS;
@@ -101,10 +103,13 @@ public class AgentProviderService extend
private static final String LABEL_MAKER = "___";
private static final String CONTAINER_ID = "container_id";
private static final String GLOBAL_CONFIG_TAG = "global";
+ private final Object syncLock = new Object();
+ private final Map<String, String> allocatedPorts = new ConcurrentHashMap<>();
private AgentClientProvider clientProvider;
private Map<String, ComponentInstanceState> componentStatuses = new HashMap<>();
private AtomicInteger taskId = new AtomicInteger(0);
private Metainfo metainfo = null;
+ private ComponentCommandOrder commandOrder = null;
public AgentProviderService() {
super("AgentProviderService");
@@ -152,12 +157,17 @@ public class AgentProviderService extend
String appDef = instanceDefinition.getAppConfOperations().
getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF);
- // No need to synchronize as there is low chance of multiple simultaneous reads
if (metainfo == null) {
- metainfo = getApplicationMetainfo(fileSystem, appDef);
- if (metainfo == null) {
- log.error("metainfo.xml is unavailable or malformed at {}.", appDef);
- throw new SliderException("metainfo.xml is required in app package.");
+ synchronized (syncLock) {
+ if (metainfo == null) {
+ metainfo = getApplicationMetainfo(fileSystem, appDef);
+ if (metainfo == null || metainfo.getServices() == null || metainfo.getServices().size() == 0) {
+ log.error("metainfo.xml is unavailable or malformed at {}.", appDef);
+ throw new SliderException("metainfo.xml is required in app package.");
+ }
+
+ commandOrder = new ComponentCommandOrder(metainfo.getServices().get(0).getCommandOrder());
+ }
}
}
@@ -312,7 +322,6 @@ public class AgentProviderService extend
@Override
public RegistrationResponse handleRegistration(Register registration) {
- // dummy impl
RegistrationResponse response = new RegistrationResponse();
String label = registration.getHostname();
if (componentStatuses.containsKey(label)) {
@@ -380,6 +389,13 @@ public class AgentProviderService extend
List<CommandReport> reports = heartBeat.getReports();
if (reports != null && !reports.isEmpty()) {
CommandReport report = reports.get(0);
+ Map<String, String> ports = report.getAllocatedPorts();
+ if (ports != null && !ports.isEmpty()) {
+ for (Map.Entry<String, String> port : ports.entrySet()) {
+ log.info("Recording allocated port for {} as {}", port.getKey(), port.getValue());
+ this.allocatedPorts.put(port.getKey(), port.getValue());
+ }
+ }
CommandResult result = getCommandResult(report.getStatus());
Command command = getCommand(report.getRoleCommand());
componentStatus.applyCommandResult(result, command);
@@ -398,13 +414,20 @@ public class AgentProviderService extend
Command command = componentStatus.getNextCommand();
try {
if (Command.NOP != command) {
- componentStatus.commandIssued(command);
if (command == Command.INSTALL) {
- log.info("Installing component ...");
+ log.info("Installing {} on {}.", roleName, containerId);
addInstallCommand(roleName, containerId, response, scriptPath);
+ componentStatus.commandIssued(command);
} else if (command == Command.START) {
- log.info("Starting component ...");
- addStartCommand(roleName, containerId, response, scriptPath);
+ // check against dependencies
+ boolean canExecute = commandOrder.canExecute(roleName, command, componentStatuses.values());
+ if (canExecute) {
+ log.info("Starting {} on {}.", roleName, containerId);
+ addStartCommand(roleName, containerId, response, scriptPath);
+ componentStatus.commandIssued(command);
+ } else {
+ log.info("Start of {} on {} delayed as dependencies have not started.", roleName, containerId);
+ }
}
}
// if there is no outstanding command then retrieve config
@@ -579,6 +602,7 @@ public class AgentProviderService extend
cmd.setConfigurations(configurations);
}
+ @VisibleForTesting
protected void addStatusCommand(String roleName, String containerId, HeartBeatResponse response, String scriptPath)
throws SliderException {
assert getStateAccessor().isApplicationLive();
@@ -608,6 +632,7 @@ public class AgentProviderService extend
response.addStatusCommand(cmd);
}
+ @VisibleForTesting
protected void addGetConfigCommand(String roleName, String containerId, HeartBeatResponse response)
throws SliderException {
assert getStateAccessor().isApplicationLive();
@@ -630,6 +655,7 @@ public class AgentProviderService extend
response.addStatusCommand(cmd);
}
+ @VisibleForTesting
protected void addStartCommand(String roleName, String containerId, HeartBeatResponse response, String scriptPath)
throws
SliderException {
@@ -660,6 +686,10 @@ public class AgentProviderService extend
response.addExecutionCommand(cmd);
}
+ /**
+ * Returns the ports recorded so far from the command reports carried by agent heartbeats.
+ *
+ * @return the backing map itself (not a copy), keyed by the name each port was reported under
+ */
+ protected Map<String, String> getAllocatedPorts() {
+ return this.allocatedPorts;
+ }
+
private Map<String, Map<String, String>> buildCommandConfigurations(ConfTreeOperations appConf) {
Map<String, Map<String, String>> configurations = new TreeMap<>();
@@ -710,6 +740,15 @@ public class AgentProviderService extend
// add role hosts to tokens
addRoleRelatedTokens(tokens);
providerUtils.propagateSiteOptions(sourceConfig, config, configName, tokens);
+
+ //apply any port updates
+ if (!this.getAllocatedPorts().isEmpty()) {
+ for (String key : config.keySet()) {
+ if (this.getAllocatedPorts().keySet().contains(key)) {
+ config.put(key, getAllocatedPorts().get(key));
+ }
+ }
+ }
configurations.put(configName, config);
}
Added: incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentCommandOrder.java
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentCommandOrder.java?rev=1596105&view=auto
==============================================================================
--- incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentCommandOrder.java (added)
+++ incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentCommandOrder.java Tue May 20 03:09:30 2014
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers.agent;
+
+import org.apache.slider.providers.agent.application.metadata.CommandOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Records which component states must be reached before a command may be issued to
+ * another component of the same service.
+ *
+ * <p>Each rule comes from a {@code commandOrder} element of the application metainfo, e.g.
+ * <pre>
+ * &lt;commandOrder&gt;
+ *   &lt;command&gt;SUPERVISOR-START&lt;/command&gt;
+ *   &lt;requires&gt;NIMBUS-STARTED&lt;/requires&gt;
+ * &lt;/commandOrder&gt;
+ * </pre>
+ * which means that START of SUPERVISOR requires NIMBUS to be STARTED. Rules may only be
+ * declared for the START command and against the STARTED state; any other command or
+ * state is rejected with an {@link IllegalArgumentException} while parsing.
+ */
+public class ComponentCommandOrder {
+  public static final Logger log =
+      LoggerFactory.getLogger(ComponentCommandOrder.class);
+  /** Separator between the component name and the command/state suffix. */
+  private static char SPLIT_CHAR = '-';
+  /** command -> (component name -> states that must hold before the command is issued). */
+  Map<Command, Map<String, List<ComponentState>>> dependencies = new HashMap<>();
+
+  /**
+   * Builds the dependency table from the parsed metainfo rules.
+   *
+   * @param commandOrders rules to register; {@code null} or empty yields no dependencies
+   * @throws IllegalArgumentException if any rule is malformed
+   */
+  public ComponentCommandOrder(List<CommandOrder> commandOrders) {
+    if (commandOrders == null || commandOrders.isEmpty()) {
+      return;
+    }
+
+    for (CommandOrder rule : commandOrders) {
+      ComponentCommand target = getComponentCommand(rule.getCommand());
+      List<ComponentState> prerequisites = parseRequiredStates(rule.getRequires());
+      if (prerequisites.isEmpty()) {
+        continue;
+      }
+
+      // Create the per-command and per-component buckets on first use.
+      Map<String, List<ComponentState>> byComponent = dependencies.get(target.command);
+      if (byComponent == null) {
+        byComponent = new HashMap<>();
+        dependencies.put(target.command, byComponent);
+      }
+      List<ComponentState> accumulated = byComponent.get(target.componentName);
+      if (accumulated == null) {
+        accumulated = new ArrayList<>();
+        byComponent.put(target.componentName, accumulated);
+      }
+      accumulated.addAll(prerequisites);
+    }
+  }
+
+  /** Parses a comma-separated list of COMPONENT-STATE entries. */
+  private List<ComponentState> parseRequiredStates(String requires) {
+    if (requires == null || requires.length() < 2) {
+      throw new IllegalArgumentException("Input cannot be null and must contain component and state.");
+    }
+
+    List<ComponentState> parsed = new ArrayList<>();
+    for (String entry : requires.split(",")) {
+      parsed.add(getComponentState(entry));
+    }
+    return parsed;
+  }
+
+  /** Splits NAME-SUFFIX at the last separator; both halves must be non-empty. */
+  private static String[] splitAtLastSeparator(String trimmed) {
+    int at = trimmed.lastIndexOf(SPLIT_CHAR);
+    if (at <= 0 || at == trimmed.length() - 1) {
+      throw new IllegalArgumentException("Input does not appear to be well-formed.");
+    }
+    return new String[] {trimmed.substring(0, at), trimmed.substring(at + 1)};
+  }
+
+  /** Parses a COMPONENT-COMMAND string; only the START command is accepted. */
+  private ComponentCommand getComponentCommand(String compCmdStr) {
+    String trimmed = compCmdStr == null ? "" : compCmdStr.trim();
+    if (trimmed.length() < 2) {
+      throw new IllegalArgumentException("Input cannot be null and must contain component and command.");
+    }
+
+    String[] parts = splitAtLastSeparator(trimmed);
+    Command parsedCommand = Command.valueOf(parts[1]);
+    if (parsedCommand != Command.START) {
+      throw new IllegalArgumentException("Dependency order can only be specified for START.");
+    }
+    return new ComponentCommand(parts[0], parsedCommand);
+  }
+
+  /** Parses a COMPONENT-STATE string; only the STARTED state is accepted. */
+  private ComponentState getComponentState(String compStStr) {
+    String trimmed = compStStr == null ? "" : compStStr.trim();
+    if (trimmed.length() < 2) {
+      throw new IllegalArgumentException("Input cannot be null.");
+    }
+
+    String[] parts = splitAtLastSeparator(trimmed);
+    State parsedState = State.valueOf(parts[1]);
+    if (parsedState != State.STARTED) {
+      throw new IllegalArgumentException("Dependency order can only be specified against STARTED.");
+    }
+    return new ComponentState(parts[0], parsedState);
+  }
+
+  /**
+   * Decides whether {@code command} may be issued to {@code component} right now.
+   *
+   * @param component name of the component the command is for
+   * @param command the command about to be issued
+   * @param currentStates states of the component instances known so far
+   * @return {@code false} as soon as one instance of a required component is found that is
+   *         not in the required state; {@code true} otherwise, including when no rule
+   *         exists or when no instance of a required component is present
+   */
+  public boolean canExecute(String component, Command command, Collection<ComponentInstanceState> currentStates) {
+    if (!dependencies.containsKey(command) || !dependencies.get(command).containsKey(component)) {
+      return true;
+    }
+
+    for (ComponentState needed : dependencies.get(command).get(component)) {
+      for (ComponentInstanceState instance : currentStates) {
+        log.debug("Checking schedule {} {} against dependency {} is {}",
+            component, command, instance.getCompName(), instance.getState());
+        boolean sameComponent = instance.getCompName().equals(needed.componentName);
+        if (sameComponent && instance.getState() != needed.state) {
+          log.info("Cannot schedule {} {} as dependency {} is {}",
+              component, command, instance.getCompName(), instance.getState());
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /** A component name paired with the state it is required to be in. */
+  static class ComponentState {
+    public String componentName;
+    public State state;
+
+    public ComponentState(String componentName, State state) {
+      this.componentName = componentName;
+      this.state = state;
+    }
+  }
+
+  /** A component name paired with the command a rule applies to. */
+  static class ComponentCommand {
+    public String componentName;
+    public Command command;
+
+    public ComponentCommand(String componentName, Command command) {
+      this.componentName = componentName;
+      this.command = command;
+    }
+  }
+}
Modified: incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java (original)
+++ incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java Tue May 20 03:09:30 2014
@@ -29,6 +29,7 @@ public class ComponentInstanceState {
private static int MAX_FAILURE_TOLERATED = 3;
private static String INVALID_TRANSITION_ERROR =
"Result {0} for command {1} is not expected for component {2} in state {3}.";
+
private final String compName;
private final String containerId;
private final String applicationId;
@@ -45,6 +46,10 @@ public class ComponentInstanceState {
this.applicationId = applicationId;
}
+ /** @return the component name held by this instance state; ComponentCommandOrder compares it against required component names */
+ public String getCompName() {
+ return compName;
+ }
+
public Boolean getConfigReported() {
return configReported;
}
Added: incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/CommandOrder.java
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/CommandOrder.java?rev=1596105&view=auto
==============================================================================
--- incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/CommandOrder.java (added)
+++ incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/CommandOrder.java Tue May 20 03:09:30 2014
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers.agent.application.metadata;
+
+/**
+ * A single command-ordering rule read from the {@code commandOrder} element of an
+ * application's metainfo.
+ *
+ * <p>{@code command} names the component command being constrained and {@code requires}
+ * names what it waits for, e.g. {@code HBASE_REGIONSERVER-START} requiring
+ * {@code HBASE_MASTER-STARTED}. Both values are stored verbatim; this class neither
+ * parses nor validates them.
+ */
+public class CommandOrder {
+  String command;
+  String requires;
+
+  /** Creates an empty rule; the values are filled in through the setters. */
+  public CommandOrder() {
+  }
+
+  /** @return the constrained command string, or {@code null} if not set */
+  public String getCommand() {
+    return command;
+  }
+
+  /** @param command the constrained command string, e.g. {@code A-START} */
+  public void setCommand(String command) {
+    this.command = command;
+  }
+
+  /** @return the requirement string, or {@code null} if not set */
+  public String getRequires() {
+    return requires;
+  }
+
+  /** @param requires the requirement string, e.g. {@code B-STARTED} */
+  public void setRequires(String requires) {
+    this.requires = requires;
+  }
+
+  /**
+   * Renders the rule as a brace-delimited, JSON-like string for logging.
+   * Values are appended unquoted and unescaped, so the result is not strict JSON.
+   *
+   * @return the textual form of this rule
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb =
+        new StringBuilder("{");
+    // No separator before the first field: a leading comma would yield "{,".
+    sb.append("\n\"command\": ").append(command);
+    sb.append(",\n\"requires\": ").append(requires);
+    sb.append('}');
+    return sb.toString();
+  }
+}
Modified: incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java (original)
+++ incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java Tue May 20 03:09:30 2014
@@ -39,6 +39,11 @@ public class MetainfoParser {
digester.addBeanPropertySetter("*/service/comment");
digester.addBeanPropertySetter("*/service/version");
+ digester.addObjectCreate("*/commandOrder", CommandOrder.class);
+ digester.addBeanPropertySetter("*/commandOrder/command");
+ digester.addBeanPropertySetter("*/commandOrder/requires");
+ digester.addSetNext("*/commandOrder", "addCommandOrder");
+
digester.addObjectCreate("*/exportGroup", ExportGroup.class);
digester.addBeanPropertySetter("*/exportGroup/name");
digester.addObjectCreate("*/export", Export.class);
Modified: incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Service.java
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Service.java?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Service.java (original)
+++ incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Service.java Tue May 20 03:09:30 2014
@@ -29,12 +29,14 @@ public class Service {
List<Component> components;
List<ExportGroup> exportGroups;
List<OSSpecific> osSpecifics;
+ List<CommandOrder> commandOrders;
ConfigurationDependencies configDependencies;
public Service() {
- exportGroups = new ArrayList<ExportGroup>();
- components = new ArrayList<Component>();
- osSpecifics = new ArrayList<OSSpecific>();
+ exportGroups = new ArrayList<>();
+ components = new ArrayList<>();
+ osSpecifics = new ArrayList<>();
+ commandOrders = new ArrayList<>();
}
public String getName() {
@@ -93,6 +95,14 @@ public class Service {
return osSpecifics;
}
+ /**
+ * Appends a command-order rule to this service. Referenced by name from the
+ * set-next rule that MetainfoParser registers for commandOrder elements.
+ *
+ * @param commandOrder the rule to add
+ */
+ public void addCommandOrder(CommandOrder commandOrder) {
+ commandOrders.add(commandOrder);
+ }
+
+ /** @return the command-order rules added so far; this is the internal list (initialised empty by the constructor), not a copy */
+ public List<CommandOrder> getCommandOrder() {
+ return commandOrders;
+ }
+
@Override
public String toString() {
final StringBuilder sb =
Modified: incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/CommandReport.java
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/CommandReport.java?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/CommandReport.java (original)
+++ incubator/slider/trunk/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/CommandReport.java Tue May 20 03:09:30 2014
@@ -29,18 +29,18 @@ import java.util.Map;
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class CommandReport {
+ int exitCode;
private String role;
private String actionId;
private String stdout;
private String stderr;
private String structuredOut;
private String status;
- int exitCode;
private String clusterName;
private String serviceName;
private long taskId;
private String roleCommand;
-
+ private Map<String, String> allocatedPorts;
private Map<String, Map<String, String>> configurationTags;
@JsonProperty("taskId")
@@ -54,13 +54,13 @@ public class CommandReport {
}
@JsonProperty("clusterName")
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
+ public String getClusterName() {
+ return this.clusterName;
}
@JsonProperty("clusterName")
- public String getClusterName() {
- return this.clusterName;
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
}
@JsonProperty("actionId")
@@ -108,7 +108,6 @@ public class CommandReport {
return this.structuredOut;
}
-
@JsonProperty("structuredOut")
public void setStructuredOut(String structuredOut) {
this.structuredOut = structuredOut;
@@ -154,21 +153,28 @@ public class CommandReport {
this.serviceName = serviceName;
}
- /**
- * @param tags the config tags that match this command
- */
+ /** @return the config tags that match this command, or <code>null</code> if none are present */
@JsonProperty("configurationTags")
- public void setConfigurationTags(Map<String, Map<String,String>> tags) {
- configurationTags = tags;
+ public Map<String, Map<String, String>> getConfigurationTags() {
+ return configurationTags;
}
- /**
- * @return the config tags that match this command, or <code>null</code>
- * if none are present
- */
+ /** @param tags the config tags that match this command */
@JsonProperty("configurationTags")
- public Map<String, Map<String,String>> getConfigurationTags() {
- return configurationTags;
+ public void setConfigurationTags(Map<String, Map<String, String>> tags) {
+ configurationTags = tags;
+ }
+
+ /** @return the ports allocated for this command, or <code>null</code> if none are present */
+ @JsonProperty("allocatedPorts")
+ public Map<String, String> getAllocatedPorts() {
+ return allocatedPorts;
+ }
+
+ /** @param ports allocated ports */
+ @JsonProperty("allocatedPorts")
+ public void setAllocatedPorts(Map<String, String> ports) {
+ allocatedPorts = ports;
}
@Override
Modified: incubator/slider/trunk/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java?rev=1596105&r1=1596104&r2=1596105&view=diff
==============================================================================
--- incubator/slider/trunk/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java (original)
+++ incubator/slider/trunk/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java Tue May 20 03:09:30 2014
@@ -18,7 +18,9 @@
package org.apache.slider.providers.agent;
-import junit.framework.TestCase;
+import org.apache.slider.server.appmaster.web.rest.agent.CommandReport;
+import org.junit.Assert;
+import org.junit.Test;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,6 +46,7 @@ import org.apache.slider.core.conf.ConfT
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.ContainerLauncher;
+import org.apache.slider.providers.agent.application.metadata.CommandOrder;
import org.apache.slider.providers.agent.application.metadata.Component;
import org.apache.slider.providers.agent.application.metadata.Export;
import org.apache.slider.providers.agent.application.metadata.ExportGroup;
@@ -54,7 +57,6 @@ import org.apache.slider.server.appmaste
import org.apache.slider.server.appmaster.model.mock.MockFileSystem;
import org.apache.slider.server.appmaster.model.mock.MockNodeId;
import org.apache.slider.server.appmaster.state.AppState;
-import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.agent.ComponentStatus;
import org.apache.slider.server.appmaster.web.rest.agent.HeartBeat;
@@ -120,6 +122,16 @@ public class TestAgentProviderService {
+ " </exports>\n"
+ " </exportGroup>\n"
+ " </exportGroups>\n"
+ + " <commandOrders>\n"
+ + " <commandOrder>\n"
+ + " <command>HBASE_REGIONSERVER-START</command>\n"
+ + " <requires>HBASE_MASTER-STARTED</requires>\n"
+ + " </commandOrder>\n"
+ + " <commandOrder>\n"
+ + " <command>A-START</command>\n"
+ + " <requires>B-STARTED</requires>\n"
+ + " </commandOrder>\n"
+ + " </commandOrders>\n"
+ " <components>\n"
+ " <component>\n"
+ " <name>HBASE_MASTER</name>\n"
@@ -194,7 +206,9 @@ public class TestAgentProviderService {
AgentProviderService mockAps = Mockito.spy(aps);
doReturn(access).when(mockAps).getStateAccessor();
doReturn("scripts/hbase_master.py").when(mockAps).getScriptPathFromMetainfo(anyString());
- doReturn(new Metainfo()).when(mockAps).getApplicationMetainfo(any(SliderFileSystem.class), anyString());
+ Metainfo metainfo = new Metainfo();
+ metainfo.addService(new Service());
+ doReturn(metainfo).when(mockAps).getApplicationMetainfo(any(SliderFileSystem.class), anyString());
try {
doReturn(true).when(mockAps).isMaster(anyString());
@@ -240,14 +254,14 @@ public class TestAgentProviderService {
reg.setResponseId(0);
reg.setHostname("mockcontainer_1___HBASE_MASTER");
RegistrationResponse resp = mockAps.handleRegistration(reg);
- TestCase.assertEquals(0, resp.getResponseId());
- TestCase.assertEquals(RegistrationStatus.OK, resp.getResponseStatus());
+ Assert.assertEquals(0, resp.getResponseId());
+ Assert.assertEquals(RegistrationStatus.OK, resp.getResponseStatus());
HeartBeat hb = new HeartBeat();
hb.setResponseId(1);
hb.setHostname("mockcontainer_1___HBASE_MASTER");
HeartBeatResponse hbr = mockAps.handleHeartBeat(hb);
- TestCase.assertEquals(2, hbr.getResponseId());
+ Assert.assertEquals(2, hbr.getResponseId());
}
@Test
@@ -257,8 +271,8 @@ public class TestAgentProviderService {
@Override
public ClusterDescription getClusterStatus() {
ClusterDescription cd = new ClusterDescription();
- cd.status = new HashMap<String,Object>();
- Map<String, Map<String,ClusterNode>> roleMap = new HashMap<>();
+ cd.status = new HashMap<String, Object>();
+ Map<String, Map<String, ClusterNode>> roleMap = new HashMap<>();
ClusterNode cn1 = new ClusterNode(new MyContainerId(1));
cn1.host = "FIRST_HOST";
Map<String, ClusterNode> map1 = new HashMap<>();
@@ -293,9 +307,9 @@ public class TestAgentProviderService {
aps.setStateAccessor(appState);
Map<String, String> tokens = new HashMap<String, String>();
aps.addRoleRelatedTokens(tokens);
- TestCase.assertEquals(2, tokens.size());
- TestCase.assertEquals("FIRST_HOST", tokens.get("${FIRST_ROLE_HOST}"));
- TestCase.assertEquals("THIRD_HOST,SECOND_HOST", tokens.get("${SECOND_ROLE_HOST}"));
+ Assert.assertEquals(2, tokens.size());
+ Assert.assertEquals("FIRST_HOST", tokens.get("${FIRST_ROLE_HOST}"));
+ Assert.assertEquals("THIRD_HOST,SECOND_HOST", tokens.get("${SECOND_ROLE_HOST}"));
aps.close();
}
@@ -398,6 +412,21 @@ public class TestAgentProviderService {
}
assert found == 2;
+ List<CommandOrder> cmdOrders = service.getCommandOrder();
+ assert cmdOrders.size() == 2;
+ found = 0;
+ for (CommandOrder co : service.getCommandOrder()) {
+ if (co.getCommand().equals("HBASE_REGIONSERVER-START")) {
+ Assert.assertTrue(co.getRequires().equals("HBASE_MASTER-STARTED"));
+ found++;
+ }
+ if (co.getCommand().equals("A-START")) {
+ assert co.getRequires().equals("B-STARTED");
+ found++;
+ }
+ }
+ assert found == 2;
+
AgentProviderService aps = new AgentProviderService();
AgentProviderService mockAps = Mockito.spy(aps);
doReturn(metainfo).when(mockAps).getMetainfo();
@@ -418,6 +447,276 @@ public class TestAgentProviderService {
assert metainfo == null;
}
+ /**
+  * Verifies that start commands are issued in the order required by the
+  * parsed metainfo: the application has two components, HBASE_MASTER and
+  * HBASE_REGIONSERVER, and the region server may only be started once the
+  * master has reported a completed START.
+  *
+  * Exceptions are propagated instead of being caught and logged. A swallowed
+  * exception would skip every verification that follows it and let the test
+  * pass without having checked anything.
+  *
+  * @throws Exception if stubbing, container launch or heartbeat handling fails
+  */
+ @Test
+ public void testOrchastratedAppStart() throws Exception {
+   // App has two components HBASE_MASTER and HBASE_REGIONSERVER
+   // Start of HBASE_RS depends on the start of HBASE_MASTER
+   InputStream metainfo_1 = new ByteArrayInputStream(metainfo_1_str.getBytes());
+   Metainfo metainfo = new MetainfoParser().parse(metainfo_1);
+   ConfTree tree = new ConfTree();
+   tree.global.put(OptionKeys.INTERNAL_APPLICATION_IMAGE_PATH, ".");
+
+   AgentProviderService aps = new AgentProviderService();
+   ContainerLaunchContext ctx = createNiceMock(ContainerLaunchContext.class);
+   AggregateConf instanceDefinition = new AggregateConf();
+
+   instanceDefinition.setInternal(tree);
+   instanceDefinition.setAppConf(tree);
+   instanceDefinition.getAppConfOperations().getGlobalOptions().put(AgentKeys.APP_DEF, ".");
+   instanceDefinition.getAppConfOperations().getGlobalOptions().put(AgentKeys.AGENT_CONF, ".");
+   instanceDefinition.getAppConfOperations().getGlobalOptions().put(AgentKeys.AGENT_VERSION, ".");
+
+   Container container = createNiceMock(Container.class);
+   String role_hm = "HBASE_MASTER";
+   String role_hrs = "HBASE_REGIONSERVER";
+   SliderFileSystem sliderFileSystem = createNiceMock(SliderFileSystem.class);
+   ContainerLauncher launcher = createNiceMock(ContainerLauncher.class);
+   Path generatedConfPath = new Path(".", "test");
+   MapOperations resourceComponent = new MapOperations();
+   MapOperations appComponent = new MapOperations();
+   Path containerTmpDirPath = new Path(".", "test");
+   FileSystem mockFs = new MockFileSystem();
+   expect(sliderFileSystem.getFileSystem())
+       .andReturn(new FilterFileSystem(mockFs)).anyTimes();
+   expect(sliderFileSystem.createAmResource(anyObject(Path.class),
+                                            anyObject(LocalResourceType.class)))
+       .andReturn(createNiceMock(LocalResource.class)).anyTimes();
+   expect(container.getId()).andReturn(new MockContainerId(1)).anyTimes();
+   expect(container.getNodeId()).andReturn(new MockNodeId("localhost")).anyTimes();
+   StateAccessForProviders access = createNiceMock(StateAccessForProviders.class);
+
+   AgentProviderService mockAps = Mockito.spy(aps);
+   doReturn(access).when(mockAps).getStateAccessor();
+   doReturn(metainfo).when(mockAps).getApplicationMetainfo(any(SliderFileSystem.class), anyString());
+
+   // Stub out command generation so that only the number of invocations is
+   // observed. A failure while stubbing now fails the test instead of being
+   // silently ignored.
+   doReturn(true).when(mockAps).isMaster(anyString());
+   doNothing().when(mockAps).addInstallCommand(
+       anyString(),
+       anyString(),
+       any(HeartBeatResponse.class),
+       anyString());
+   doNothing().when(mockAps).addStartCommand(
+       anyString(),
+       anyString(),
+       any(HeartBeatResponse.class),
+       anyString());
+   doNothing().when(mockAps).addGetConfigCommand(
+       anyString(),
+       anyString(),
+       any(HeartBeatResponse.class));
+
+   expect(access.isApplicationLive()).andReturn(true).anyTimes();
+   ClusterDescription desc = new ClusterDescription();
+   desc.setInfo(StatusKeys.INFO_AM_HOSTNAME, "host1");
+   desc.setInfo(StatusKeys.INFO_AM_WEB_PORT, "8088");
+   desc.setInfo(OptionKeys.APPLICATION_NAME, "HBASE");
+   expect(access.getClusterStatus()).andReturn(desc).anyTimes();
+
+   AggregateConf aggConf = new AggregateConf();
+   ConfTreeOperations treeOps = aggConf.getAppConfOperations();
+   treeOps.getOrAddComponent("HBASE_MASTER").put(AgentKeys.WAIT_HEARTBEAT, "0");
+   treeOps.getOrAddComponent("HBASE_REGIONSERVER").put(AgentKeys.WAIT_HEARTBEAT, "0");
+   expect(access.getInstanceDefinitionSnapshot()).andReturn(aggConf).anyTimes();
+   replay(access, ctx, container, sliderFileSystem);
+
+   // build two containers
+   mockAps.buildContainerLaunchContext(launcher,
+                                       instanceDefinition,
+                                       container,
+                                       role_hm,
+                                       sliderFileSystem,
+                                       generatedConfPath,
+                                       resourceComponent,
+                                       appComponent,
+                                       containerTmpDirPath);
+
+   mockAps.buildContainerLaunchContext(launcher,
+                                       instanceDefinition,
+                                       container,
+                                       role_hrs,
+                                       sliderFileSystem,
+                                       generatedConfPath,
+                                       resourceComponent,
+                                       appComponent,
+                                       containerTmpDirPath);
+
+   // Both containers register
+   Register reg = new Register();
+   reg.setResponseId(0);
+   reg.setHostname("mockcontainer_1___HBASE_MASTER");
+   RegistrationResponse resp = mockAps.handleRegistration(reg);
+   Assert.assertEquals(0, resp.getResponseId());
+   Assert.assertEquals(RegistrationStatus.OK, resp.getResponseStatus());
+
+   reg = new Register();
+   reg.setResponseId(0);
+   reg.setHostname("mockcontainer_1___HBASE_REGIONSERVER");
+   resp = mockAps.handleRegistration(reg);
+   Assert.assertEquals(0, resp.getResponseId());
+   Assert.assertEquals(RegistrationStatus.OK, resp.getResponseStatus());
+
+   // Both issue install command
+   HeartBeat hb = new HeartBeat();
+   hb.setResponseId(1);
+   hb.setHostname("mockcontainer_1___HBASE_MASTER");
+   HeartBeatResponse hbr = mockAps.handleHeartBeat(hb);
+   Assert.assertEquals(2, hbr.getResponseId());
+   Mockito.verify(mockAps, Mockito.times(1)).addInstallCommand(anyString(),
+                                                               anyString(),
+                                                               any(HeartBeatResponse.class),
+                                                               anyString());
+
+   hb = new HeartBeat();
+   hb.setResponseId(1);
+   hb.setHostname("mockcontainer_1___HBASE_REGIONSERVER");
+   hbr = mockAps.handleHeartBeat(hb);
+   Assert.assertEquals(2, hbr.getResponseId());
+   Mockito.verify(mockAps, Mockito.times(2)).addInstallCommand(anyString(),
+                                                               anyString(),
+                                                               any(HeartBeatResponse.class),
+                                                               anyString());
+
+   // RS succeeds install but does not start
+   hb = new HeartBeat();
+   hb.setResponseId(2);
+   hb.setHostname("mockcontainer_1___HBASE_REGIONSERVER");
+   CommandReport cr = new CommandReport();
+   cr.setRole("HBASE_REGIONSERVER");
+   cr.setRoleCommand("INSTALL");
+   cr.setStatus("COMPLETED");
+   hb.setReports(Arrays.asList(cr));
+   hbr = mockAps.handleHeartBeat(hb);
+   Assert.assertEquals(3, hbr.getResponseId());
+   Mockito.verify(mockAps, Mockito.times(0)).addStartCommand(anyString(),
+                                                             anyString(),
+                                                             any(HeartBeatResponse.class),
+                                                             anyString());
+
+   // RS still does not start
+   hb = new HeartBeat();
+   hb.setResponseId(3);
+   hb.setHostname("mockcontainer_1___HBASE_REGIONSERVER");
+   hbr = mockAps.handleHeartBeat(hb);
+   Assert.assertEquals(4, hbr.getResponseId());
+   Mockito.verify(mockAps, Mockito.times(0)).addStartCommand(anyString(),
+                                                             anyString(),
+                                                             any(HeartBeatResponse.class),
+                                                             anyString());
+
+   // MASTER succeeds install and issues start
+   hb = new HeartBeat();
+   hb.setResponseId(2);
+   hb.setHostname("mockcontainer_1___HBASE_MASTER");
+   cr = new CommandReport();
+   cr.setRole("HBASE_MASTER");
+   cr.setRoleCommand("INSTALL");
+   cr.setStatus("COMPLETED");
+   Map<String, String> ap = new HashMap<>();
+   ap.put("a.port", "10233");
+   cr.setAllocatedPorts(ap);
+   hb.setReports(Arrays.asList(cr));
+   hbr = mockAps.handleHeartBeat(hb);
+   Assert.assertEquals(3, hbr.getResponseId());
+   Mockito.verify(mockAps, Mockito.times(1)).addStartCommand(anyString(),
+                                                             anyString(),
+                                                             any(HeartBeatResponse.class),
+                                                             anyString());
+   // The port reported with the master's install must have been recorded.
+   Map<String, String> allocatedPorts = mockAps.getAllocatedPorts();
+   Assert.assertNotNull(allocatedPorts);
+   Assert.assertEquals(1, allocatedPorts.size());
+   Assert.assertTrue(allocatedPorts.containsKey("a.port"));
+
+   // RS still does not start
+   hb = new HeartBeat();
+   hb.setResponseId(4);
+   hb.setHostname("mockcontainer_1___HBASE_REGIONSERVER");
+   hbr = mockAps.handleHeartBeat(hb);
+   Assert.assertEquals(5, hbr.getResponseId());
+   Mockito.verify(mockAps, Mockito.times(1)).addStartCommand(anyString(),
+                                                             anyString(),
+                                                             any(HeartBeatResponse.class),
+                                                             anyString());
+
+   // MASTER succeeds start
+   hb = new HeartBeat();
+   hb.setResponseId(3);
+   hb.setHostname("mockcontainer_1___HBASE_MASTER");
+   cr = new CommandReport();
+   cr.setRole("HBASE_MASTER");
+   cr.setRoleCommand("START");
+   cr.setStatus("COMPLETED");
+   hb.setReports(Arrays.asList(cr));
+   mockAps.handleHeartBeat(hb);
+   Mockito.verify(mockAps, Mockito.times(1)).addGetConfigCommand(anyString(),
+                                                                 anyString(),
+                                                                 any(HeartBeatResponse.class));
+
+   // RS starts now
+   hb = new HeartBeat();
+   hb.setResponseId(5);
+   hb.setHostname("mockcontainer_1___HBASE_REGIONSERVER");
+   hbr = mockAps.handleHeartBeat(hb);
+   Assert.assertEquals(6, hbr.getResponseId());
+   Mockito.verify(mockAps, Mockito.times(2)).addStartCommand(anyString(),
+                                                             anyString(),
+                                                             any(HeartBeatResponse.class),
+                                                             anyString());
+ }
+
+
+ /**
+  * Verifies that the start command built for HBASE_MASTER carries an
+  * "hbase-site" configuration in which the
+  * ${HBASE_MASTER.ALLOCATED_PORT} placeholders of "a.port" and "b.port"
+  * have been replaced by the values returned from getAllocatedPorts().
+  *
+  * @throws Exception if building the start command fails
+  */
+ @Test
+ public void testAddStartCommand() throws Exception {
+   AgentProviderService aps = new AgentProviderService();
+   HeartBeatResponse hbr = new HeartBeatResponse();
+
+   StateAccessForProviders access = createNiceMock(StateAccessForProviders.class);
+   AgentProviderService mockAps = Mockito.spy(aps);
+   doReturn(access).when(mockAps).getStateAccessor();
+
+   AggregateConf aggConf = new AggregateConf();
+   ConfTreeOperations treeOps = aggConf.getAppConfOperations();
+   treeOps.getGlobalOptions().put(AgentKeys.JAVA_HOME, "java_home");
+   treeOps.set(OptionKeys.APPLICATION_NAME, "HBASE");
+   treeOps.set("site.fs.defaultFS", "hdfs://HOST1:8020/");
+   treeOps.set(OptionKeys.ZOOKEEPER_HOSTS, "HOST1");
+   treeOps.set("config_types", "hbase-site");
+   // Both site properties use the same placeholder; the expected values
+   // below differ per property name.
+   treeOps.getGlobalOptions().put("site.hbase-site.a.port", "${HBASE_MASTER.ALLOCATED_PORT}");
+   treeOps.getGlobalOptions().put("site.hbase-site.b.port", "${HBASE_MASTER.ALLOCATED_PORT}");
+
+   expect(access.getAppConfSnapshot()).andReturn(treeOps).anyTimes();
+   expect(access.getInternalsSnapshot()).andReturn(treeOps).anyTimes();
+   expect(access.isApplicationLive()).andReturn(true).anyTimes();
+
+   doReturn("HOST1").when(mockAps).getClusterInfoPropertyValue(anyString());
+
+   Map<String, Map<String, ClusterNode>> roleClusterNodeMap = new HashMap<>();
+   Map<String, ClusterNode> container = new HashMap<>();
+   ClusterNode cn1 = new ClusterNode(new MyContainerId(1));
+   cn1.host = "HOST1";
+   container.put("cid1", cn1);
+   roleClusterNodeMap.put("HBASE_MASTER", container);
+   doReturn(roleClusterNodeMap).when(mockAps).getRoleClusterNodeMapping();
+   Map<String, String> allocatedPorts = new HashMap<>();
+   allocatedPorts.put("a.port", "10023");
+   allocatedPorts.put("b.port", "10024");
+   doReturn(allocatedPorts).when(mockAps).getAllocatedPorts();
+
+   replay(access);
+
+   mockAps.addStartCommand("HBASE_MASTER", "cid1", hbr, "");
+   Assert.assertTrue(hbr.getExecutionCommands().get(0).getConfigurations().containsKey("hbase-site"));
+   Map<String, String> hbaseSiteConf = hbr.getExecutionCommands().get(0).getConfigurations().get("hbase-site");
+   Assert.assertTrue(hbaseSiteConf.containsKey("a.port"));
+   // assertEquals reports expected/actual on failure and does not throw a
+   // NullPointerException when a key is missing.
+   Assert.assertEquals("10023", hbaseSiteConf.get("a.port"));
+   Assert.assertEquals("10024", hbaseSiteConf.get("b.port"));
+ }
+
private static class MyContainer extends Container {
ContainerId cid = null;
Added: incubator/slider/trunk/slider-core/src/test/java/org/apache/slider/providers/agent/TestComponentCommandOrder.java
URL: http://svn.apache.org/viewvc/incubator/slider/trunk/slider-core/src/test/java/org/apache/slider/providers/agent/TestComponentCommandOrder.java?rev=1596105&view=auto
==============================================================================
--- incubator/slider/trunk/slider-core/src/test/java/org/apache/slider/providers/agent/TestComponentCommandOrder.java (added)
+++ incubator/slider/trunk/slider-core/src/test/java/org/apache/slider/providers/agent/TestComponentCommandOrder.java Tue May 20 03:09:30 2014
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers.agent;
+
+import org.apache.slider.providers.agent.application.metadata.CommandOrder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+ /**
+  * Tests for {@link ComponentCommandOrder}: evaluation of declared command
+  * dependencies against component instance states, and rejection of
+  * malformed or unsupported order declarations at construction time.
+  */
+ public class TestComponentCommandOrder {
+   protected static final Logger log =
+       LoggerFactory.getLogger(TestComponentCommandOrder.class);
+
+   /**
+    * Exercises canExecute() with the following orders: A-START requires
+    * B-STARTED and C-STARTED (declared in two separate entries), and B-START
+    * requires C-STARTED, D-STARTED and E-STARTED (declared in one entry).
+    */
+   @Test
+   public void testComponentCommandOrder() throws Exception {
+     CommandOrder co1 = new CommandOrder();
+     co1.setCommand("A-START");
+     co1.setRequires("B-STARTED");
+     CommandOrder co2 = new CommandOrder();
+     co2.setCommand("A-START");
+     co2.setRequires("C-STARTED");
+     CommandOrder co3 = new CommandOrder();
+     co3.setCommand("B-START");
+     co3.setRequires("C-STARTED,D-STARTED,E-STARTED");
+
+     ComponentCommandOrder cco = new ComponentCommandOrder(Arrays.asList(co1, co2, co3));
+     ComponentInstanceState cisB = new ComponentInstanceState("B", "cid", "aid");
+     ComponentInstanceState cisC = new ComponentInstanceState("C", "cid", "aid");
+     ComponentInstanceState cisD = new ComponentInstanceState("D", "cid", "aid");
+     ComponentInstanceState cisE = new ComponentInstanceState("E", "cid", "aid");
+     ComponentInstanceState cisE2 = new ComponentInstanceState("E", "cid", "aid");
+
+     // A-START: allowed when only a started B is supplied, refused while a
+     // supplied C is in any state other than STARTED.
+     cisB.setState(State.STARTED);
+     cisC.setState(State.INSTALLED);
+     Assert.assertTrue(cco.canExecute("A", Command.START, Arrays.asList(cisB)));
+     Assert.assertFalse(cco.canExecute("A", Command.START, Arrays.asList(cisB, cisC)));
+
+     cisC.setState(State.STARTING);
+     Assert.assertFalse(cco.canExecute("A", Command.START, Arrays.asList(cisB, cisC)));
+
+     cisC.setState(State.INSTALL_FAILED);
+     Assert.assertFalse(cco.canExecute("A", Command.START, Arrays.asList(cisB, cisC)));
+
+     // E has no declared order, so its start is allowed.
+     cisD.setState(State.INSTALL_FAILED);
+     cisE.setState(State.STARTED);
+     Assert.assertTrue(cco.canExecute("E", Command.START, Arrays.asList(cisB, cisC, cisD, cisE)));
+
+     // Only B-START is constrained; B-INSTALL is allowed.
+     Assert.assertTrue(cco.canExecute("B", Command.INSTALL, Arrays.asList(cisB, cisC, cisD, cisE)));
+     Assert.assertFalse(cco.canExecute("B", Command.START, Arrays.asList(cisB, cisC, cisD, cisE)));
+
+     cisD.setState(State.INSTALLING);
+     Assert.assertFalse(cco.canExecute("B", Command.START, Arrays.asList(cisB, cisC, cisD, cisE)));
+
+     cisC.setState(State.STARTED);
+     cisD.setState(State.STARTED);
+     Assert.assertTrue(cco.canExecute("B", Command.START, Arrays.asList(cisB, cisC, cisD, cisE)));
+
+     // Two instances of E: B-START is refused until both are STARTED.
+     cisE2.setState(State.INSTALLED);
+     Assert.assertFalse(cco.canExecute("B", Command.START, Arrays.asList(cisE, cisE2)));
+
+     cisE2.setState(State.STARTED);
+     Assert.assertTrue(cco.canExecute("B", Command.START, Arrays.asList(cisE, cisE2)));
+   }
+
+   /**
+    * Checks that surrounding whitespace in a declaration is tolerated and
+    * that malformed or unsupported declarations are rejected with an
+    * IllegalArgumentException when the ComponentCommandOrder is constructed.
+    */
+   @Test
+   public void testComponentCommandOrderBadInput() throws Exception {
+     CommandOrder co = new CommandOrder();
+     co.setCommand(" A-START");
+     co.setRequires("B-STARTED , C-STARTED");
+
+     ComponentInstanceState cisB = new ComponentInstanceState("B", "cid", "aid");
+     ComponentInstanceState cisC = new ComponentInstanceState("C", "cid", "aid");
+     cisB.setState(State.STARTED);
+     cisC.setState(State.STARTED);
+
+     ComponentCommandOrder cco = new ComponentCommandOrder(Arrays.asList(co));
+     Assert.assertTrue(cco.canExecute("A", Command.START, Arrays.asList(cisB, cisC)));
+
+     // Unknown command name.
+     assertInstantiationFails(" A-STAR", "B-STARTED , C-STARTED");
+     // Missing component name in the command.
+     assertInstantiationFails(" -START", "B-STARTED , C-STARTED");
+     // Unknown state name in the requirement.
+     assertInstantiationFails(" A-START", "B-STRTED , C-STARTED");
+     // Missing state name in the requirement.
+     assertInstantiationFails(" A-START", "B-STARTED , C-");
+     // Ordering declared for INSTALL is rejected.
+     assertInstantiationFails(" A-INSTALL", "B-STARTED");
+     // Requirement on the INSTALLED state is rejected.
+     assertInstantiationFails(" A-START", "B-INSTALLED");
+   }
+
+   /**
+    * Asserts that a ComponentCommandOrder cannot be built from a single
+    * order with the given command and requires strings.
+    *
+    * @param command  value passed to CommandOrder.setCommand()
+    * @param requires value passed to CommandOrder.setRequires()
+    */
+   private static void assertInstantiationFails(String command, String requires) {
+     CommandOrder order = new CommandOrder();
+     order.setCommand(command);
+     order.setRequires(requires);
+     try {
+       new ComponentCommandOrder(Arrays.asList(order));
+       Assert.fail("Instantiation should have failed.");
+     } catch (IllegalArgumentException ie) {
+       log.info(ie.getMessage());
+     }
+   }
+ }