You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by va...@apache.org on 2017/06/14 00:57:28 UTC
[50/52] [abbrv] sentry git commit: SENTRY-1580: Provide pooled client
connection model with HA (Alex Kolbasov, reviewed by Vamsee Yarlagadda,
Kalyan Kalvagadda) CDH-54389: Sentry-1580 Provide pooled client connection
model with HA SENTRY-1800: Flaky test
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TransportFactory.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TransportFactory.java
new file mode 100644
index 0000000..e115cbb
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/TransportFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.net.HostAndPort;
+
+import java.io.IOException;
+
+/**
+ * Generic transport factory interface.
+ * <p>
+ * The intention is to implement transport pool in more abstract terms
+ * and be able to test it without actually connecting to any servers by
+ * implementing mock transport factories.
+ */
+public interface TransportFactory {
+ /**
+ * Connect to the endpoint and return a connected Thrift transport.
+ * @return Connection to the endpoint
+ * @throws IOException
+ */
+ TTransportWrapper getTransport(HostAndPort endpoint) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/UserGroupInformationInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/UserGroupInformationInitializer.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/UserGroupInformationInitializer.java
deleted file mode 100644
index 19ba12c..0000000
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/UserGroupInformationInitializer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.core.common.transport;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-
-/**
- * Wrapper to initialize UserGroupInformation
- */
-
-public class UserGroupInformationInitializer {
-
- // initialize() method could be called my multiple threads.
- // to attain visibility guarantee on isInitialized, it is declared volatile.
- private static volatile boolean isInitialized = false;
-
- // initialization block may be executed multiple times. This is fine as setConfiguration is
- // thread-safe
- public static void initialize(Configuration conf) {
- if(!isInitialized) {
- Configuration newConf = new Configuration(conf);
- // When kerberos is enabled, UserGroupInformation should have been initialized with
- // HADOOP_SECURITY_AUTHENTICATION property. There are instances where this is not done.
- // Example: Solr and Kafka while using sentry generic clients were not updating this
- // property. Instead of depending on the callers to update this configuration and to be
- // sure that UserGroupInformation is properly initialized, sentry client is explicitly
- // doing it,
- newConf.set(HADOOP_SECURITY_AUTHENTICATION, SentryClientTransportConstants.KERBEROS_MODE);
- UserGroupInformation.setConfiguration(newConf);
- isInitialized = true;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
index 9e38a30..fb0630c 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
@@ -112,12 +112,8 @@ public class ThriftUtil {
* (host:port). The hostname could be in ipv6 style. If port is not specified,
* defaultPort will be used.
*/
- public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort) {
- HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length];
- for (int i = 0; i < hostsAndPorts.length; i++) {
- hostsAndPorts[i] =
- HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort);
- }
- return hostsAndPorts;
+ public static HostAndPort parseAddress(String address, int defaultPort) {
+ return HostAndPort.fromString(address).withDefaultPort(defaultPort);
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-hdfs/sentry-hdfs-dist/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-dist/pom.xml b/sentry-hdfs/sentry-hdfs-dist/pom.xml
index c9610eb..48406ab 100644
--- a/sentry-hdfs/sentry-hdfs-dist/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-dist/pom.xml
@@ -78,6 +78,7 @@ limitations under the License.
<include>org.apache.sentry:sentry-hdfs-service</include>
<include>org.apache.sentry:sentry-core-common</include>
<include>org.apache.thrift:libthrift</include>
+ <include>org.apache.commons:commons-pool2</include>
</includes>
</artifactSet>
</configuration>
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
index 422554e..c3cc009 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
@@ -22,7 +22,7 @@ import org.apache.sentry.hdfs.SentryAuthzUpdate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SentryUpdater {
+class SentryUpdater {
private SentryHDFSServiceClient sentryClient;
private final Configuration conf;
@@ -30,12 +30,12 @@ public class SentryUpdater {
private static Logger LOG = LoggerFactory.getLogger(SentryUpdater.class);
- public SentryUpdater(Configuration conf, SentryAuthorizationInfo authzInfo) throws Exception {
+ SentryUpdater(Configuration conf, SentryAuthorizationInfo authzInfo) throws Exception {
this.conf = conf;
this.authzInfo = authzInfo;
}
- public SentryAuthzUpdate getUpdates() {
+ SentryAuthzUpdate getUpdates() {
if (sentryClient == null) {
try {
sentryClient = SentryHDFSServiceClientFactory.create(conf);
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
index de9507b..49d2360 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
@@ -17,19 +17,9 @@
*/
package org.apache.sentry.hdfs;
-import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
+public interface SentryHDFSServiceClient extends AutoCloseable {
+ String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";
-public interface SentryHDFSServiceClient {
- public static final String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";
-
- public void notifyHMSUpdate(PathsUpdate update)
- throws SentryHdfsServiceException;
-
- public long getLastSeenHMSPathSeqNum() throws SentryHdfsServiceException;
-
- public SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
- throws SentryHdfsServiceException;
-
- public void close();
+ SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum);
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
index 798bbef..1cdbb85 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
@@ -17,15 +17,12 @@
*/
package org.apache.sentry.hdfs;
-import java.io.IOException;
-import java.util.LinkedList;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-import org.apache.sentry.core.common.transport.SentryTransportFactory;
-import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
+import org.apache.sentry.core.common.transport.SentryConnection;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.apache.sentry.core.common.transport.TTransportWrapper;
+import org.apache.sentry.hdfs.ServiceConstants.ClientConfig;
import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
@@ -35,82 +32,57 @@ import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.LinkedList;
/**
* Sentry HDFS Service Client
* <p>
- * The public implementation of SentryHDFSServiceClient.
- * A Sentry Client in which all the operations are synchronized for thread safety
- * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state.
- * So it is important to close and re-open the transport so that new socket is used.
+ * The class isn't thread-safe - it is up to the aller to ensure thread safety
*/
-
-public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClient, SentryServiceClient {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
+public class SentryHDFSServiceClientDefaultImpl
+ implements SentryHDFSServiceClient, SentryConnection {
+ private final boolean useCompactTransport;
private Client client;
- private SentryTransportFactory transportFactory;
- private TTransport transport;
- private Configuration conf;
+ private final SentryTransportPool transportPool;
+ private TTransportWrapper transport;
+ private final long maxMessageSize;
- public SentryHDFSServiceClientDefaultImpl(Configuration conf, SentryHDFSClientTransportConfig transportConfig) throws IOException {
- transportFactory = new SentryTransportFactory(conf, transportConfig);
- this.conf = conf;
+ SentryHDFSServiceClientDefaultImpl(Configuration conf,
+ SentryTransportPool transportPool) throws IOException {
+ maxMessageSize = conf.getLong(ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
+ ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ useCompactTransport = conf.getBoolean(ClientConfig.USE_COMPACT_TRANSPORT,
+ ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT);
+ this.transportPool = transportPool;
}
/**
* Connect to the sentry server
*
- * @throws IOException
+ * @throws Exception
*/
@Override
- public synchronized void connect() throws IOException {
- if (transport != null && transport.isOpen()) {
+ public void connect() throws Exception {
+ if ((transport != null) && transport.isOpen()) {
return;
}
- transport = transportFactory.getTransport();
- TProtocol tProtocol = null;
- long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
- ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
- if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT,
- ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
- tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize);
+ transport = transportPool.getTransport();
+ TProtocol tProtocol;
+ if (useCompactTransport) {
+ tProtocol = new TCompactProtocol(transport.getTTransport(), maxMessageSize, maxMessageSize);
} else {
- tProtocol = new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true);
+ tProtocol = new TBinaryProtocol(transport.getTTransport(), maxMessageSize, maxMessageSize, true, true);
}
TMultiplexedProtocol protocol = new TMultiplexedProtocol(
tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
- client = new SentryHDFSService.Client(protocol);
- LOGGER.info("Successfully created client");
- }
-
- @Override
- public synchronized void notifyHMSUpdate(PathsUpdate update)
- throws SentryHdfsServiceException {
- try {
- client.handle_hms_notification(update.toThrift());
- } catch (Exception e) {
- throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
- }
- }
-
- @Override
- public synchronized long getLastSeenHMSPathSeqNum()
- throws SentryHdfsServiceException {
- try {
- return client.check_hms_seq_num(-1);
- } catch (Exception e) {
- throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
- }
+ client = new Client(protocol);
}
@Override
- public synchronized SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
+ public SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
throws SentryHdfsServiceException {
SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>());
try {
@@ -132,12 +104,23 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie
}
@Override
- public synchronized void close() {
- transportFactory.close();
+ public void close() {
+ done();
}
@Override
- public void disconnect() {
- transportFactory.releaseTransport();
+ public void done() {
+ if (transport != null) {
+ transportPool.returnTransport(transport);
+ transport = null;
+ }
+ }
+
+ @Override
+ public void invalidate() {
+ if (transport != null) {
+ transportPool.invalidateTransport(transport);
+ transport = null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
index e350103..b40162a 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
@@ -18,28 +18,95 @@
package org.apache.sentry.hdfs;
import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
/**
- * Client factory to create normal client or proxy with HA invocation handler
+ * Client factory for creating HDFS service clients.
+ * This is a singleton which uses a single factory.
*/
-public class SentryHDFSServiceClientFactory {
- private final static SentryHDFSClientTransportConfig transportConfig =
+@ThreadSafe
+public final class SentryHDFSServiceClientFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientFactory.class);
+
+ private static final AtomicReference<SentryHDFSServiceClientFactory> clientFactory =
+ new AtomicReference<>();
+
+ private final SentryHDFSClientTransportConfig transportConfig =
new SentryHDFSClientTransportConfig();
+ private final Configuration conf;
+ private final SentryTransportPool transportPool;
+
+ /**
+ * Return a client instance
+ * @param conf
+ * @return
+ * @throws Exception
+ */
+ public static SentryHDFSServiceClient create(Configuration conf) throws Exception {
+ SentryHDFSServiceClientFactory factory = clientFactory.get();
+ if (factory != null) {
+ return factory.create();
+ }
+ factory = new SentryHDFSServiceClientFactory(conf);
+ boolean ok = clientFactory.compareAndSet(null, factory);
+ if (ok) {
+ return factory.create();
+ }
+ factory.close();
+ return clientFactory.get().create();
+ }
- private SentryHDFSServiceClientFactory() {
- // Make constructor private to avoid instantiation
+ private SentryHDFSServiceClientFactory(Configuration conf) {
+ this.conf = conf;
+ transportPool = new SentryTransportPool(conf, transportConfig,
+ new SentryTransportFactory(conf, transportConfig));
}
- public static SentryHDFSServiceClient create(Configuration conf)
- throws Exception {
+ /**
+ * Create a new client connection to one of the Sentry servers.
+ * @return client instance
+ * @throws Exception if something goes wrong
+ */
+ @SuppressWarnings("squid:S00112")
+ private SentryHDFSServiceClient create() throws Exception {
return (SentryHDFSServiceClient) Proxy
.newProxyInstance(SentryHDFSServiceClientDefaultImpl.class.getClassLoader(),
SentryHDFSServiceClientDefaultImpl.class.getInterfaces(),
new RetryClientInvocationHandler(conf,
- new SentryHDFSServiceClientDefaultImpl(conf, transportConfig), transportConfig));
+ new SentryHDFSServiceClientDefaultImpl(conf, transportPool), transportConfig));
+ }
+
+ /**
+ * Reset existing factory and return the old one.
+ * Only used by tests.
+ */
+ public static void factoryReset() {
+ LOGGER.debug("factory reset");
+ SentryHDFSServiceClientFactory factory = clientFactory.getAndSet(null);
+ if (factory != null) {
+ try {
+ factory.transportPool.close();
+ } catch (Exception e) {
+ LOGGER.error("failed to close transport pool", e);
+ }
+ }
+ }
+
+ private void close() {
+ try {
+ transportPool.close();
+ } catch (Exception e) {
+ LOGGER.error("failed to close transport pool", e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
index eccf83b..3ee3724 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
@@ -44,7 +44,10 @@ public class SentryHdfsServiceIntegrationBase extends
@After
public void after() {
if (hdfsClient != null) {
- hdfsClient.close();
+ try {
+ hdfsClient.close();
+ } catch (Exception ignored) {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestSentryHDFSServiceClientForUgi.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestSentryHDFSServiceClientForUgi.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestSentryHDFSServiceClientForUgi.java
deleted file mode 100644
index 09d417e..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestSentryHDFSServiceClientForUgi.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.hdfs;
-
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceIntegrationBase;
-import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-
-public class TestSentryHDFSServiceClientForUgi extends SentryHdfsServiceIntegrationBase {
-
- @BeforeClass
- public static void setup() throws Exception {
- kerberos = true;
- beforeSetup();
- setupConf();
- startSentryService();
- afterSetup();
- }
-
- public static void setupConf() throws Exception {
- // If kerberos is enabled, SentryTransportFactory should make sure that
- // HADOOP_SECURITY_AUTHENTICATION is appropriately configured.
- SentryGenericServiceIntegrationBase.setupConf();
- conf.set(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS);
- conf.set(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true");
- conf.set(HADOOP_SECURITY_AUTHENTICATION, "simple");
- UserGroupInformation.setConfiguration(conf);
- }
-
- /**
- * Test UserGroupInformationInitializer
- * <p>
- * Ensures that SentryTransportFactory is making sure that HADOOP_SECURITY_AUTHENTICATION
- * is appropriately configured and UserGroupInformation is initialized accordingly
- * by validating the static information in UserGroupInformation Class
- *
- * @throws Exception
- */
-
- @Test
- public void testUserGroupInformationInitializer() throws Exception {
- kerberos = false;
- runTestAsSubject(new TestOperation() {
- @Override
- public void runTestAsSubject() throws Exception {
- assert UserGroupInformation.isSecurityEnabled();
- }
- });
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
index ff25d95..c6efd10 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
@@ -71,11 +71,10 @@ public class SimpleDBProviderBackend implements ProviderBackend {
int retries = Math.max(retryCount + 1, 1); // if customer configs retryCount as Integer.MAX_VALUE, try only once
while (retries > 0) {
retries--;
- SentryPolicyServiceClient policyServiceClient = null;
- try {
- policyServiceClient = SentryServiceClientFactory.create(conf);
- return ImmutableSet.copyOf(policyServiceClient.listPrivilegesForProvider(groups, roleSet, authorizableHierarchy));
- } catch (Exception e) {
+ try (SentryPolicyServiceClient policyServiceClient =
+ SentryServiceClientFactory.create(conf)) {
+ return ImmutableSet.copyOf(policyServiceClient.listPrivilegesForProvider(groups, roleSet, authorizableHierarchy));
+ } catch (Exception e) {
//TODO: differentiate transient errors and permanent errors
String msg = "Unable to obtain privileges from server: " + e.getMessage() + ".";
if (retries > 0) {
@@ -90,10 +89,6 @@ public class SimpleDBProviderBackend implements ProviderBackend {
LOGGER.info("Sleeping is interrupted.", e1);
}
}
- } finally {
- if(policyServiceClient != null) {
- policyServiceClient.close();
- }
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
index 39ed64c..e7c5e0d 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/SentryGenericProviderBackend.java
@@ -82,6 +82,7 @@ public class SentryGenericProviderBackend extends CacheProvider implements Provi
} catch (NoSuchMethodException | ClassNotFoundException | InstantiationException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException("Failed to create privilege converter of type " + privilegeConverter, e);
}
+ LOGGER.debug("Starting Updateable Cache");
UpdatableCache cache = new UpdatableCache(conf, getComponentType(), getServiceName(), sentryPrivilegeConverter);
try {
cache.startUpdateThread(true);
@@ -110,9 +111,7 @@ public class SentryGenericProviderBackend extends CacheProvider implements Provi
if (enableCaching) {
return super.getPrivileges(groups, roleSet, authorizableHierarchy);
} else {
- SentryGenericServiceClient client = null;
- try {
- client = getClient();
+ try (SentryGenericServiceClient client = getClient()){
return ImmutableSet.copyOf(client.listPrivilegesForProvider(componentType, serviceName,
roleSet, groups, Arrays.asList(authorizableHierarchy)));
} catch (SentryUserException e) {
@@ -121,10 +120,6 @@ public class SentryGenericProviderBackend extends CacheProvider implements Provi
} catch (Exception e) {
String msg = "Unable to obtain client:" + e.getMessage();
LOGGER.error(msg, e);
- } finally {
- if (client != null) {
- client.close();
- }
}
}
return ImmutableSet.of();
@@ -138,10 +133,8 @@ public class SentryGenericProviderBackend extends CacheProvider implements Provi
if (enableCaching) {
return super.getRoles(groups, roleSet);
} else {
- SentryGenericServiceClient client = null;
- try {
+ try (SentryGenericServiceClient client = getClient()){
Set<TSentryRole> tRoles = Sets.newHashSet();
- client = getClient();
//get the roles according to group
String requestor = UserGroupInformation.getCurrentUser().getShortUserName();
for (String group : groups) {
@@ -158,10 +151,6 @@ public class SentryGenericProviderBackend extends CacheProvider implements Provi
} catch (Exception e) {
String msg = "Unable to obtain client:" + e.getMessage();
LOGGER.error(msg, e);
- } finally {
- if (client != null) {
- client.close();
- }
}
return ImmutableSet.of();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java
index a126f35..d20710f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/UpdatableCache.java
@@ -29,9 +29,15 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
-class UpdatableCache implements TableCache {
+public final class UpdatableCache implements TableCache, AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(UpdatableCache.class);
+ // Timer for getting updates periodically
+ private final Timer timer = new Timer();
+ private boolean initialized = false;
+ // saved timer is used by tests to cancel previous timer
+ private static Timer savedTimer;
+
private final String componentType;
private final String serviceName;
private final long cacheTtlNs;
@@ -94,14 +100,13 @@ class UpdatableCache implements TableCache {
String requestor;
requestor = UserGroupInformation.getLoginUser().getShortUserName();
- SentryGenericServiceClient client = null;
- try {
- client = getClient(); // will be closed in finaly clause
- final Set<TSentryRole> tSentryRoles = client.listAllRoles(requestor, componentType);
+ try(SentryGenericServiceClient client = getClient()) {
+ Set<TSentryRole> tSentryRoles = client.listAllRoles(requestor, componentType);
for (TSentryRole tSentryRole : tSentryRoles) {
final String roleName = tSentryRole.getRoleName();
- final Set<TSentryPrivilege> tSentryPrivileges = client.listPrivilegesByRoleName(requestor, roleName, componentType, serviceName);
+ final Set<TSentryPrivilege> tSentryPrivileges =
+ client.listPrivilegesByRoleName(requestor, roleName, componentType, serviceName);
for (String group : tSentryRole.getGroups()) {
Set<String> currentPrivileges = tempCache.get(group, roleName);
if (currentPrivileges == null) {
@@ -113,12 +118,8 @@ class UpdatableCache implements TableCache {
}
}
}
- } finally {
- if (client != null) {
- client.close();
- }
+ return tempCache;
}
- return tempCache;
}
/**
@@ -136,7 +137,19 @@ class UpdatableCache implements TableCache {
reloadData();
}
- Timer timer = new Timer();
+ if (initialized) {
+ LOGGER.info("Already initialized");
+ return;
+ }
+
+ initialized = true;
+ // Save timer to be able to cancel it.
+ if (savedTimer != null) {
+ LOGGER.debug("Resetting saved timer");
+ savedTimer.cancel();
+ }
+ savedTimer = timer;
+
final long refreshIntervalMs = TimeUnit.NANOSECONDS.toMillis(cacheTtlNs);
timer.scheduleAtFixedRate(
new TimerTask() {
@@ -158,6 +171,7 @@ class UpdatableCache implements TableCache {
private void revokeAllPrivilegesIfRequired() {
if (++consecutiveUpdateFailuresCount > allowedUpdateFailuresCount) {
+ consecutiveUpdateFailuresCount = 0;
// Clear cache to revoke all privileges.
// Update table cache to point to an empty table to avoid thread-unsafe characteristics of HashBasedTable.
this.table = HashBasedTable.create();
@@ -175,4 +189,21 @@ class UpdatableCache implements TableCache {
final long currentTimeNs = System.nanoTime();
return lastRefreshedNs + cacheTtlNs < currentTimeNs;
}
+
+ /**
+ * Only called by tests to disable timer.
+ */
+ public static void disable() {
+ if (savedTimer != null) {
+ LOGGER.info("Disabling timer");
+ savedTimer.cancel();
+ }
+ }
+
+ @Override
+ public void close() {
+ timer.cancel();
+ savedTimer = null;
+ LOGGER.info("Closed Updatable Cache");
+ }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
index c0cf475..d0c0075 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
@@ -25,7 +25,7 @@ import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-public interface SentryGenericServiceClient {
+public interface SentryGenericServiceClient extends AutoCloseable {
/**
* Create a sentry role
@@ -173,8 +173,6 @@ public interface SentryGenericServiceClient {
String serviceName, ActiveRoleSet roleSet, Set<String> groups,
List<? extends Authorizable> authorizables) throws SentryUserException;
- public void close();
-
/**
* Get sentry privileges based on valid active roles and the authorize objects. Note that
* it is client responsibility to ensure the requestor username, etc. is not impersonated.
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
index f430064..cac87cb 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
@@ -17,51 +17,63 @@
*/
package org.apache.sentry.provider.db.generic.service.thrift;
-import java.io.IOException;
-import java.util.*;
-
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.core.common.transport.SentryConnection;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.apache.sentry.core.common.transport.TTransportWrapper;
import org.apache.sentry.core.model.db.AccessConstants;
-import org.apache.sentry.service.thrift.ServiceConstants;
+import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyService.Client;
+import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
import org.apache.sentry.service.thrift.Status;
import org.apache.sentry.service.thrift.sentry_common_serviceConstants;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
-import com.google.common.collect.Lists;
/**
- * Sentry Generic Service Client
+ * Sentry Generic Service Client.
* <p>
- * The public implementation of SentryGenericServiceClient.
- * TODO(kalyan) A Sentry Client in which all the operations are synchronized for thread safety
- * Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state.
- * So it is important to close and re-open the transportFactory so that new socket is used.
+ * Thread safety. This class is not thread safe - it is up to the
+ * caller to ensure thread safety.
*/
+public class SentryGenericServiceClientDefaultImpl
+ implements SentryGenericServiceClient, SentryConnection {
-public class SentryGenericServiceClientDefaultImpl implements SentryGenericServiceClient, SentryServiceClient {
- private SentryGenericPolicyService.Client client;
- private SentryTransportFactory transportFactory;
- private TTransport transport;
- private Configuration conf;
- private static final Logger LOGGER = LoggerFactory
- .getLogger(SentryGenericServiceClientDefaultImpl.class);
+ private Client client;
+ private final SentryTransportPool transportPool;
+ private TTransportWrapper transport;
private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occured ";
+ private final long maxMessageSize;
- public SentryGenericServiceClientDefaultImpl(Configuration conf, SentryPolicyClientTransportConfig transportConfig) throws IOException {
- transportFactory = new SentryTransportFactory(conf, transportConfig);
- this.conf = conf;
+ /**
+ * Initialize client with the given configuration, using specified transport pool
+ * implementation for obtaining transports.
+ * @param conf Sentry Configuration
+ * @param transportPool source of connected transports
+ */
+ SentryGenericServiceClientDefaultImpl(Configuration conf,
+ SentryTransportPool transportPool) {
+
+ //TODO(kalyan) need to find appropriate place to add it
+ // if (kerberos) {
+ // // since the client uses hadoop-auth, we need to set kerberos in
+ // // hadoop-auth if we plan to use kerberos
+ // conf.set(HADOOP_SECURITY_AUTHENTICATION, SentryConstants.KERBEROS_MoODE);
+ // }
+ maxMessageSize = conf.getLong(ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
+ ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ this.transportPool = transportPool;
}
/**
@@ -70,20 +82,18 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
* @throws IOException
*/
@Override
- public synchronized void connect() throws IOException {
- if (transport != null && transport.isOpen()) {
+ public void connect() throws Exception {
+ if ((transport != null) && transport.isOpen()) {
return;
}
- transport = transportFactory.getTransport();
- TMultiplexedProtocol protocol = null;
- long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
- ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
- protocol = new TMultiplexedProtocol(
- new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true),
+ // Obtain connection to Sentry server
+ transport = transportPool.getTransport();
+ TMultiplexedProtocol protocol = new TMultiplexedProtocol(
+ new TBinaryProtocol(transport.getTTransport(), maxMessageSize,
+ maxMessageSize, true, true),
SentryGenericPolicyProcessor.SENTRY_GENERIC_SERVICE_NAME);
- client = new SentryGenericPolicyService.Client(protocol);
- LOGGER.debug("Successfully created client");
+ client = new Client(protocol);
}
/**
@@ -95,7 +105,7 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
* @throws SentryUserException
*/
@Override
- public synchronized void createRole(String requestorUserName, String roleName, String component)
+ public void createRole(String requestorUserName, String roleName, String component)
throws SentryUserException {
TCreateSentryRoleRequest request = new TCreateSentryRoleRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
@@ -358,7 +368,7 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
* @throws SentryUserException
*/
@Override
- public synchronized Set<TSentryRole> listRolesByGroupName(
+ public Set<TSentryRole> listRolesByGroupName(
String requestorUserName,
String groupName,
String component)
@@ -412,7 +422,7 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
request.setServiceName(serviceName);
request.setRequestorUserName(requestorUserName);
request.setRoleName(roleName);
- if ((authorizables != null) && (authorizables.size() > 0)) {
+ if (authorizables != null && !authorizables.isEmpty()) {
List<TAuthorizable> tAuthorizables = Lists.newArrayList();
for (Authorizable authorizable : authorizables) {
tAuthorizables.add(new TAuthorizable(authorizable.getTypeName(), authorizable.getName()));
@@ -464,7 +474,7 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
request.setGroups(groups);
}
List<TAuthorizable> tAuthoriables = Lists.newArrayList();
- if ((authorizables != null) && (authorizables.size() > 0)) {
+ if (authorizables != null && !authorizables.isEmpty()) {
for (Authorizable authorizable : authorizables) {
tAuthoriables.add(new TAuthorizable(authorizable.getTypeName(), authorizable.getName()));
}
@@ -527,12 +537,23 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
}
@Override
- public synchronized void close() {
- transportFactory.close();
+ public void close() {
+ done();
}
@Override
- public void disconnect() {
- transportFactory.releaseTransport();
+ public void done() {
+ if (transport != null) {
+ transportPool.returnTransport(transport);
+ transport = null;
+ }
+ }
+
+ @Override
+ public void invalidate() {
+ if (transport != null) {
+ transportPool.invalidateTransport(transport);
+ transport = null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
index 2fc8b0f..b663e3d 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
@@ -18,27 +18,106 @@
package org.apache.sentry.provider.db.generic.service.thrift;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
+import org.apache.sentry.core.common.transport.SentryTransportPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.ThreadSafe;
import java.lang.reflect.Proxy;
+import java.util.concurrent.atomic.AtomicReference;
/**
- * SentryGenericServiceClientFactory is a public class for the components which using Generic Model to create sentry client.
+ * Produces client connection for Sentry clients using Generic model.
+ * Factory is [alost] a singleton. Tests can call {@link #factoryReset()} to destroy the
+ * existing factory and create a new one. This may be needed because tests modify
+ * configuration and start and stop servers.
*/
-
+@ThreadSafe
public final class SentryGenericServiceClientFactory {
- private static final SentryPolicyClientTransportConfig transportConfig =
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryGenericServiceClientFactory.class);
+
+ // Used to implement a singleton
+ private static final AtomicReference<SentryGenericServiceClientFactory> clientFactory =
+ new AtomicReference<>();
+
+ private final SentryPolicyClientTransportConfig transportConfig =
new SentryPolicyClientTransportConfig();
- private SentryGenericServiceClientFactory() {
- }
+ private final SentryTransportPool transportPool;
+ private final Configuration conf;
+ /**
+ * Obtain an Generic policy client instance.
+ * @param conf Configuration that should be used. Configuration is only used for the
+ * initial creation and ignored afterwords.
+ */
public static SentryGenericServiceClient create(Configuration conf) throws Exception {
+ SentryGenericServiceClientFactory factory = clientFactory.get();
+ if (factory != null) {
+ return factory.create();
+ }
+ factory = new SentryGenericServiceClientFactory(conf);
+ boolean ok = clientFactory.compareAndSet(null, factory);
+ if (ok) {
+ return factory.create();
+ }
+ factory.close();
+ return clientFactory.get().create();
+ }
+
+ /**
+ * Create a new factory instance and atach it to a connection pool instance.
+ * @param conf Configuration
+ */
+ private SentryGenericServiceClientFactory(Configuration conf) {
+ if (transportConfig.isKerberosEnabled(conf) &&
+ transportConfig.useUserGroupInformation(conf)) {
+ LOGGER.info("Using UserGroupInformation authentication");
+ UserGroupInformation.setConfiguration(conf);
+ }
+
+ this.conf = conf;
+
+ transportPool = new SentryTransportPool(this.conf, transportConfig,
+ new SentryTransportFactory(this.conf, transportConfig));
+ }
+
+ /**
+ * Create a new client connection to the server for Generic model clients
+ * @return client instance
+ * @throws Exception if something goes wrong
+ */
+ @SuppressWarnings("squid:S00112")
+ private SentryGenericServiceClient create() throws Exception {
return (SentryGenericServiceClient) Proxy
.newProxyInstance(SentryGenericServiceClientDefaultImpl.class.getClassLoader(),
SentryGenericServiceClientDefaultImpl.class.getInterfaces(),
new RetryClientInvocationHandler(conf,
- new SentryGenericServiceClientDefaultImpl(conf, transportConfig), transportConfig));
+ new SentryGenericServiceClientDefaultImpl(conf, transportPool), transportConfig));
}
+ // Should only be used by tests.
+ // Resets the factory and destroys any pooled connections
+ public static void factoryReset() {
+ LOGGER.debug("factory reset");
+ SentryGenericServiceClientFactory factory = clientFactory.getAndSet(null);
+ if (factory != null) {
+ try {
+ factory.transportPool.close();
+ } catch (Exception e) {
+ LOGGER.error("failed to close transport pool", e);
+ }
+ }
+ }
+
+ private void close() {
+ try {
+ transportPool.close();
+ } catch (Exception e) {
+ LOGGER.error("failed to close transport pool", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java
index 1753e91..873d51c 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryConfigToolSolr.java
@@ -62,12 +62,14 @@ public class SentryConfigToolSolr extends SentryConfigToolCommon {
String service = conf.get(SOLR_SERVICE_NAME, "service1");
// instantiate a solr client for sentry service. This sets the ugi, so must
// be done before getting the ugi below.
- SentryGenericServiceClient client = SentryGenericServiceClientFactory.create(conf);
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- String requestorName = ugi.getShortUserName();
+ try(SentryGenericServiceClient client =
+ SentryGenericServiceClientFactory.create(conf)) {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ String requestorName = ugi.getShortUserName();
- convertINIToSentryServiceCmds(component, service, requestorName, conf, client,
- getPolicyFile(), getValidate(), getImportPolicy(), getCheckCompat());
+ convertINIToSentryServiceCmds(component, service, requestorName, conf, client,
+ getPolicyFile(), getValidate(), getImportPolicy(), getCheckCompat());
+ }
}
private Configuration getSentryConf() {
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java
index 1027550..3fb9f18 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/tools/SentryShellSolr.java
@@ -47,39 +47,41 @@ public class SentryShellSolr extends SentryShellCommon {
Configuration conf = getSentryConf();
String service = conf.get(SOLR_SERVICE_NAME, "service1");
- SentryGenericServiceClient client = SentryGenericServiceClientFactory.create(conf);
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- String requestorName = ugi.getShortUserName();
+ try(SentryGenericServiceClient client =
+ SentryGenericServiceClientFactory.create(conf)) {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ String requestorName = ugi.getShortUserName();
- if (isCreateRole) {
- command = new CreateRoleCmd(roleName, component);
- } else if (isDropRole) {
- command = new DropRoleCmd(roleName, component);
- } else if (isAddRoleGroup) {
- command = new AddRoleToGroupCmd(roleName, groupName, component);
- } else if (isDeleteRoleGroup) {
- command = new DeleteRoleFromGroupCmd(roleName, groupName, component);
- } else if (isGrantPrivilegeRole) {
- command = new GrantPrivilegeToRoleCmd(roleName, component,
- privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
- } else if (isRevokePrivilegeRole) {
- command = new RevokePrivilegeFromRoleCmd(roleName, component,
- privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
- } else if (isListRole) {
- command = new ListRolesCmd(groupName, component);
- } else if (isListPrivilege) {
- command = new ListPrivilegesByRoleCmd(roleName, component,
- service, new SolrTSentryPrivilegeConverter(component, service));
- }
+ if (isCreateRole) {
+ command = new CreateRoleCmd(roleName, component);
+ } else if (isDropRole) {
+ command = new DropRoleCmd(roleName, component);
+ } else if (isAddRoleGroup) {
+ command = new AddRoleToGroupCmd(roleName, groupName, component);
+ } else if (isDeleteRoleGroup) {
+ command = new DeleteRoleFromGroupCmd(roleName, groupName, component);
+ } else if (isGrantPrivilegeRole) {
+ command = new GrantPrivilegeToRoleCmd(roleName, component,
+ privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
+ } else if (isRevokePrivilegeRole) {
+ command = new RevokePrivilegeFromRoleCmd(roleName, component,
+ privilegeStr, new SolrTSentryPrivilegeConverter(component, service));
+ } else if (isListRole) {
+ command = new ListRolesCmd(groupName, component);
+ } else if (isListPrivilege) {
+ command = new ListPrivilegesByRoleCmd(roleName, component,
+ service, new SolrTSentryPrivilegeConverter(component, service));
+ }
- // check the requestor name
- if (StringUtils.isEmpty(requestorName)) {
- // The exception message will be recorded in log file.
- throw new Exception("The requestor name is empty.");
- }
+ // check the requestor name
+ if (StringUtils.isEmpty(requestorName)) {
+ // The exception message will be recorded in log file.
+ throw new Exception("The requestor name is empty.");
+ }
- if (command != null) {
- command.execute(client, requestorName);
+ if (command != null) {
+ command.execute(client, requestorName);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/73fde5c9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
index 413d090..5957cc9 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
@@ -26,7 +26,7 @@ import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-public interface SentryPolicyServiceClient {
+public interface SentryPolicyServiceClient extends AutoCloseable {
public void createRole(String requestorUserName, String roleName) throws SentryUserException;
@@ -180,8 +180,6 @@ public interface SentryPolicyServiceClient {
*/
public String getConfigValue(String propertyName, String defaultValue) throws SentryUserException;
- public void close();
-
/**
* Requests the sentry server to synchronize all HMS notification events up to the specified id.
* The sentry server will return once it have processed the id specified..