You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/09/27 04:25:29 UTC
[5/7] atlas git commit: ATLAS-2179: Split Atlas client library to
avoid unnecessary dependencies
http://git-wip-us.apache.org/repos/asf/atlas/blob/187730dd/client/common/src/main/java/org/apache/atlas/security/SecureClientUtils.java
----------------------------------------------------------------------
diff --git a/client/common/src/main/java/org/apache/atlas/security/SecureClientUtils.java b/client/common/src/main/java/org/apache/atlas/security/SecureClientUtils.java
new file mode 100644
index 0000000..d5392b2
--- /dev/null
+++ b/client/common/src/main/java/org/apache/atlas/security/SecureClientUtils.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.security;
+
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
+import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.atlas.security.SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
+import static org.apache.atlas.security.SecurityProperties.CLIENT_AUTH_KEY;
+import static org.apache.atlas.security.SecurityProperties.KEYSTORE_FILE_KEY;
+import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_FILE_KEY;
+
+/**
+ *
+ */
+public class SecureClientUtils {
+
+ public final static int DEFAULT_SOCKET_TIMEOUT_IN_MSECS = 1 * 60 * 1000; // 1 minute
+ private static final Logger LOG = LoggerFactory.getLogger(SecureClientUtils.class);
+
+
+ public static URLConnectionClientHandler getClientConnectionHandler(DefaultClientConfig config,
+ org.apache.commons.configuration.Configuration clientConfig, String doAsUser,
+ final UserGroupInformation ugi) {
+ config.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
+ Configuration conf = new Configuration();
+ conf.addResource(conf.get(SSLFactory.SSL_CLIENT_CONF_KEY, SecurityProperties.SSL_CLIENT_PROPERTIES));
+ UserGroupInformation.setConfiguration(conf);
+ final ConnectionConfigurator connConfigurator = newConnConfigurator(conf);
+
+ Authenticator authenticator = new KerberosDelegationTokenAuthenticator();
+
+ authenticator.setConnectionConfigurator(connConfigurator);
+ final DelegationTokenAuthenticator finalAuthenticator = (DelegationTokenAuthenticator) authenticator;
+ final DelegationTokenAuthenticatedURL.Token token = new DelegationTokenAuthenticatedURL.Token();
+ HttpURLConnectionFactory httpURLConnectionFactory = null;
+ try {
+ UserGroupInformation ugiToUse = ugi != null ? ugi : UserGroupInformation.getCurrentUser();
+ final UserGroupInformation actualUgi =
+ (ugiToUse.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.PROXY)
+ ? ugiToUse.getRealUser() : ugiToUse;
+ LOG.info("Real User: {}, is from ticket cache? {}", actualUgi, actualUgi.isLoginTicketBased());
+ if (StringUtils.isEmpty(doAsUser)) {
+ doAsUser = actualUgi.getShortUserName();
+ }
+ LOG.info("doAsUser: {}", doAsUser);
+ final String finalDoAsUser = doAsUser;
+ httpURLConnectionFactory = new HttpURLConnectionFactory() {
+ @Override
+ public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
+ try {
+ return actualUgi.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
+ @Override
+ public HttpURLConnection run() throws Exception {
+ try {
+ return new DelegationTokenAuthenticatedURL(finalAuthenticator, connConfigurator)
+ .openConnection(url, token, finalDoAsUser);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ });
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ throw (IOException) e;
+ } else {
+ throw new IOException(e);
+ }
+ }
+ }
+ };
+ } catch (IOException e) {
+ LOG.warn("Error obtaining user", e);
+ }
+
+ return new URLConnectionClientHandler(httpURLConnectionFactory);
+ }
+
+ private final static ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
+ setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT_IN_MSECS);
+ return conn;
+ }
+ };
+
+ private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
+ try {
+ return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT_IN_MSECS, conf);
+ } catch (Exception e) {
+ LOG.debug("Cannot load customized ssl related configuration. " + "Fallback to system-generic settings.", e);
+ return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+ }
+ }
+
+ private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf)
+ throws IOException, GeneralSecurityException {
+ final SSLFactory factory;
+ final SSLSocketFactory sf;
+ final HostnameVerifier hv;
+
+ factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+ factory.init();
+ sf = factory.createSSLSocketFactory();
+ hv = factory.getHostnameVerifier();
+
+ return new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
+ if (conn instanceof HttpsURLConnection) {
+ HttpsURLConnection c = (HttpsURLConnection) conn;
+ c.setSSLSocketFactory(sf);
+ c.setHostnameVerifier(hv);
+ }
+ setTimeouts(conn, timeout);
+ return conn;
+ }
+ };
+ }
+
+ private static void setTimeouts(URLConnection connection, int socketTimeout) {
+ connection.setConnectTimeout(socketTimeout);
+ connection.setReadTimeout(socketTimeout);
+ }
+
+ private static File getSSLClientFile() throws AtlasException {
+ String confLocation = System.getProperty("atlas.conf");
+ File sslDir;
+ try {
+ if (confLocation == null) {
+ String persistDir = null;
+ URL resource = SecureClientUtils.class.getResource("/");
+ if (resource != null) {
+ persistDir = resource.toURI().getPath();
+ }
+ assert persistDir != null;
+ sslDir = new File(persistDir);
+ } else {
+ sslDir = new File(confLocation);
+ }
+ LOG.info("ssl-client.xml will be created in {}", sslDir);
+ } catch (Exception e) {
+ throw new AtlasException("Failed to find client configuration directory", e);
+ }
+ return new File(sslDir, SecurityProperties.SSL_CLIENT_PROPERTIES);
+ }
+
+ public static void persistSSLClientConfiguration(org.apache.commons.configuration.Configuration clientConfig)
+ throws AtlasException, IOException {
+ //trust settings
+ Configuration configuration = new Configuration(false);
+ File sslClientFile = getSSLClientFile();
+ if (!sslClientFile.exists()) {
+ configuration.set("ssl.client.truststore.type", "jks");
+ configuration.set("ssl.client.truststore.location", clientConfig.getString(TRUSTSTORE_FILE_KEY));
+ if (clientConfig.getBoolean(CLIENT_AUTH_KEY, false)) {
+ // need to get client key properties
+ configuration.set("ssl.client.keystore.location", clientConfig.getString(KEYSTORE_FILE_KEY));
+ configuration.set("ssl.client.keystore.type", "jks");
+ }
+ // add the configured credential provider
+ configuration.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+ clientConfig.getString(CERT_STORES_CREDENTIAL_PROVIDER_PATH));
+ String hostnameVerifier = clientConfig.getString(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY);
+ if (hostnameVerifier != null) {
+ configuration.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, hostnameVerifier);
+ }
+
+ configuration.writeXml(new FileWriter(sslClientFile));
+ }
+ }
+
+ public static URLConnectionClientHandler getUrlConnectionClientHandler() {
+ return new URLConnectionClientHandler(new HttpURLConnectionFactory() {
+ @Override
+ public HttpURLConnection getHttpURLConnection(URL url)
+ throws IOException {
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+
+ if (connection instanceof HttpsURLConnection) {
+ LOG.debug("Attempting to configure HTTPS connection using client "
+ + "configuration");
+ final SSLFactory factory;
+ final SSLSocketFactory sf;
+ final HostnameVerifier hv;
+
+ try {
+ Configuration conf = new Configuration();
+ conf.addResource(conf.get(SSLFactory.SSL_CLIENT_CONF_KEY, SecurityProperties.SSL_CLIENT_PROPERTIES));
+ UserGroupInformation.setConfiguration(conf);
+
+ HttpsURLConnection c = (HttpsURLConnection) connection;
+ factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+ factory.init();
+ sf = factory.createSSLSocketFactory();
+ hv = factory.getHostnameVerifier();
+ c.setSSLSocketFactory(sf);
+ c.setHostnameVerifier(hv);
+ } catch (Exception e) {
+ LOG.info("Unable to configure HTTPS connection from "
+ + "configuration. Leveraging JDK properties.");
+ }
+ }
+ return connection;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/187730dd/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 9d41176..b065833 100755
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -24,58 +24,28 @@
<groupId>org.apache.atlas</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
+
+ <!-- Sub modules -->
+ <modules>
+ <module>common</module>
+ <module>client-v1</module>
+ <module>client-v2</module>
+ </modules>
+
<artifactId>atlas-client</artifactId>
<description>Apache Atlas Client</description>
<name>Apache Atlas Client</name>
- <packaging>jar</packaging>
+ <packaging>pom</packaging>
<dependencies>
<dependency>
- <groupId>org.apache.atlas</groupId>
- <artifactId>atlas-typesystem</artifactId>
- </dependency>
-
- <!-- supports simple auth handler -->
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-annotations</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minikdc</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-client</artifactId>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.atlas</groupId>
- <artifactId>atlas-typesystem</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/atlas/blob/187730dd/client/src/main/assembly/all-jar.xml
----------------------------------------------------------------------
diff --git a/client/src/main/assembly/all-jar.xml b/client/src/main/assembly/all-jar.xml
new file mode 100644
index 0000000..8bc9eee
--- /dev/null
+++ b/client/src/main/assembly/all-jar.xml
@@ -0,0 +1,35 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>all-jar</id>
+ <formats>
+ <format>jar</format> <!-- the result is a jar file -->
+ </formats>
+
+ <includeBaseDirectory>false</includeBaseDirectory> <!-- strip the module prefixes -->
+
+ <dependencySets>
+ <dependencySet>
+ <unpack>true</unpack> <!-- unpack , then repack the jars -->
+ <useTransitiveDependencies>false</useTransitiveDependencies> <!-- do not pull in any transitive dependencies -->
+ </dependencySet>
+ </dependencySets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/atlas/blob/187730dd/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasAdminClient.java b/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
deleted file mode 100644
index f334f6c..0000000
--- a/client/src/main/java/org/apache/atlas/AtlasAdminClient.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas;
-
-import org.apache.atlas.model.metrics.AtlasMetrics;
-import org.apache.atlas.type.AtlasType;
-import org.apache.atlas.utils.AuthenticationUtil;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.Arrays;
-
-
-/**
- * An application that allows users to run admin commands against an Atlas server.
- *
- * The application uses {@link AtlasClient} to send REST requests to the Atlas server. The details of connections
- * and other configuration is specified in the Atlas properties file.
- * Exit status of the application will be as follows:
- * <li>0: successful execution</li>
- * <li>1: error in options used for the application</li>
- * <li>-1/255: application error</li>
- */
-public class AtlasAdminClient {
-
- private static final Option STATUS = new Option("status", false, "Get the status of an atlas instance");
- private static final Option STATS = new Option("stats", false, "Get the metrics of an atlas instance");
- private static final Options OPTIONS = new Options();
-
- private static final int INVALID_OPTIONS_STATUS = 1;
- private static final int PROGRAM_ERROR_STATUS = -1;
-
- static {
- OPTIONS.addOption(STATUS);
- OPTIONS.addOption(STATS);
- }
-
- public static void main(String[] args) throws AtlasException, ParseException {
- AtlasAdminClient atlasAdminClient = new AtlasAdminClient();
- int result = atlasAdminClient.run(args);
- System.exit(result);
- }
-
- private int run(String[] args) throws AtlasException {
- CommandLine commandLine = parseCommandLineOptions(args);
- Configuration configuration = ApplicationProperties.get();
- String[] atlasServerUri = configuration.getStringArray(AtlasConstants.ATLAS_REST_ADDRESS_KEY);
-
- if (atlasServerUri == null || atlasServerUri.length == 0) {
- atlasServerUri = new String[] { AtlasConstants.DEFAULT_ATLAS_REST_ADDRESS };
- }
-
- AtlasClient atlasClient = null;
- if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
- String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
- atlasClient = new AtlasClient(atlasServerUri, basicAuthUsernamePassword);
- } else {
- atlasClient = new AtlasClient(atlasServerUri);
- }
- return handleCommand(commandLine, atlasServerUri, atlasClient);
- }
-
- private int handleCommand(CommandLine commandLine, String[] atlasServerUri, AtlasClient atlasClient) {
- int cmdStatus = PROGRAM_ERROR_STATUS;
- if (commandLine.hasOption(STATUS.getOpt())) {
- try {
- System.out.println(atlasClient.getAdminStatus());
- cmdStatus = 0;
- } catch (AtlasServiceException e) {
- System.err.println("Could not retrieve status of the server at " + Arrays.toString(atlasServerUri));
- printStandardHttpErrorDetails(e);
- }
- } else if (commandLine.hasOption(STATS.getOpt())) {
- try {
- AtlasMetrics atlasMetrics = atlasClient.getAtlasMetrics();
- String json = AtlasType.toJson(atlasMetrics);
- System.out.println(json);
- cmdStatus = 0;
- } catch (AtlasServiceException e) {
- System.err.println("Could not retrieve metrics of the server at " + Arrays.toString(atlasServerUri));
- printStandardHttpErrorDetails(e);
- }
- } else {
- System.err.println("Unsupported option. Refer to usage for valid options.");
- printUsage(INVALID_OPTIONS_STATUS);
- }
- return cmdStatus;
- }
-
- private void printStandardHttpErrorDetails(AtlasServiceException e) {
- System.err.println("Error details: ");
- System.err.println("HTTP Status: " + e.getStatus().getStatusCode() + ","
- + e.getStatus().getReasonPhrase());
- System.err.println("Exception message: " + e.getMessage());
- }
-
- private CommandLine parseCommandLineOptions(String[] args) {
- if (args.length == 0) {
- printUsage(INVALID_OPTIONS_STATUS);
- }
- CommandLineParser parser = new GnuParser();
- CommandLine commandLine = null;
- try {
- commandLine = parser.parse(OPTIONS, args);
- } catch (ParseException e) {
- System.err.println("Could not parse command line options.");
- printUsage(INVALID_OPTIONS_STATUS);
- }
- return commandLine;
- }
-
- private void printUsage(int statusCode) {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp("atlas_admin.py", OPTIONS);
- System.exit(statusCode);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/atlas/blob/187730dd/client/src/main/java/org/apache/atlas/AtlasBaseClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasBaseClient.java b/client/src/main/java/org/apache/atlas/AtlasBaseClient.java
deleted file mode 100644
index 602831a..0000000
--- a/client/src/main/java/org/apache/atlas/AtlasBaseClient.java
+++ /dev/null
@@ -1,620 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.GenericType;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
-import com.sun.jersey.api.json.JSONConfiguration;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-import org.apache.atlas.model.metrics.AtlasMetrics;
-import org.apache.atlas.security.SecureClientUtils;
-import org.apache.atlas.utils.AuthenticationUtil;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.Cookie;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
-
-public abstract class AtlasBaseClient {
- public static final String BASE_URI = "api/atlas/";
- public static final String TYPES = "types";
- public static final String ADMIN_VERSION = "admin/version";
- public static final String ADMIN_STATUS = "admin/status";
- public static final String ADMIN_METRICS = "admin/metrics";
- public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled";
- //Admin operations
- public static final APIInfo VERSION = new APIInfo(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK);
- public static final APIInfo STATUS = new APIInfo(BASE_URI + ADMIN_STATUS, HttpMethod.GET, Response.Status.OK);
- public static final APIInfo METRICS = new APIInfo(BASE_URI + ADMIN_METRICS, HttpMethod.GET, Response.Status.OK);
- static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
- static final String UNKNOWN_STATUS = "Unknown status";
- static final String ATLAS_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
- // Setting the default value based on testing failovers while client code like quickstart is running.
- static final int DEFAULT_NUM_RETRIES = 4;
- static final String ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
- // Setting the default value based on testing failovers while client code like quickstart is running.
- // With number of retries, this gives a total time of about 20s for the server to start.
- static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
- private static final Logger LOG = LoggerFactory.getLogger(AtlasBaseClient.class);
- protected WebResource service;
- protected Configuration configuration;
- private String basicAuthUser;
- private String basicAuthPassword;
- private AtlasClientContext atlasClientContext;
- private boolean retryEnabled = false;
- private Cookie cookie = null;
-
- protected AtlasBaseClient() {
- }
-
- protected AtlasBaseClient(String[] baseUrl, String[] basicAuthUserNamePassword) {
- if (basicAuthUserNamePassword != null) {
- if (basicAuthUserNamePassword.length > 0) {
- this.basicAuthUser = basicAuthUserNamePassword[0];
- }
- if (basicAuthUserNamePassword.length > 1) {
- this.basicAuthPassword = basicAuthUserNamePassword[1];
- }
- }
-
- initializeState(baseUrl, null, null);
- }
-
- protected AtlasBaseClient(String... baseUrls) throws AtlasException {
- this(getCurrentUGI(), baseUrls);
- }
-
- protected AtlasBaseClient(UserGroupInformation ugi, String[] baseUrls) {
- this(ugi, ugi.getShortUserName(), baseUrls);
- }
-
- protected AtlasBaseClient(UserGroupInformation ugi, String doAsUser, String[] baseUrls) {
- initializeState(baseUrls, ugi, doAsUser);
- }
-
- protected AtlasBaseClient(String[] baseUrls, Cookie cookie) {
- this.cookie = cookie;
- initializeState(baseUrls, null, null);
- }
-
- @VisibleForTesting
- protected AtlasBaseClient(WebResource service, Configuration configuration) {
- this.service = service;
- this.configuration = configuration;
- }
-
- @VisibleForTesting
- protected AtlasBaseClient(Configuration configuration, String[] baseUrl, String[] basicAuthUserNamePassword) {
- if (basicAuthUserNamePassword != null) {
- if (basicAuthUserNamePassword.length > 0) {
- this.basicAuthUser = basicAuthUserNamePassword[0];
- }
- if (basicAuthUserNamePassword.length > 1) {
- this.basicAuthPassword = basicAuthUserNamePassword[1];
- }
- }
-
- initializeState(configuration, baseUrl, null, null);
- }
-
- public void setCookie(Cookie cookie) {
- this.cookie = cookie;
- }
-
- protected static UserGroupInformation getCurrentUGI() throws AtlasException {
- try {
- return UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- throw new AtlasException(e);
- }
- }
-
- void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
- initializeState(getClientProperties(), baseUrls, ugi, doAsUser);
- }
-
- void initializeState(Configuration configuration, String[] baseUrls, UserGroupInformation ugi, String doAsUser) {
- this.configuration = configuration;
- Client client = getClient(configuration, ugi, doAsUser);
-
- if ((!AuthenticationUtil.isKerberosAuthenticationEnabled()) && basicAuthUser != null && basicAuthPassword != null) {
- final HTTPBasicAuthFilter authFilter = new HTTPBasicAuthFilter(basicAuthUser, basicAuthPassword);
- client.addFilter(authFilter);
- }
-
- String activeServiceUrl = determineActiveServiceURL(baseUrls, client);
- atlasClientContext = new AtlasClientContext(baseUrls, client, ugi, doAsUser);
- service = client.resource(UriBuilder.fromUri(activeServiceUrl).build());
- }
-
- @VisibleForTesting
- protected Client getClient(Configuration configuration, UserGroupInformation ugi, String doAsUser) {
- DefaultClientConfig config = new DefaultClientConfig();
- // Enable POJO mapping feature
- config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
- int readTimeout = configuration.getInt("atlas.client.readTimeoutMSecs", 60000);
- int connectTimeout = configuration.getInt("atlas.client.connectTimeoutMSecs", 60000);
- if (configuration.getBoolean(TLS_ENABLED, false)) {
- // create an SSL properties configuration if one doesn't exist. SSLFactory expects a file, so forced
- // to create a
- // configuration object, persist it, then subsequently pass in an empty configuration to SSLFactory
- try {
- SecureClientUtils.persistSSLClientConfiguration(configuration);
- } catch (Exception e) {
- LOG.info("Error processing client configuration.", e);
- }
- }
-
- final URLConnectionClientHandler handler;
-
- if ((AuthenticationUtil.isKerberosAuthenticationEnabled())) {
- handler = SecureClientUtils.getClientConnectionHandler(config, configuration, doAsUser, ugi);
- } else {
- if (configuration.getBoolean(TLS_ENABLED, false)) {
- handler = SecureClientUtils.getUrlConnectionClientHandler();
- } else {
- handler = new URLConnectionClientHandler();
- }
- }
- Client client = new Client(handler, config);
- client.setReadTimeout(readTimeout);
- client.setConnectTimeout(connectTimeout);
- return client;
- }
-
- @VisibleForTesting
- protected String determineActiveServiceURL(String[] baseUrls, Client client) {
- if (baseUrls.length == 0) {
- throw new IllegalArgumentException("Base URLs cannot be null or empty");
- }
- final String baseUrl;
- AtlasServerEnsemble atlasServerEnsemble = new AtlasServerEnsemble(baseUrls);
- if (atlasServerEnsemble.hasSingleInstance()) {
- baseUrl = atlasServerEnsemble.firstURL();
- LOG.info("Client has only one service URL, will use that for all actions: {}", baseUrl);
- } else {
- try {
- baseUrl = selectActiveServerAddress(client, atlasServerEnsemble);
- } catch (AtlasServiceException e) {
- LOG.error("None of the passed URLs are active: {}", atlasServerEnsemble, e);
- throw new IllegalArgumentException("None of the passed URLs are active " + atlasServerEnsemble, e);
- }
- }
- return baseUrl;
- }
-
- private String selectActiveServerAddress(Client client, AtlasServerEnsemble serverEnsemble)
- throws AtlasServiceException {
- List<String> serverInstances = serverEnsemble.getMembers();
- String activeServerAddress = null;
- for (String serverInstance : serverInstances) {
- LOG.info("Trying with address {}", serverInstance);
- activeServerAddress = getAddressIfActive(client, serverInstance);
- if (activeServerAddress != null) {
- LOG.info("Found service {} as active service.", serverInstance);
- break;
- }
- }
- if (activeServerAddress != null)
- return activeServerAddress;
- else
- throw new AtlasServiceException(STATUS, new RuntimeException("Could not find any active instance"));
- }
-
- private String getAddressIfActive(Client client, String serverInstance) {
- String activeServerAddress = null;
- for (int i = 0; i < getNumberOfRetries(); i++) {
- try {
- service = client.resource(UriBuilder.fromUri(serverInstance).build());
- String adminStatus = getAdminStatus();
- if (StringUtils.equals(adminStatus, "ACTIVE")) {
- activeServerAddress = serverInstance;
- break;
- } else {
- LOG.info("attempt #{}: Service {} - is not active. status={}", (i + 1), serverInstance, adminStatus);
- }
- } catch (Exception e) {
- LOG.error("attempt #{}: Service {} - could not get status", (i + 1), serverInstance, e);
- }
- sleepBetweenRetries();
- }
- return activeServerAddress;
- }
-
- protected Configuration getClientProperties() {
- try {
- if (configuration == null) {
- configuration = ApplicationProperties.get();
- }
- } catch (AtlasException e) {
- LOG.error("Exception while loading configuration.", e);
- }
- return configuration;
- }
-
- public boolean isServerReady() throws AtlasServiceException {
- WebResource resource = getResource(VERSION.getPath());
- try {
- callAPIWithResource(VERSION, resource, null, JSONObject.class);
- return true;
- } catch (ClientHandlerException che) {
- return false;
- } catch (AtlasServiceException ase) {
- if (ase.getStatus() != null && ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) {
- LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready");
- return false;
- }
- throw ase;
- }
- }
-
- protected WebResource getResource(String path, String... pathParams) {
- return getResource(service, path, pathParams);
- }
-
- protected <T> T callAPIWithResource(APIInfo api, WebResource resource, Object requestObject, Class<T> responseType) throws AtlasServiceException {
- GenericType<T> genericType = null;
- if (responseType != null) {
- genericType = new GenericType<>(responseType);
- }
- return callAPIWithResource(api, resource, requestObject, genericType);
- }
-
- protected <T> T callAPIWithResource(APIInfo api, WebResource resource, Object requestObject, GenericType<T> responseType) throws AtlasServiceException {
- ClientResponse clientResponse = null;
- int i = 0;
- do {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Calling API [ {} : {} ] {}", api.getMethod(), api.getPath(), requestObject != null ? "<== " + requestObject : "");
- }
-
- WebResource.Builder requestBuilder = resource.getRequestBuilder();
-
- // Set content headers
- requestBuilder
- .accept(JSON_MEDIA_TYPE)
- .type(JSON_MEDIA_TYPE);
-
- // Set cookie if present
- if (cookie != null) {
- requestBuilder.cookie(cookie);
- }
-
- clientResponse = requestBuilder.method(api.getMethod(), ClientResponse.class, requestObject);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("API {} returned status {}", resource.getURI(), clientResponse.getStatus());
- }
-
- if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) {
- if (null == responseType) {
- return null;
- }
- try {
- if (responseType.getRawClass() == JSONObject.class) {
- String stringEntity = clientResponse.getEntity(String.class);
- try {
- JSONObject jsonObject = new JSONObject(stringEntity);
- LOG.info("Response = {}", jsonObject);
- return (T) jsonObject;
- } catch (JSONException e) {
- throw new AtlasServiceException(api, e);
- }
- } else {
- T entity = clientResponse.getEntity(responseType);
- return entity;
- }
- } catch (ClientHandlerException e) {
- throw new AtlasServiceException(api, e);
- }
- } else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
- break;
- } else {
- LOG.error("Got a service unavailable when calling: {}, will retry..", resource);
- sleepBetweenRetries();
- }
-
- i++;
- } while (i < getNumberOfRetries());
-
- throw new AtlasServiceException(api, clientResponse);
- }
-
- private WebResource getResource(WebResource service, String path, String... pathParams) {
- WebResource resource = service.path(path);
- resource = appendPathParams(resource, pathParams);
- return resource;
- }
-
- void sleepBetweenRetries() {
- try {
- Thread.sleep(getSleepBetweenRetriesMs());
- } catch (InterruptedException e) {
- LOG.error("Interrupted from sleeping between retries.", e);
- }
- }
-
- int getNumberOfRetries() {
- return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES);
- }
-
- private int getSleepBetweenRetriesMs() {
- return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, AtlasBaseClient.DEFAULT_SLEEP_BETWEEN_RETRIES_MS);
- }
-
- /**
- * Return status of the service instance the client is pointing to.
- *
- * @return One of the values in ServiceState.ServiceStateValue or {@link #UNKNOWN_STATUS} if
- * there is a JSON parse exception
- * @throws AtlasServiceException if there is a HTTP error.
- */
- public String getAdminStatus() throws AtlasServiceException {
- String result = AtlasBaseClient.UNKNOWN_STATUS;
- WebResource resource = getResource(service, STATUS.getPath());
- JSONObject response = callAPIWithResource(STATUS, resource, null, JSONObject.class);
- try {
- result = response.getString("Status");
- } catch (JSONException e) {
- LOG.error("Exception while parsing admin status response. Returned response {}", response.toString(), e);
- }
- return result;
- }
-
- /**
- * @return Return metrics of the service instance the client is pointing to
- * @throws AtlasServiceException
- */
- public AtlasMetrics getAtlasMetrics() throws AtlasServiceException {
- return callAPI(METRICS, AtlasMetrics.class, null);
- }
-
- boolean isRetryableException(ClientHandlerException che) {
- return che.getCause().getClass().equals(IOException.class)
- || che.getCause().getClass().equals(ConnectException.class);
- }
-
- void handleClientHandlerException(ClientHandlerException che) {
- if (isRetryableException(che)) {
- atlasClientContext.getClient().destroy();
- LOG.warn("Destroyed current context while handling ClientHandlerEception.");
- LOG.warn("Will retry and create new context.");
- sleepBetweenRetries();
- initializeState(atlasClientContext.getBaseUrls(), atlasClientContext.getUgi(),
- atlasClientContext.getDoAsUser());
- return;
- }
- throw che;
- }
-
- @VisibleForTesting
- JSONObject callAPIWithRetries(APIInfo api, Object requestObject, ResourceCreator resourceCreator)
- throws AtlasServiceException {
- for (int i = 0; i < getNumberOfRetries(); i++) {
- WebResource resource = resourceCreator.createResource();
- try {
- LOG.debug("Using resource {} for {} times", resource.getURI(), i + 1);
- return callAPIWithResource(api, resource, requestObject, JSONObject.class);
- } catch (ClientHandlerException che) {
- if (i == (getNumberOfRetries() - 1)) {
- throw che;
- }
- LOG.warn("Handled exception in calling api {}", api.getPath(), che);
- LOG.warn("Exception's cause: {}", che.getCause().getClass());
- handleClientHandlerException(che);
- }
- }
- throw new AtlasServiceException(api, new RuntimeException("Could not get response after retries."));
- }
-
- public <T> T callAPI(APIInfo api, Class<T> responseType, Object requestObject, String... params)
- throws AtlasServiceException {
- return callAPIWithResource(api, getResource(api, params), requestObject, responseType);
- }
-
- public <T> T callAPI(APIInfo api, GenericType<T> responseType, Object requestObject, String... params)
- throws AtlasServiceException {
- return callAPIWithResource(api, getResource(api, params), requestObject, responseType);
- }
-
-
- public <T> T callAPI(APIInfo api, Class<T> responseType, Object requestBody,
- MultivaluedMap<String, String> queryParams, String... params) throws AtlasServiceException {
- WebResource resource = getResource(api, queryParams, params);
- return callAPIWithResource(api, resource, requestBody, responseType);
- }
-
- public <T> T callAPI(APIInfo api, Class<T> responseType, MultivaluedMap<String, String> queryParams, String... params)
- throws AtlasServiceException {
- WebResource resource = getResource(api, queryParams, params);
- return callAPIWithResource(api, resource, null, responseType);
- }
-
- public <T> T callAPI(APIInfo api, GenericType<T> responseType, MultivaluedMap<String, String> queryParams, String... params)
- throws AtlasServiceException {
- WebResource resource = getResource(api, queryParams, params);
- return callAPIWithResource(api, resource, null, responseType);
- }
-
- protected WebResource getResource(APIInfo api, String... pathParams) {
- return getResource(service, api, pathParams);
- }
-
- // Modify URL to include the path params
- private WebResource getResource(WebResource service, APIInfo api, String... pathParams) {
- WebResource resource = service.path(api.getPath());
- resource = appendPathParams(resource, pathParams);
- return resource;
- }
-
- public <T> T callAPI(APIInfo api, Class<T> responseType, MultivaluedMap<String, String> queryParams)
- throws AtlasServiceException {
- return callAPIWithResource(api, getResource(api, queryParams), null, responseType);
- }
-
- public <T> T callAPI(APIInfo api, Class<T> responseType, String queryParamKey, List<String> queryParamValues)
- throws AtlasServiceException {
- return callAPIWithResource(api, getResource(api, queryParamKey, queryParamValues), null, responseType);
- }
-
- private WebResource getResource(APIInfo api, String queryParamKey, List<String> queryParamValues) {
- WebResource resource = service.path(api.getPath());
- for (String queryParamValue : queryParamValues) {
- if (StringUtils.isNotBlank(queryParamKey) && StringUtils.isNotBlank(queryParamValue)) {
- resource = resource.queryParam(queryParamKey, queryParamValue);
- }
- }
- return resource;
- }
-
- protected WebResource getResource(APIInfo api, MultivaluedMap<String, String> queryParams, String... pathParams) {
- WebResource resource = service.path(api.getPath());
- resource = appendPathParams(resource, pathParams);
- resource = appendQueryParams(queryParams, resource);
- return resource;
- }
-
- private WebResource appendPathParams(WebResource resource, String[] pathParams) {
- if (pathParams != null) {
- for (String pathParam : pathParams) {
- resource = resource.path(pathParam);
- }
- }
- return resource;
- }
-
- protected WebResource getResource(APIInfo api, MultivaluedMap<String, String> queryParams) {
- return getResource(service, api, queryParams);
- }
-
- // Modify URL to include the query params
- private WebResource getResource(WebResource service, APIInfo api, MultivaluedMap<String, String> queryParams) {
- WebResource resource = service.path(api.getPath());
- resource = appendQueryParams(queryParams, resource);
- return resource;
- }
-
- private WebResource appendQueryParams(MultivaluedMap<String, String> queryParams, WebResource resource) {
- if (null != queryParams && !queryParams.isEmpty()) {
- for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
- for (String value : entry.getValue()) {
- if (StringUtils.isNotBlank(value)) {
- resource = resource.queryParam(entry.getKey(), value);
- }
- }
- }
- }
- return resource;
- }
-
- protected APIInfo updatePathParameters(APIInfo apiInfo, String... params) {
- return new APIInfo(String.format(apiInfo.getPath(), params), apiInfo.getMethod(), apiInfo.getExpectedStatus());
- }
-
- @VisibleForTesting
- void setConfiguration(Configuration configuration) {
- this.configuration = configuration;
- }
-
- @VisibleForTesting
- void setService(WebResource resource) {
- this.service = resource;
- }
-
-
- public static class APIInfo {
- private final String method;
- private final String path;
- private final Response.Status status;
-
- public APIInfo(String path, String method, Response.Status status) {
- this.path = path;
- this.method = method;
- this.status = status;
- }
-
- public String getMethod() {
- return method;
- }
-
- public String getPath() {
- return path;
- }
-
- public Response.Status getExpectedStatus() {
- return status;
- }
- }
-
- /**
- * A class to capture input state while creating the client.
- *
- * The information here will be reused when the client is re-initialized on switch-over
- * in case of High Availability.
- */
- private class AtlasClientContext {
- private String[] baseUrls;
- private Client client;
- private String doAsUser;
- private UserGroupInformation ugi;
-
- public AtlasClientContext(String[] baseUrls, Client client, UserGroupInformation ugi, String doAsUser) {
- this.baseUrls = baseUrls;
- this.client = client;
- this.ugi = ugi;
- this.doAsUser = doAsUser;
- }
-
- public Client getClient() {
- return client;
- }
-
- public String[] getBaseUrls() {
- return baseUrls;
- }
-
- public String getDoAsUser() {
- return doAsUser;
- }
-
- public UserGroupInformation getUgi() {
- return ugi;
- }
- }
-}