You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by rm...@apache.org on 2017/01/05 21:42:35 UTC
incubator-ranger git commit: RANGER-1285:ranger admin support storm HA
Repository: incubator-ranger
Updated Branches:
refs/heads/master 05e179d44 -> 31118afd4
RANGER-1285:ranger admin support storm HA
Signed-off-by: rmani <rm...@hortonworks.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/31118afd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/31118afd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/31118afd
Branch: refs/heads/master
Commit: 31118afd4f0d10ad65faadfce69e0c2755cee030
Parents: 05e179d
Author: zhangqiang2 <zh...@zte.com.cn>
Authored: Wed Dec 28 15:34:18 2016 +0800
Committer: rmani <rm...@hortonworks.com>
Committed: Thu Jan 5 12:49:36 2017 -0800
----------------------------------------------------------------------
.../services/storm/client/StormClient.java | 135 ++++++++++++-------
1 file changed, 86 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/31118afd/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormClient.java
----------------------------------------------------------------------
diff --git a/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormClient.java b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormClient.java
index e752362..cb56609 100644
--- a/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormClient.java
+++ b/storm-agent/src/main/java/org/apache/ranger/services/storm/client/StormClient.java
@@ -84,106 +84,143 @@ public class StormClient {
}
public List<String> getTopologyList(final String topologyNameMatching, final List<String> stormTopologyList) {
-
- LOG.debug("Getting Storm topology list for topologyNameMatching : " +
- topologyNameMatching);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Getting Storm topology list for topologyNameMatching : " + topologyNameMatching);
+ }
final String errMsg = errMessage;
-
+
PrivilegedAction<ArrayList<String>> topologyListGetter = new PrivilegedAction<ArrayList<String>>() {
@Override
public ArrayList<String> run() {
-
- ArrayList<String> lret = new ArrayList<String>();
-
- String url = stormUIUrl + TOPOLOGY_LIST_API_ENDPOINT;
-
- Client client = null;
+ if (stormUIUrl == null || stormUIUrl.trim().isEmpty()) {
+ return null;
+ }
+
+ String[] stormUIUrls = stormUIUrl.trim().split("[,;]");
+ if (stormUIUrls == null || stormUIUrls.length == 0) {
+ return null;
+ }
+
+ Client client = Client.create();
ClientResponse response = null;
-
+ for (String currentUrl : stormUIUrls) {
+ if (currentUrl == null || currentUrl.trim().isEmpty()) {
+ continue;
+ }
+
+ String url = currentUrl.trim() + TOPOLOGY_LIST_API_ENDPOINT;
+ try {
+ response = getTopologyResponse(url, client);
+
+ if (response != null) {
+ if (response.getStatus() == 200) {
+ break;
+ } else {
+ response.close();
+ }
+ }
+ } catch (Throwable t) {
+ String msgDesc = "Exception while getting topology list." + " URL : " + url;
+ LOG.error(msgDesc, t);
+ }
+ }
+
+ ArrayList<String> lret = new ArrayList<String>();
try {
- client = Client.create();
-
- WebResource webResource = client.resource(url);
-
- response = webResource.accept(EXPECTED_MIME_TYPE)
- .get(ClientResponse.class);
-
- LOG.debug("getTopologyList():calling " + url);
-
if (response != null) {
- LOG.debug("getTopologyList():response.getStatus()= " + response.getStatus());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getTopologyList():response.getStatus()= " + response.getStatus());
+ }
if (response.getStatus() == 200) {
String jsonString = response.getEntity(String.class);
Gson gson = new GsonBuilder().setPrettyPrinting().create();
TopologyListResponse topologyListResponse = gson.fromJson(jsonString, TopologyListResponse.class);
if (topologyListResponse != null) {
if (topologyListResponse.getTopologyList() != null) {
- for(Topology topology : topologyListResponse.getTopologyList()) {
+ for (Topology topology : topologyListResponse.getTopologyList()) {
String topologyName = topology.getName();
- if ( stormTopologyList != null && stormTopologyList.contains(topologyName)) {
- continue;
- }
- LOG.debug("getTopologyList():Found topology " + topologyName);
- LOG.debug("getTopologyList():topology Name=[" + topology.getName() + "], topologyNameMatching=[" + topologyNameMatching + "], existingStormTopologyList=[" + stormTopologyList + "]");
+ if (stormTopologyList != null && stormTopologyList.contains(topologyName)) {
+ continue;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getTopologyList():Found topology " + topologyName);
+ LOG.debug("getTopologyList():topology Name=[" + topology.getName()
+ + "], topologyNameMatching=[" + topologyNameMatching
+ + "], existingStormTopologyList=[" + stormTopologyList + "]");
+ }
if (topologyName != null) {
if (topologyNameMatching == null || topologyNameMatching.isEmpty() || FilenameUtils.wildcardMatch(topology.getName(), topologyNameMatching + "*")) {
- LOG.debug("getTopologyList():Adding topology " + topologyName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getTopologyList():Adding topology " + topologyName);
+ }
lret.add(topologyName);
}
}
}
}
}
- } else{
- LOG.info("getTopologyList():response.getStatus()= " + response.getStatus() + " for URL " + url + ", so returning null list");
- String jsonString = response.getEntity(String.class);
- LOG.info(jsonString);
- lret = null;
}
} else {
- String msgDesc = "Unable to get a valid response for "
- + "expected mime type : [" + EXPECTED_MIME_TYPE
- + "] URL : " + url + " - got null response.";
+ String msgDesc = "Unable to get a valid response for " + "expected mime type : ["
+ + EXPECTED_MIME_TYPE + "] URL : " + stormUIUrl + " - got null response.";
LOG.error(msgDesc);
HadoopException hdpException = new HadoopException(msgDesc);
- hdpException.generateResponseDataMap(false, msgDesc,
- msgDesc + errMsg, null, null);
+ hdpException.generateResponseDataMap(false, msgDesc, msgDesc + errMsg, null, null);
throw hdpException;
}
} catch (HadoopException he) {
throw he;
} catch (Throwable t) {
- String msgDesc = "Exception while getting Storm TopologyList."
- + " URL : " + url;
- HadoopException hdpException = new HadoopException(msgDesc,
- t);
+ String msgDesc = "Exception while getting Storm TopologyList." + " URL : " + stormUIUrl;
+ HadoopException hdpException = new HadoopException(msgDesc, t);
LOG.error(msgDesc, t);
- hdpException.generateResponseDataMap(false,
- BaseClient.getMessage(t), msgDesc + errMsg, null,
- null);
+ hdpException.generateResponseDataMap(false, BaseClient.getMessage(t), msgDesc + errMsg, null, null);
throw hdpException;
-
} finally {
if (response != null) {
response.close();
}
-
+
if (client != null) {
client.destroy();
}
-
}
return lret;
}
+
+ private ClientResponse getTopologyResponse(String url, Client client) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getTopologyResponse():calling " + url);
+ }
+
+ WebResource webResource = client.resource(url);
+
+ ClientResponse response = webResource.accept(EXPECTED_MIME_TYPE).get(ClientResponse.class);
+
+ if (response != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getTopologyResponse():response.getStatus()= " + response.getStatus());
+ }
+ if (response.getStatus() != 200) {
+ LOG.info("getTopologyResponse():response.getStatus()= " + response.getStatus() + " for URL "
+ + url + ", failed to get topology list");
+ String jsonString = response.getEntity(String.class);
+ LOG.info(jsonString);
+ }
+ }
+ return response;
+ }
};
+
List<String> ret = null;
try {
ret = executeUnderKerberos(this.userName, this.password, this.lookupPrincipal, this.lookupKeytab, this.nameRules, topologyListGetter);
} catch (IOException e) {
LOG.error("Unable to get Topology list from [" + stormUIUrl + "]", e);
}
-
+
return ret;
}