You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/04/06 21:36:04 UTC
[5/5] sentry git commit: Client Failover reorg prototype
Client Failover reorg prototype
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c9c0119f
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c9c0119f
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c9c0119f
Branch: refs/heads/SENTRY-1593-akolb
Commit: c9c0119fc9e61615b9445d989dd63c9395a1b21c
Parents: e3d859a
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Thu Apr 6 14:35:46 2017 -0700
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Thu Apr 6 14:35:46 2017 -0700
----------------------------------------------------------------------
.../transport/RetryClientInvocationHandler.java | 22 +-
.../SentryClientTransportConfigInterface.java | 2 +-
.../common/transport/SentryServiceClient.java | 48 ---
...SentryServiceClientTransportDefaultImpl.java | 342 -------------------
.../core/common/transport/SentrySocket.java | 32 ++
.../transport/SentryTransportFactory.java | 234 +++++++++++++
.../sentry/hdfs/SentryHDFSServiceClient.java | 5 +-
.../SentryHDFSServiceClientDefaultImpl.java | 43 ++-
.../hdfs/SentryHDFSServiceClientFactory.java | 11 +-
.../thrift/SentryGenericServiceClient.java | 5 +-
.../SentryGenericServiceClientDefaultImpl.java | 50 ++-
.../SentryGenericServiceClientFactory.java | 8 +-
.../thrift/SentryPolicyServiceClient.java | 5 +-
.../SentryPolicyServiceClientDefaultImpl.java | 46 ++-
.../thrift/SentryServiceClientFactory.java | 25 +-
.../thrift/SentryServiceClientPoolFactory.java | 27 +-
16 files changed, 428 insertions(+), 477 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
index b01cb37..86569c9 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
@@ -49,16 +49,20 @@ import java.lang.reflect.Method;
* TODO(kalyan) allow multiple client connections using <code>PoolClientInvocationHandler</code>
*/
-public class RetryClientInvocationHandler extends SentryClientInvocationHandler {
+public final class RetryClientInvocationHandler extends SentryClientInvocationHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(RetryClientInvocationHandler.class);
- private SentryServiceClient client = null;
+ private final int retries;
+ private final SentrySocket client;
/**
* Initialize the sentry configurations, including rpc retry count and client connection
* configs for SentryPolicyServiceClientDefaultImpl
*/
- public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject) {
+ public RetryClientInvocationHandler(Configuration conf,
+ SentryClientTransportConfigInterface transportConfig,
+ SentrySocket clientObject) {
+ retries = transportConfig.getSentryRpcRetryTotal(conf);
Preconditions.checkNotNull(conf, "Configuration object cannot be null");
client = clientObject;
}
@@ -77,18 +81,17 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler
synchronized public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
int retryCount = 0;
Exception lastExc = null;
- boolean tryAlternateServer = false;
- while (retryCount < client.getRetryCount()) {
+ while (retryCount < retries) {
// Connect to a sentry server if not connected yet.
try {
- client.connectWithRetry(tryAlternateServer);
+ client.connect();
} catch (IOException e) {
// Increase the retry num
// Retry when the exception is caused by connection problem.
retryCount++;
lastExc = e;
- close();
+ client.close();
continue;
}
@@ -108,7 +111,6 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler
// Retry when the exception is caused by connection problem.
lastExc = new TTransportException(sentryTargetException);
LOGGER.error("Got TTransportException when do the thrift call ", lastExc);
- tryAlternateServer = true;
// Closing the thrift client on TTransportException. New client object is
// created using new socket when an attempt to reconnect is made.
close();
@@ -131,9 +133,9 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler
}
// Throw the exception as reaching the max rpc retry num.
- LOGGER.error(String.format("failed after %d retries ", client.getRetryCount()), lastExc);
+ LOGGER.error(String.format("failed after %d retries ", retries), lastExc);
throw new SentryUserException(
- String.format("failed after %d retries ", client.getRetryCount()), lastExc);
+ String.format("failed after %d retries ", retries), lastExc);
}
@Override
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
index 24192fd..3ea36a1 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
@@ -28,7 +28,7 @@ import org.apache.sentry.core.common.exception.MissingConfigurationException;
* This Configuration interface should be implemented for all the sentry clients to get
* the transport configuration.
*/
-interface SentryClientTransportConfigInterface {
+public interface SentryClientTransportConfigInterface {
/**
* @param conf configuration
* @return number of times client retry logic should iterate through all
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
deleted file mode 100644
index dc93fb7..0000000
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.core.common.transport;
-
-import java.io.Closeable;
-
-/**
- * Client interface for Proxy Invocation handlers
- * <p>
- * Defines interface that Sentry client's should expose to the Invocation handlers like
- * <code>RetryClientInvocationHandler</code> used to proxy the method invocation on sentry
- * client instances .
- * <p>
- * All the sentry clients that need retrying and failover capabilities should implement
- * this interface.
- */
-public interface SentryServiceClient extends Closeable {
- /**
- * This is a no-op when already connected.
- * When there is a connection error, it will retry with another sentry server. It will
- * first cycle through all the available sentry servers, and then retry the whole server
- * list no more than connectionFullRetryTotal times. In this case, it won't introduce
- * more latency when some server fails. Also to prevent all clients connecting to the
- * same server, it will reorder the endpoints randomly after a full retry.
- * <p>
- * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
- */
- void connectWithRetry(boolean tryAlternateServer) throws Exception;
-
- int getRetryCount();
-
- void close();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
deleted file mode 100644
index 4c126fb..0000000
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.core.common.transport;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.net.HostAndPort;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.core.common.exception.MissingConfigurationException;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.sentry.core.common.utils.ThriftUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-
-/**
- * Implements the transport functionality for sentry clients.
- * All the sentry clients should extend this class for transport implementation.
- */
-
-public abstract class SentryServiceClientTransportDefaultImpl {
- protected final Configuration conf;
- protected final boolean kerberos;
- private String[] serverPrincipalParts;
-
- protected TTransport transport;
- private final int connectionTimeout;
- private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientTransportDefaultImpl.class);
- // configs for connection retry
- private final int connectionFullRetryTotal;
- private final int rpcRetryTotal;
- private final ArrayList<InetSocketAddress> endpoints;
- protected InetSocketAddress serverAddress;
- private final SentryClientTransportConfigInterface transportConfig;
- private static final ImmutableMap<String, String> SASL_PROPERTIES =
- ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
-
- /**
- * Defines various client types.
- */
- protected enum sentryClientType {
- POLICY_CLIENT,
- HDFS_CLIENT,
- }
-
- /**
- * This transport wraps the Sasl transports to set up the right UGI context for open().
- */
- public static class UgiSaslClientTransport extends TSaslClientTransport {
- UserGroupInformation ugi = null;
-
- public UgiSaslClientTransport(String mechanism, String protocol,
- String serverName, TTransport transport,
- boolean wrapUgi, Configuration conf)
- throws IOException, SaslException {
- super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
- transport);
- if (wrapUgi) {
- // If we don't set the configuration, the UGI will be created based on
- // what's on the classpath, which may lack the kerberos changes we require
- UserGroupInformation.setConfiguration(conf);
- ugi = UserGroupInformation.getLoginUser();
- }
- }
-
- // open the SASL transport with using the current UserGroupInformation
- // This is needed to get the current login context stored
- @Override
- public void open() throws TTransportException {
- if (ugi == null) {
- baseOpen();
- } else {
- try {
- if (ugi.isFromKeytab()) {
- ugi.checkTGTAndReloginFromKeytab();
- }
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws TTransportException {
- baseOpen();
- return null;
- }
- });
- } catch (IOException e) {
- throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
- } catch (InterruptedException e) {
- throw new TTransportException(
- "Interrupted while opening underlying transport: " + e.getMessage(), e);
- }
- }
- }
-
- private void baseOpen() throws TTransportException {
- super.open();
- }
- }
-
- /**
- * Initialize the object based on the sentry configuration provided.
- * List of configured servers are reordered randomly preventing all
- * clients connecting to the same server.
- *
- * @param conf Sentry configuration
- * @param type Type indicates the service type
- */
- public SentryServiceClientTransportDefaultImpl(Configuration conf,
- sentryClientType type) throws IOException {
-
- this.conf = conf;
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- serverPrincipalParts = null;
- if (type == sentryClientType.POLICY_CLIENT) {
- transportConfig = new SentryPolicyClientTransportConfig();
- } else {
- transportConfig = new SentryHDFSClientTransportConfig();
- }
-
- try {
- String hostsAndPortsStr;
- this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
- this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
- this.rpcRetryTotal = transportConfig.getSentryRpcRetryTotal(conf);
- this.kerberos = transportConfig.isKerberosEnabled(conf);
-
- hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
-
- int serverPort = transportConfig.getServerRpcPort(conf);
-
- String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
- HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
-
- this.endpoints = new ArrayList(hostsAndPortsStrArr.length);
- for (HostAndPort endpoint : hostsAndPorts) {
- this.endpoints.add(
- new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
- LOGGER.debug("Added server endpoint: " + endpoint.toString());
- }
-
- // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
- // at the same time after a node failure.
- Collections.shuffle(endpoints);
- serverAddress = null;
- connectWithRetry(false);
- } catch (Exception e) {
- throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
- }
- }
-
- /**
- * Initialize object based on the parameters provided provided.
- *
- * @param addr Host address which the client needs to connect
- * @param port Host Port which the client needs to connect
- * @param conf Sentry configuration
- * @param type Type indicates the service type
- */
- public SentryServiceClientTransportDefaultImpl(String addr, int port, Configuration conf,
- sentryClientType type) throws IOException {
- // copy the configuration because we may make modifications to it.
- this.conf = new Configuration(conf);
- serverPrincipalParts = null;
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- if (type == sentryClientType.POLICY_CLIENT) {
- transportConfig = new SentryPolicyClientTransportConfig();
- } else {
- transportConfig = new SentryHDFSClientTransportConfig();
- }
-
- try {
- InetSocketAddress serverAddress = NetUtils.createSocketAddr(addr, port);
- this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
- this.rpcRetryTotal = transportConfig.getSentryRpcRetryTotal(conf);
- this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
- this.kerberos = transportConfig.isKerberosEnabled(conf);
- connect(serverAddress);
- } catch (MissingConfigurationException e) {
- throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
- }
- endpoints = null;
- }
-
-
- /**
- * no-op when already connected.
- * On connection error, Iterates through all the configured servers and tries to connect.
- * On successful connection, control returns
- * On connection failure, continues iterating through all the configured sentry servers,
- * and then retries the whole server list no more than connectionFullRetryTotal times.
- * In this case, it won't introduce more latency when some server fails.
- * <p>
- * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
- */
- public synchronized void connectWithRetry(boolean tryAlternateServer) throws IOException {
- if (isConnected() && (!tryAlternateServer)) {
- return;
- }
-
- IOException currentException = null;
- for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
- try {
- connectToAvailableServer();
- return;
- } catch (IOException e) {
- currentException = e;
- LOGGER.error(
- String.format("Failed to connect to all the configured sentry servers, " +
- "Retrying again"));
- }
- }
- // Throw exception as reaching the max full connectWithRetry number.
- LOGGER.error(
- String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
- currentException);
- throw currentException;
- }
-
- /**
- * Iterates through all the configured servers and tries to connect.
- * On connection error, tries to connect to next server.
- * Control returns on successful connection OR it's done trying to all the
- * configured servers.
- *
- * @throws IOException
- */
- private void connectToAvailableServer() throws IOException {
- IOException currentException = null;
- if (endpoints.size() == 1) {
- connect(endpoints.get(0));
- return;
- }
-
- for (InetSocketAddress addr : endpoints) {
- try {
- serverAddress = addr;
- connect(serverAddress);
- LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString()));
- return;
- } catch (IOException e) {
- LOGGER.error(String.format("Failed connection to %s: %s",
- addr.toString(), e.getMessage()), e);
- currentException = e;
- }
- }
- throw currentException;
- }
-
- /**
- * Connect to the specified socket address and throw IOException if failed.
- *
- * @param serverAddress Address client needs to connect
- * @throws Exception if there is failure in establishing the connection.
- */
- protected void connect(InetSocketAddress serverAddress) throws IOException {
- try {
- transport = createTransport(serverAddress);
- transport.open();
- } catch (TTransportException e) {
- throw new IOException("Failed to open transport: " + e.getMessage(), e);
- } catch (MissingConfigurationException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
-
- LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress);
- }
-
- /**
- * New socket is is created
- *
- * @param serverAddress
- * @return
- * @throws TTransportException
- * @throws MissingConfigurationException
- * @throws IOException
- */
- private TTransport createTransport(InetSocketAddress serverAddress)
- throws TTransportException, MissingConfigurationException, IOException {
- TTransport socket = new TSocket(serverAddress.getHostName(),
- serverAddress.getPort(), connectionTimeout);
-
- if (kerberos) {
- String serverPrincipal = transportConfig.getSentryPrincipal(conf);
- serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
- LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
- if (serverPrincipalParts == null) {
- serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
- Preconditions.checkArgument(serverPrincipalParts.length == 3,
- "Kerberos principal should have 3 parts: " + serverPrincipal);
- }
-
- boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
- return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
- serverPrincipalParts[0], serverPrincipalParts[1],
- socket, wrapUgi, conf);
- } else {
- return socket;
- }
- }
-
- private boolean isConnected() {
- return transport != null && transport.isOpen();
- }
-
- public synchronized void close() {
- if (isConnected()) {
- transport.close();
- }
- }
-
- public int getRetryCount() {
- return rpcRetryTotal;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java
new file mode 100644
index 0000000..3374489
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import java.io.IOException;
+
+/**
+ * General representation of transport connection to Sentry
+ */
+public interface SentrySocket extends AutoCloseable {
+ /**
+ * Connect to the Sentry server
+ * @throws IOException
+ */
+ void connect() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
new file mode 100644
index 0000000..74ac92d
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.core.common.exception.MissingConfigurationException;
+import org.apache.sentry.core.common.utils.ThriftUtil;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Generate Thrift transports suitable for talking to Sentry
+ */
+public final class SentryTransportFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class);
+
+ private final Configuration conf;
+ private final SentryClientTransportConfigInterface transportConfig;
+ private final ArrayList<InetSocketAddress> endpoints;
+
+ public SentryTransportFactory(Configuration conf,
+ SentryClientTransportConfigInterface configInterface) {
+ this.conf = conf;
+ this.transportConfig = configInterface;
+ String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
+ int serverPort = transportConfig.getServerRpcPort(conf);
+
+ String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+ HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
+ this.endpoints = new ArrayList<>(hostsAndPortsStrArr.length);
+ for (HostAndPort endpoint : hostsAndPorts) {
+ this.endpoints.add(
+ new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
+ LOGGER.debug("Added server endpoint: " + endpoint.toString());
+ }
+ // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
+ // at the same time after a node failure.
+ if (endpoints.size() > 1) {
+ Collections.shuffle(endpoints);
+ }
+ }
+
+ /**
+ * This transport wraps the Sasl transports to set up the right UGI context for open().
+ */
+ private static final class UgiSaslClientTransport extends TSaslClientTransport {
+ private static final ImmutableMap<String, String> SASL_PROPERTIES =
+ ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
+
+ private UserGroupInformation ugi = null;
+
+ private UgiSaslClientTransport(String mechanism, String protocol,
+ String serverName, TTransport transport,
+ boolean wrapUgi, Configuration conf)
+ throws IOException, SaslException {
+ super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
+ transport);
+ if (wrapUgi) {
+ // If we don't set the configuration, the UGI will be created based on
+ // what's on the classpath, which may lack the kerberos changes we require
+ UserGroupInformation.setConfiguration(conf);
+ ugi = UserGroupInformation.getLoginUser();
+ }
+ }
+
+ // open the SASL transport with using the current UserGroupInformation
+ // This is needed to get the current login context stored
+ @Override
+ public void open() throws TTransportException {
+ if (ugi == null) {
+ baseOpen();
+ } else {
+ try {
+ if (ugi.isFromKeytab()) {
+ ugi.checkTGTAndReloginFromKeytab();
+ }
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws TTransportException {
+ baseOpen();
+ return null;
+ }
+ });
+ } catch (IOException e) {
+ throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
+ } catch (InterruptedException e) {
+ throw new TTransportException(
+ "Interrupted while opening underlying transport: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private void baseOpen() throws TTransportException {
+ super.open();
+ }
+ }
+
+ /**
+ * On connection error, Iterates through all the configured servers and tries to connect.
+ * On successful connection, control returns
+ * On connection failure, continues iterating through all the configured sentry servers,
+ * and then retries the whole server list no more than connectionFullRetryTotal times.
+ * In this case, it won't introduce more latency when some server fails.
+ * <p>
+ * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
+ */
+ public TTransport connect() throws IOException {
+ int connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
+ IOException currentException = null;
+ for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
+ try {
+ return connectToAvailableServer();
+ } catch (IOException e) {
+ currentException = e;
+ LOGGER.error(
+ String.format("Failed to connect to all the configured sentry servers, " +
+ "Retrying again"));
+ }
+ }
+ // Throw exception as reaching the max full connectWithRetry number.
+ LOGGER.error(
+ String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
+ currentException);
+ throw currentException;
+ }
+
+ /**
+ * Iterates through all the configured servers and tries to connect.
+ * On connection error, tries to connect to next server.
+ * Control returns on successful connection OR it's done trying to all the
+ * configured servers.
+ *
+ * @throws IOException
+ */
+ private TTransport connectToAvailableServer() throws IOException {
+ IOException currentException = null;
+ for (InetSocketAddress addr : endpoints) {
+ try {
+ return connect(addr);
+ } catch (IOException e) {
+ LOGGER.error(String.format("Failed connection to %s: %s",
+ addr.toString(), e.getMessage()), e);
+ currentException = e;
+ }
+ }
+ if (currentException != null) {
+ throw currentException;
+ }
+ return null;
+ }
+
+ /**
+ * Connect to the specified socket address and throw IOException if failed.
+ *
+ * @param serverAddress Address client needs to connect
+ * @throws Exception if there is failure in establishing the connection.
+ */
+ protected TTransport connect(InetSocketAddress serverAddress) throws IOException {
+ try {
+ TTransport transport = createTransport(serverAddress);
+ transport.open();
+ LOGGER.info(String.format("Connected to SentryServer: %s", serverAddress));
+ return transport;
+ } catch (TTransportException e) {
+ throw new IOException("Failed to open transport: " + e.getMessage(), e);
+ } catch (MissingConfigurationException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * New socket is is created
+ *
+ * @param serverAddress
+ * @return
+ * @throws TTransportException
+ * @throws MissingConfigurationException
+ * @throws IOException
+ */
+ private TTransport createTransport(InetSocketAddress serverAddress)
+ throws TTransportException, MissingConfigurationException, IOException {
+ TTransport socket = new TSocket(serverAddress.getHostName(),
+ serverAddress.getPort(), transportConfig.getServerRpcConnTimeoutInMs(conf));
+
+ if (!transportConfig.isKerberosEnabled(conf)) {
+ return socket;
+ }
+
+ String serverPrincipal = transportConfig.getSentryPrincipal(conf);
+ serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+ LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
+ String[] serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
+ Preconditions.checkArgument(serverPrincipalParts.length == 3,
+ "Kerberos principal should have 3 parts: " + serverPrincipal);
+
+ boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
+ return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
+ serverPrincipalParts[0], serverPrincipalParts[1],
+ socket, wrapUgi, conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
index faac053..11f6894 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
@@ -18,9 +18,8 @@
package org.apache.sentry.hdfs;
import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-public interface SentryHDFSServiceClient extends SentryServiceClient {
+public interface SentryHDFSServiceClient {
String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";
void notifyHMSUpdate(PathsUpdate update)
@@ -30,5 +29,7 @@ public interface SentryHDFSServiceClient extends SentryServiceClient {
SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
throws SentryHdfsServiceException;
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
index d337319..794aded 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
@@ -18,12 +18,13 @@
package org.apache.sentry.hdfs;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentrySocket;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
@@ -34,6 +35,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,28 +49,41 @@ import org.slf4j.LoggerFactory;
*/
-public class SentryHDFSServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryHDFSServiceClient {
+public class SentryHDFSServiceClientDefaultImpl
+ implements SentryHDFSServiceClient, SentrySocket {
private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
+ private final Configuration conf;
private Client client;
+ private SentryTransportFactory transportFactory;
+ private TTransport transport;
- public SentryHDFSServiceClientDefaultImpl(Configuration conf) throws IOException {
- super(conf, sentryClientType.HDFS_CLIENT);
+
+
+ SentryHDFSServiceClientDefaultImpl(Configuration conf,
+ SentryClientTransportConfigInterface transportConfig)
+ throws IOException {
+ this.conf = conf;
+ transportFactory = new SentryTransportFactory(conf, transportConfig);
}
/**
* Connect to the specified socket address and then use the new socket
* to construct new thrift client.
*
- * @param serverAddress: socket address to which the client should connect.
* @throws IOException
*/
- public void connect(InetSocketAddress serverAddress) throws IOException {
- TProtocol tProtocol = null;
- super.connect(serverAddress);
+ @Override
+ public void connect() throws IOException {
+ if (isOpen()) {
+ return;
+ }
+
+ transport = transportFactory.connect();
long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ TProtocol tProtocol = null;
if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT,
ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize);
@@ -119,4 +134,14 @@ public class SentryHDFSServiceClientDefaultImpl extends SentryServiceClientTrans
}
return retVal;
}
+
+ @Override
+ public void close() {
+ transport.close();
+ transport = null;
+ }
+
+ private boolean isOpen() {
+ return ((transport != null) && transport.isOpen());
+ }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
index 59ac360..174da4f 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
@@ -21,12 +21,16 @@ import java.lang.reflect.Proxy;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
/**
* Client factory to create normal client or proxy with HA invocation handler
*/
public class SentryHDFSServiceClientFactory {
-
+ private static final SentryClientTransportConfigInterface transportConfig =
+ new SentryHDFSClientTransportConfig();
+
private SentryHDFSServiceClientFactory() {
// Make constructor private to avoid instantiation
}
@@ -36,7 +40,8 @@ public class SentryHDFSServiceClientFactory {
return (SentryHDFSServiceClient) Proxy
.newProxyInstance(SentryHDFSServiceClientDefaultImpl.class.getClassLoader(),
SentryHDFSServiceClientDefaultImpl.class.getInterfaces(),
- new RetryClientInvocationHandler(conf,
- new SentryHDFSServiceClientDefaultImpl(conf)));
+ new RetryClientInvocationHandler(conf, transportConfig,
+ new SentryHDFSServiceClientDefaultImpl(conf,
+ transportConfig)));
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
index c832706..11cdee7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
@@ -24,9 +24,8 @@ import java.util.Set;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-public interface SentryGenericServiceClient extends SentryServiceClient {
+public interface SentryGenericServiceClient {
/**
* Create a sentry role
@@ -192,4 +191,6 @@ public interface SentryGenericServiceClient extends SentryServiceClient {
Map<String, TSentryPrivilegeMap> listPrivilegsbyAuthorizable(String component,
String serviceName, String requestorUserName, Set<String> authorizablesSet,
Set<String> groups, ActiveRoleSet roleSet) throws SentryUserException;
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
index 9bbd736..c9d0357 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
@@ -18,18 +18,16 @@
package org.apache.sentry.provider.db.generic.service.thrift;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
-import org.apache.sentry.core.common.utils.SentryConstants;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentrySocket;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
import org.apache.sentry.core.model.db.AccessConstants;
import org.apache.sentry.service.thrift.ServiceConstants;
import org.apache.sentry.service.thrift.Status;
@@ -38,6 +36,7 @@ import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
+import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,30 +51,48 @@ import com.google.common.collect.Lists;
So it is important to close and re-open the transport so that new socket is used.
*/
-public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryGenericServiceClient {
+public class SentryGenericServiceClientDefaultImpl
+ implements SentryGenericServiceClient, SentrySocket {
+ private final SentryTransportFactory transportFactory;
+ private final Configuration conf;
+ private TTransport transport;
+
+
private SentryGenericPolicyService.Client client;
private static final Logger LOGGER = LoggerFactory
.getLogger(SentryGenericServiceClientDefaultImpl.class);
private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occured ";
- public SentryGenericServiceClientDefaultImpl(Configuration conf) throws IOException {
- super(conf, sentryClientType.POLICY_CLIENT);
+ public SentryGenericServiceClientDefaultImpl(Configuration conf,
+ SentryClientTransportConfigInterface transportConfig)
+ throws IOException {
+ this.conf = conf;
+ transportFactory = new SentryTransportFactory(conf, transportConfig);
+
+ // TODO - do it correctly
+ /*
if (kerberos) {
// since the client uses hadoop-auth, we need to set kerberos in
// hadoop-auth if we plan to use kerberos
conf.set(HADOOP_SECURITY_AUTHENTICATION, SentryConstants.KERBEROS_MODE);
}
+ */
}
/**
* Connect to the specified socket address and then use the new socket
* to construct new thrift client.
*
- * @param serverAddress: socket address to which the client should connect.
* @throws IOException
*/
- public void connect(InetSocketAddress serverAddress) throws IOException {
- super.connect(serverAddress);
+ @Override
+ public void connect() throws IOException {
+ if (isOpen()) {
+ return;
+ }
+
+ transport = transportFactory.connect();
+
long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
TMultiplexedProtocol protocol = new TMultiplexedProtocol(
@@ -84,6 +101,12 @@ public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTr
client = new SentryGenericPolicyService.Client(protocol);
LOGGER.debug("Successfully created client");
}
+
+ private boolean isOpen() {
+ return ((transport != null) && transport.isOpen());
+ }
+
+
/**
* Create a sentry role
*
@@ -506,4 +529,9 @@ public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTr
throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e);
}
}
+
+ @Override
+ public void close() {
+ transport.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
index 1c582f0..9132449 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
@@ -19,6 +19,8 @@ package org.apache.sentry.provider.db.generic.service.thrift;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
import java.lang.reflect.Proxy;
@@ -26,6 +28,8 @@ import java.lang.reflect.Proxy;
* SentryGenericServiceClientFactory is a public class for the components which using Generic Model to create sentry client.
*/
public final class SentryGenericServiceClientFactory {
+ private static final SentryClientTransportConfigInterface transportConfig =
+ new SentryPolicyClientTransportConfig();
private SentryGenericServiceClientFactory() {
}
@@ -34,8 +38,8 @@ public final class SentryGenericServiceClientFactory {
return (SentryGenericServiceClient) Proxy
.newProxyInstance(SentryGenericServiceClientDefaultImpl.class.getClassLoader(),
SentryGenericServiceClientDefaultImpl.class.getInterfaces(),
- new RetryClientInvocationHandler(conf,
- new SentryGenericServiceClientDefaultImpl(conf)));
+ new RetryClientInvocationHandler(conf, transportConfig,
+ new SentryGenericServiceClientDefaultImpl(conf, transportConfig)));
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
index 28c3e35..3b25db7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
@@ -25,9 +25,8 @@ import java.util.Set;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-public interface SentryPolicyServiceClient extends SentryServiceClient {
+public interface SentryPolicyServiceClient {
void createRole(String requestorUserName, String roleName) throws SentryUserException;
@@ -216,4 +215,6 @@ public interface SentryPolicyServiceClient extends SentryServiceClient {
// export the sentry mapping data with map structure
Map<String, Map<String, Set<String>>> exportPolicy(String requestorUserName, String objectPath)
throws SentryUserException;
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index b4c1a5f..9eb60cc 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -19,7 +19,6 @@
package org.apache.sentry.provider.db.service.thrift;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -31,6 +30,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentrySocket;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
import org.apache.sentry.core.model.db.AccessConstants;
import org.apache.sentry.core.model.db.DBModelAuthorizable;
import org.apache.sentry.core.common.utils.PolicyFileConstants;
@@ -42,7 +44,7 @@ import org.apache.sentry.service.thrift.Status;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
-import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
+import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,35 +67,43 @@ import com.google.common.collect.Sets;
server this is configured.
*/
-public class SentryPolicyServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryPolicyServiceClient {
+public class SentryPolicyServiceClientDefaultImpl
+ implements SentryPolicyServiceClient, SentrySocket {
+
+ private final Configuration conf;
private SentryPolicyService.Client client;
private static final Logger LOGGER = LoggerFactory
.getLogger(SentryPolicyServiceClient.class);
+ private SentryTransportFactory transportFactory;
+ private TTransport transport;
+
+
private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred ";
/**
* Initialize the sentry configurations.
*/
- public SentryPolicyServiceClientDefaultImpl(Configuration conf)
+ public SentryPolicyServiceClientDefaultImpl(Configuration conf,
+ SentryClientTransportConfigInterface transportConfig)
throws IOException {
- super(conf, sentryClientType.POLICY_CLIENT);
- }
-
- public SentryPolicyServiceClientDefaultImpl(String addr, int port,
- Configuration conf) throws IOException {
- super(addr, port, conf, sentryClientType.POLICY_CLIENT);
+ this.conf = conf;
+ this.transportFactory = new SentryTransportFactory(conf, transportConfig);
}
/**
* Connect to the specified socket address and then use the new socket
* to construct new thrift client.
*
- * @param serverAddress: socket address to which the client should connect.
* @throws IOException
*/
- public void connect(InetSocketAddress serverAddress) throws IOException {
- super.connect(serverAddress);
+ @Override
+ public void connect() throws IOException {
+ if (isOpen()) {
+ return;
+ }
+ transport = transportFactory.connect();
+
long maxMessageSize = conf.getLong(
ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
@@ -1008,4 +1018,14 @@ public class SentryPolicyServiceClientDefaultImpl extends SentryServiceClientTra
}
return rolePrivilegesMapForFile;
}
+
+ @Override
+ public void close() {
+ transport.close();
+ transport = null;
+ }
+
+ private boolean isOpen() {
+ return ((transport != null) && transport.isOpen());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
index 745dc4c..55c51d3 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
@@ -23,29 +23,24 @@ import java.lang.reflect.Proxy;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
public final class SentryServiceClientFactory {
+ private static final SentryClientTransportConfigInterface transportConfig =
+ new SentryPolicyClientTransportConfig();
+
private SentryServiceClientFactory() {
}
public static SentryPolicyServiceClient create(Configuration conf) throws Exception {
- boolean pooled = conf.getBoolean(
- ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT);
- if (pooled) {
- return (SentryPolicyServiceClient) Proxy
- .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
- SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
- new PoolClientInvocationHandler(conf));
- } else {
- return (SentryPolicyServiceClient) Proxy
- .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
- SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
- new RetryClientInvocationHandler(conf,
- new SentryPolicyServiceClientDefaultImpl(conf)));
- }
+ return (SentryPolicyServiceClient) Proxy
+ .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
+ SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
+ new RetryClientInvocationHandler(conf, transportConfig,
+ new SentryPolicyServiceClientDefaultImpl(conf, transportConfig)));
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
index 0164fa6..dd13e0d 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
@@ -23,9 +23,9 @@ import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
/**
* SentryServiceClientPoolFactory is for connection pool to manage the object. Implement the related
@@ -36,21 +36,21 @@ public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<Sent
private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class);
- private final String addr;
- private final int port;
- private final Configuration conf;
+ //private final String addr;
+ //private final int port;
+ //private final Configuration conf;
public SentryServiceClientPoolFactory(String addr, int port,
Configuration conf) {
- this.addr = addr;
- this.port = port;
- this.conf = conf;
+ LOGGER.debug("addr = " + addr + "port = " + String.valueOf(port) + " conf = ", conf.toString());
+ //this.addr = addr;
+ //this.port = port;
+ //this.conf = conf;
}
@Override
public SentryPolicyServiceClient create() throws Exception {
- LOGGER.debug("Creating Sentry Service Client...");
- return new SentryPolicyServiceClientDefaultImpl(addr, port, conf);
+ throw new NotImplementedException();
}
@Override
@@ -60,13 +60,6 @@ public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<Sent
@Override
public void destroyObject(PooledObject<SentryPolicyServiceClient> pooledObject) {
- SentryPolicyServiceClient client = pooledObject.getObject();
- LOGGER.debug("Destroying Sentry Service Client: " + client);
- if (client != null) {
- // The close() of TSocket or TSaslClientTransport is called actually, and there has no
- // exception even there has some problems, eg, the client is closed already.
- // The close here is just try to close the socket and the client will be destroyed soon.
- client.close();
- }
+ throw new NotImplementedException();
}
}