You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/04/06 21:36:00 UTC
[1/5] sentry git commit: SENTRY-1593
Repository: sentry
Updated Branches:
refs/heads/SENTRY-1593-akolb [created] c9c0119fc
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactory.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactory.java
index 1ec8840..b1c2365 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactory.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactory.java
@@ -40,7 +40,7 @@ import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest;
import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleResponse;
import org.apache.sentry.provider.db.service.thrift.TSentryGroup;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
-import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
+import org.apache.sentry.core.common.utils.ThriftUtil;
import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.apache.sentry.service.thrift.Status;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactoryGM.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactoryGM.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactoryGM.java
index dfae5ab..4f35a44 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactoryGM.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/log/entity/TestJsonLogEntityFactoryGM.java
@@ -43,7 +43,7 @@ import org.apache.sentry.provider.db.generic.service.thrift.TDropSentryRoleReque
import org.apache.sentry.provider.db.generic.service.thrift.TDropSentryRoleResponse;
import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
import org.apache.sentry.provider.db.log.util.Constants;
-import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
+import org.apache.sentry.core.common.utils.ThriftUtil;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.apache.sentry.service.thrift.Status;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyServiceClient.java
new file mode 100644
index 0000000..3b3b30e
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyServiceClient.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.thrift;
+
+import java.util.Set;
+
+import org.apache.sentry.service.thrift.SentryServiceFactory;
+import org.apache.sentry.service.thrift.SentryServiceIntegrationBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestSentryPolicyServiceClient extends SentryServiceIntegrationBase {
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ beforeSetup();
+ setupConf();
+ startSentryService();
+ afterSetup();
+ kerberos = false;
+ }
+
+ @Test
+ public void testConnectionWhenReconnect() throws Exception {
+ runTestAsSubject(new TestOperation() {
+ @Override
+ public void runTestAsSubject() throws Exception {
+ String requestorUserName = ADMIN_USER;
+ Set<String> requestorUserGroupNames = Sets.newHashSet(ADMIN_GROUP);
+ String roleName = "admin_r";
+ setLocalGroupMapping(requestorUserName, requestorUserGroupNames);
+ writePolicyFile();
+
+ client.dropRoleIfExists(requestorUserName, roleName);
+ client.createRole(requestorUserName, roleName);
+ client.listRoles(requestorUserName);
+ stopSentryService();
+ server = new SentryServiceFactory().create(conf);
+ startSentryService();
+ client.listRoles(requestorUserName);
+ client.dropRole(requestorUserName, roleName);
+ }
+ });
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java
index 04d92dd..58e2618 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryPolicyStoreProcessor.java
@@ -22,7 +22,7 @@ import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryThriftAPIMismatchException;
-import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants.PolicyStoreServerConfig;
+import org.apache.sentry.core.common.utils.PolicyStoreConstants.PolicyStoreServerConfig;
import org.apache.sentry.service.thrift.ServiceConstants;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
index 7c7ebab..54f5cfa 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java
@@ -67,9 +67,9 @@ public class TestSentryServiceFailureCase extends SentryServiceIntegrationBase {
if (cause == null) {
throw e;
}
- String msg = "Exception message: " + cause.getMessage() + " to contain " +
+ String msg = "Exception message: " + cause.getCause().getMessage() + " to contain " +
PEER_CALLBACK_FAILURE;
- Assert.assertTrue(msg, Strings.nullToEmpty(cause.getMessage())
+ Assert.assertTrue(msg, Strings.nullToEmpty(cause.getCause().getMessage())
.contains(PEER_CALLBACK_FAILURE));
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java
index dfd79ae..45dffd5 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/SentryServiceIntegrationBase.java
@@ -345,7 +345,8 @@ public abstract class SentryServiceIntegrationBase extends SentryMiniKdcTestcase
return null;
}});
} else {
- */ test.runTestAsSubject();
+ */
+ test.runTestAsSubject();
//}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
index 7292387..a202775 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java
@@ -19,7 +19,7 @@
package org.apache.sentry.service.thrift;
import com.google.common.net.HostAndPort;
-import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
+import org.apache.sentry.core.common.utils.ThriftUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
[4/5] sentry git commit: SENTRY-1593
Posted by ak...@apache.org.
SENTRY-1593
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/e3d859a9
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/e3d859a9
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/e3d859a9
Branch: refs/heads/SENTRY-1593-akolb
Commit: e3d859a990d7d42b4c3942e64a803e13ad897f22
Parents: 2811311
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Wed Apr 5 21:19:29 2017 -0700
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Wed Apr 5 21:19:29 2017 -0700
----------------------------------------------------------------------
sentry-core/sentry-core-common/pom.xml | 4 +
.../exception/SentryHdfsServiceException.java | 37 +
.../transport/RetryClientInvocationHandler.java | 148 ++++
.../SentryClientInvocationHandler.java | 54 ++
.../SentryClientTransportConfigInterface.java | 7 +
.../SentryClientTransportConstants.java | 1 +
.../SentryHDFSClientTransportConfig.java | 5 +
.../SentryPolicyClientTransportConfig.java | 5 +
.../common/transport/SentryServiceClient.java | 48 ++
...SentryServiceClientTransportDefaultImpl.java | 342 +++++++++
.../core/common/utils/PolicyStoreConstants.java | 32 +
.../sentry/core/common/utils/ThriftUtil.java | 130 ++++
.../apache/sentry/hdfs/ServiceConstants.java | 3 -
.../sentry/hdfs/SentryHDFSServiceClient.java | 7 +-
.../SentryHDFSServiceClientDefaultImpl.java | 163 +----
.../hdfs/SentryHDFSServiceClientFactory.java | 12 +-
.../hdfs/SentryHDFSServiceProcessorFactory.java | 2 +-
.../sentry/hdfs/SentryHdfsServiceException.java | 33 -
.../thrift/SentryGenericPolicyProcessor.java | 2 +-
.../SentryGenericPolicyProcessorWrapper.java | 2 +-
.../thrift/SentryGenericServiceClient.java | 5 +-
.../SentryGenericServiceClientDefaultImpl.java | 303 +++-----
.../SentryGenericServiceClientFactory.java | 9 +-
.../db/log/entity/JsonLogEntityFactory.java | 2 +-
.../service/persistent/TransactionManager.java | 2 +-
.../db/service/thrift/PolicyStoreConstants.java | 32 -
.../thrift/SentryPolicyServiceClient.java | 5 +-
.../SentryPolicyServiceClientDefaultImpl.java | 690 +++++++------------
.../thrift/SentryPolicyStoreProcessor.java | 2 +-
.../service/thrift/SentryProcessorWrapper.java | 1 +
.../provider/db/service/thrift/ThriftUtil.java | 127 ----
.../thrift/PoolClientInvocationHandler.java | 3 +-
.../thrift/RetryClientInvocationHandler.java | 146 ----
.../thrift/SentryClientInvocationHandler.java | 54 --
.../thrift/SentryServiceClientFactory.java | 4 +-
.../sentry/service/thrift/ServiceConstants.java | 17 -
.../TestSentryGenericPolicyProcessor.java | 2 +-
.../db/log/entity/TestJsonLogEntityFactory.java | 2 +-
.../log/entity/TestJsonLogEntityFactoryGM.java | 2 +-
.../thrift/TestSentryPolicyServiceClient.java | 64 ++
.../thrift/TestSentryPolicyStoreProcessor.java | 2 +-
.../thrift/TestSentryServiceFailureCase.java | 4 +-
.../thrift/SentryServiceIntegrationBase.java | 3 +-
.../thrift/TestPoolClientInvocationHandler.java | 2 +-
44 files changed, 1321 insertions(+), 1199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/pom.xml b/sentry-core/sentry-core-common/pom.xml
index 9d18063..4d0e832 100644
--- a/sentry-core/sentry-core-common/pom.xml
+++ b/sentry-core/sentry-core-common/pom.xml
@@ -62,6 +62,10 @@ limitations under the License.
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java
new file mode 100644
index 0000000..5a7cce3
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryHdfsServiceException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.exception;
+
+/**
+ * Signals that there was an error while using Sentry Hdfs Service
+ * <p>
+ * Exception thrown by <code>SentryHDFSServiceClient</code> on catching any exception
+ * using the <code>SentryHDFSService</code>.
+ */
+public class SentryHdfsServiceException extends RuntimeException {
+ private static final long serialVersionUID = 1511645864949767378L;
+
+ public SentryHdfsServiceException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SentryHdfsServiceException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
new file mode 100644
index 0000000..b01cb37
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * The RetryClientInvocationHandler is a proxy class for handling thrift calls for non-pool
+ * model. Currently only one client connection is allowed.
+ * <p>
+ * For every rpc call, if the client is not connected, it will first connect to one of the
+ * sentry servers, and then do the thrift call to the connected sentry server, which will
+ * execute the requested method and return back the response. If it is failed with connection
+ * problem, it will close the current connection and retry (reconnect and resend the
+ * thrift call) no more than rpcRetryTotal times. If the client is already connected, it
+ * will reuse the existing connection, and do the thrift call.
+ * <p>
+ * During reconnection, it will first cycle through all the available sentry servers, and
+ * then retry the whole server list no more than connectionFullRetryTotal times. In this
+ * case, it won't introduce more latency when some server fails. Also to prevent all
+ * clients connecting to the same server, it will reorder the endpoints randomly after a
+ * full retry.
+ * <p>
+ * TODO(kalyan) allow multiple client connections using <code>PoolClientInvocationHandler</code>
+ */
+
+public class RetryClientInvocationHandler extends SentryClientInvocationHandler {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(RetryClientInvocationHandler.class);
+ private SentryServiceClient client = null;
+
+ /**
+ * Initialize the sentry configurations, including rpc retry count and client connection
+ * configs for SentryPolicyServiceClientDefaultImpl
+ */
+ public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject) {
+ Preconditions.checkNotNull(conf, "Configuration object cannot be null");
+ client = clientObject;
+ }
+
+ /**
+ * For every rpc call, if the client is not connected, it will first connect to a sentry
+ * server, and then do the thrift call to the connected sentry server, which will
+ * execute the requested method and return back the response. If it is failed with
+ * connection problem, it will close the current connection, and retry (reconnect and
+ * resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException
+ * if failed retry after rpcRetryTotal times.
+ * if it is failed with other exception, method would just re-throw the exception.
+ * Synchronized it for thread safety.
+ */
+ @Override
+ synchronized public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
+ int retryCount = 0;
+ Exception lastExc = null;
+ boolean tryAlternateServer = false;
+
+ while (retryCount < client.getRetryCount()) {
+ // Connect to a sentry server if not connected yet.
+ try {
+ client.connectWithRetry(tryAlternateServer);
+ } catch (IOException e) {
+ // Increase the retry num
+ // Retry when the exception is caused by connection problem.
+ retryCount++;
+ lastExc = e;
+ close();
+ continue;
+ }
+
+ // do the thrift call
+ try {
+ return method.invoke(client, args);
+ } catch (InvocationTargetException e) {
+ // Get the target exception, check if SentryUserException or TTransportException is wrapped.
+ // TTransportException means there has connection problem with the pool.
+ Throwable targetException = e.getCause();
+ if (targetException instanceof SentryUserException ||
+ targetException instanceof SentryHdfsServiceException) {
+ Throwable sentryTargetException = targetException.getCause();
+ // If there has connection problem, eg, invalid connection if the service restarted,
+ // sentryTargetException instanceof TTransportException will be true.
+ if (sentryTargetException instanceof TTransportException) {
+ // Retry when the exception is caused by connection problem.
+ lastExc = new TTransportException(sentryTargetException);
+ LOGGER.error("Got TTransportException when do the thrift call ", lastExc);
+ tryAlternateServer = true;
+ // Closing the thrift client on TTransportException. New client object is
+ // created using new socket when an attempt to reconnect is made.
+ close();
+ } else {
+ // The exception is thrown by thrift call, eg, SentryAccessDeniedException.
+ // Do not need to reconnect to the sentry server.
+ if (targetException instanceof SentryUserException) {
+ throw (SentryUserException) targetException;
+ } else {
+ throw (SentryHdfsServiceException) targetException;
+ }
+ }
+ } else {
+ throw e;
+ }
+ }
+
+ // Increase the retry num
+ retryCount++;
+ }
+
+ // Throw the exception as reaching the max rpc retry num.
+ LOGGER.error(String.format("failed after %d retries ", client.getRetryCount()), lastExc);
+ throw new SentryUserException(
+ String.format("failed after %d retries ", client.getRetryCount()), lastExc);
+ }
+
+ @Override
+ public void close() {
+ try {
+ LOGGER.debug("Closing the current client connection");
+ client.close();
+ } catch (Exception e) {
+ LOGGER.error("Encountered failure while closing the connection");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java
new file mode 100644
index 0000000..bf33fda
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientInvocationHandler.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.transport;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+/**
+ * SentryClientInvocationHandler is the base interface for all the InvocationHandler in SENTRY
+ */
+public abstract class SentryClientInvocationHandler implements InvocationHandler {
+
+ /**
+ * Close the InvocationHandler: An InvocationHandler may create some contexts,
+ * these contexts should be close when the method "close()" of client be called.
+ */
+ @Override
+ public final Object invoke(Object proxy, Method method, Object[] args) throws Exception {
+ // close() doesn't throw exception we supress that in case of connection
+ // loss. Changing SentryPolicyServiceClient#close() to throw an
+ // exception would be a backward incompatible change for Sentry clients.
+ if ("close".equals(method.getName()) && null == args) {
+ close();
+ return null;
+ }
+ return invokeImpl(proxy, method, args);
+ }
+
+ /**
+ * Subclass should implement this method for special function
+ */
+ abstract public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception;
+
+ /**
+ * An abstract method "close", an invocationHandler should close its contexts at here.
+ */
+ public abstract void close();
+
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
index 6cea596..24192fd 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
@@ -40,6 +40,13 @@ interface SentryClientTransportConfigInterface {
/**
* @param conf configuration
+ * @return number of times should client re-create the transport and try to connect
+ * before finally giving up.
+ */
+ int getSentryRpcRetryTotal(Configuration conf);
+
+ /**
+ * @param conf configuration
* @return True, if kerberos should be enabled.
* False, Iff kerberos is enabled.
* @throws MissingConfigurationException if property is mandatory and is missing in
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
index 636de40..4af7d1f 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java
@@ -28,6 +28,7 @@ package org.apache.sentry.core.common.transport;
* <code>SentryClientTransportConfigInterface</code>.
*/
class SentryClientTransportConstants {
+
/**
* max retry num for client rpc
* {link RetryClientInvocationHandler#invokeImpl(Object, Method, Object[])}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
index 12175f7..64cdd46 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java
@@ -46,6 +46,11 @@ public final class SentryHDFSClientTransportConfig
}
@Override
+ public int getSentryRpcRetryTotal(Configuration conf) {
+ return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT);
+ }
+
+ @Override
public boolean useUserGroupInformation(Configuration conf)
throws MissingConfigurationException {
return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
index 038bca7..396a7f6 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java
@@ -46,6 +46,11 @@ public final class SentryPolicyClientTransportConfig
}
@Override
+ public int getSentryRpcRetryTotal(Configuration conf) {
+ return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT);
+ }
+
+ @Override
public boolean useUserGroupInformation(Configuration conf)
throws MissingConfigurationException {
return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true"));
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
new file mode 100644
index 0000000..dc93fb7
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.transport;
+
+import java.io.Closeable;
+
+/**
+ * Client interface for Proxy Invocation handlers
+ * <p>
+ * Defines interface that Sentry client's should expose to the Invocation handlers like
+ * <code>RetryClientInvocationHandler</code> used to proxy the method invocation on sentry
+ * client instances .
+ * <p>
+ * All the sentry clients that need retrying and failover capabilities should implement
+ * this interface.
+ */
+public interface SentryServiceClient extends Closeable {
+ /**
+ * This is a no-op when already connected.
+ * When there is a connection error, it will retry with another sentry server. It will
+ * first cycle through all the available sentry servers, and then retry the whole server
+ * list no more than connectionFullRetryTotal times. In this case, it won't introduce
+ * more latency when some server fails. Also to prevent all clients connecting to the
+ * same server, it will reorder the endpoints randomly after a full retry.
+ * <p>
+ * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
+ */
+ void connectWithRetry(boolean tryAlternateServer) throws Exception;
+
+ int getRetryCount();
+
+ void close();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
new file mode 100644
index 0000000..4c126fb
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.core.common.exception.MissingConfigurationException;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.sentry.core.common.utils.ThriftUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Implements the transport functionality for sentry clients.
+ * All the sentry clients should extend this class for transport implementation.
+ */
+
+public abstract class SentryServiceClientTransportDefaultImpl {
+ protected final Configuration conf;
+ protected final boolean kerberos;
+ private String[] serverPrincipalParts;
+
+ protected TTransport transport;
+ private final int connectionTimeout;
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientTransportDefaultImpl.class);
+ // configs for connection retry
+ private final int connectionFullRetryTotal;
+ private final int rpcRetryTotal;
+ private final ArrayList<InetSocketAddress> endpoints;
+ protected InetSocketAddress serverAddress;
+ private final SentryClientTransportConfigInterface transportConfig;
+ private static final ImmutableMap<String, String> SASL_PROPERTIES =
+ ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
+
+ /**
+ * Defines various client types.
+ */
+ protected enum sentryClientType {
+ POLICY_CLIENT,
+ HDFS_CLIENT,
+ }
+
+ /**
+ * This transport wraps the Sasl transports to set up the right UGI context for open().
+ */
+ public static class UgiSaslClientTransport extends TSaslClientTransport {
+ UserGroupInformation ugi = null;
+
+ public UgiSaslClientTransport(String mechanism, String protocol,
+ String serverName, TTransport transport,
+ boolean wrapUgi, Configuration conf)
+ throws IOException, SaslException {
+ super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
+ transport);
+ if (wrapUgi) {
+ // If we don't set the configuration, the UGI will be created based on
+ // what's on the classpath, which may lack the kerberos changes we require
+ UserGroupInformation.setConfiguration(conf);
+ ugi = UserGroupInformation.getLoginUser();
+ }
+ }
+
+ // open the SASL transport with using the current UserGroupInformation
+ // This is needed to get the current login context stored
+ @Override
+ public void open() throws TTransportException {
+ if (ugi == null) {
+ baseOpen();
+ } else {
+ try {
+ if (ugi.isFromKeytab()) {
+ ugi.checkTGTAndReloginFromKeytab();
+ }
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws TTransportException {
+ baseOpen();
+ return null;
+ }
+ });
+ } catch (IOException e) {
+ throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
+ } catch (InterruptedException e) {
+ throw new TTransportException(
+ "Interrupted while opening underlying transport: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private void baseOpen() throws TTransportException {
+ super.open();
+ }
+ }
+
+ /**
+ * Initialize the object based on the sentry configuration provided.
+ * List of configured servers are reordered randomly preventing all
+ * clients connecting to the same server.
+ *
+ * @param conf Sentry configuration
+ * @param type Type indicates the service type
+ */
+ public SentryServiceClientTransportDefaultImpl(Configuration conf,
+ sentryClientType type) throws IOException {
+
+ this.conf = conf;
+ Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
+ serverPrincipalParts = null;
+ if (type == sentryClientType.POLICY_CLIENT) {
+ transportConfig = new SentryPolicyClientTransportConfig();
+ } else {
+ transportConfig = new SentryHDFSClientTransportConfig();
+ }
+
+ try {
+ String hostsAndPortsStr;
+ this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
+ this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
+ this.rpcRetryTotal = transportConfig.getSentryRpcRetryTotal(conf);
+ this.kerberos = transportConfig.isKerberosEnabled(conf);
+
+ hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
+
+ int serverPort = transportConfig.getServerRpcPort(conf);
+
+ String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+ HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
+
+ this.endpoints = new ArrayList(hostsAndPortsStrArr.length);
+ for (HostAndPort endpoint : hostsAndPorts) {
+ this.endpoints.add(
+ new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
+ LOGGER.debug("Added server endpoint: " + endpoint.toString());
+ }
+
+ // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
+ // at the same time after a node failure.
+ Collections.shuffle(endpoints);
+ serverAddress = null;
+ connectWithRetry(false);
+ } catch (Exception e) {
+ throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Initialize object based on the parameters provided provided.
+ *
+ * @param addr Host address which the client needs to connect
+ * @param port Host Port which the client needs to connect
+ * @param conf Sentry configuration
+ * @param type Type indicates the service type
+ */
+ public SentryServiceClientTransportDefaultImpl(String addr, int port, Configuration conf,
+ sentryClientType type) throws IOException {
+ // copy the configuration because we may make modifications to it.
+ this.conf = new Configuration(conf);
+ serverPrincipalParts = null;
+ Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
+ if (type == sentryClientType.POLICY_CLIENT) {
+ transportConfig = new SentryPolicyClientTransportConfig();
+ } else {
+ transportConfig = new SentryHDFSClientTransportConfig();
+ }
+
+ try {
+ InetSocketAddress serverAddress = NetUtils.createSocketAddr(addr, port);
+ this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
+ this.rpcRetryTotal = transportConfig.getSentryRpcRetryTotal(conf);
+ this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
+ this.kerberos = transportConfig.isKerberosEnabled(conf);
+ connect(serverAddress);
+ } catch (MissingConfigurationException e) {
+ throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
+ }
+ endpoints = null;
+ }
+
+
+ /**
+ * no-op when already connected.
+ * On connection error, Iterates through all the configured servers and tries to connect.
+ * On successful connection, control returns
+ * On connection failure, continues iterating through all the configured sentry servers,
+ * and then retries the whole server list no more than connectionFullRetryTotal times.
+ * In this case, it won't introduce more latency when some server fails.
+ * <p>
+ * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
+ */
+ public synchronized void connectWithRetry(boolean tryAlternateServer) throws IOException {
+ if (isConnected() && (!tryAlternateServer)) {
+ return;
+ }
+
+ IOException currentException = null;
+ for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
+ try {
+ connectToAvailableServer();
+ return;
+ } catch (IOException e) {
+ currentException = e;
+ LOGGER.error(
+ String.format("Failed to connect to all the configured sentry servers, " +
+ "Retrying again"));
+ }
+ }
+ // Throw exception as reaching the max full connectWithRetry number.
+ LOGGER.error(
+ String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
+ currentException);
+ throw currentException;
+ }
+
+ /**
+ * Iterates through all the configured servers and tries to connect.
+ * On connection error, tries to connect to next server.
+ * Control returns on successful connection OR it's done trying to all the
+ * configured servers.
+ *
+ * @throws IOException
+ */
+ private void connectToAvailableServer() throws IOException {
+ IOException currentException = null;
+ if (endpoints.size() == 1) {
+ connect(endpoints.get(0));
+ return;
+ }
+
+ for (InetSocketAddress addr : endpoints) {
+ try {
+ serverAddress = addr;
+ connect(serverAddress);
+ LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString()));
+ return;
+ } catch (IOException e) {
+ LOGGER.error(String.format("Failed connection to %s: %s",
+ addr.toString(), e.getMessage()), e);
+ currentException = e;
+ }
+ }
+ throw currentException;
+ }
+
+ /**
+ * Connect to the specified socket address and throw IOException if failed.
+ *
+ * @param serverAddress Address client needs to connect
+ * @throws Exception if there is failure in establishing the connection.
+ */
+ protected void connect(InetSocketAddress serverAddress) throws IOException {
+ try {
+ transport = createTransport(serverAddress);
+ transport.open();
+ } catch (TTransportException e) {
+ throw new IOException("Failed to open transport: " + e.getMessage(), e);
+ } catch (MissingConfigurationException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+
+ LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress);
+ }
+
+ /**
+ * New socket is is created
+ *
+ * @param serverAddress
+ * @return
+ * @throws TTransportException
+ * @throws MissingConfigurationException
+ * @throws IOException
+ */
+ private TTransport createTransport(InetSocketAddress serverAddress)
+ throws TTransportException, MissingConfigurationException, IOException {
+ TTransport socket = new TSocket(serverAddress.getHostName(),
+ serverAddress.getPort(), connectionTimeout);
+
+ if (kerberos) {
+ String serverPrincipal = transportConfig.getSentryPrincipal(conf);
+ serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+ LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
+ if (serverPrincipalParts == null) {
+ serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
+ Preconditions.checkArgument(serverPrincipalParts.length == 3,
+ "Kerberos principal should have 3 parts: " + serverPrincipal);
+ }
+
+ boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
+ return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
+ serverPrincipalParts[0], serverPrincipalParts[1],
+ socket, wrapUgi, conf);
+ } else {
+ return socket;
+ }
+ }
+
+ private boolean isConnected() {
+ return transport != null && transport.isOpen();
+ }
+
+ public synchronized void close() {
+ if (isConnected()) {
+ transport.close();
+ }
+ }
+
+ public int getRetryCount() {
+ return rpcRetryTotal;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java
new file mode 100644
index 0000000..97604de
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/PolicyStoreConstants.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.core.common.utils;
+
+public final class PolicyStoreConstants {
+ public static final String SENTRY_GENERIC_POLICY_NOTIFICATION = "sentry.generic.policy.notification";
+ public static final String SENTRY_GENERIC_POLICY_STORE = "sentry.generic.policy.store";
+ public static final String SENTRY_GENERIC_POLICY_STORE_DEFAULT =
+ "org.apache.sentry.provider.db.generic.service.persistent.DelegateSentryStore";
+ public static class PolicyStoreServerConfig {
+ public static final String NOTIFICATION_HANDLERS = "sentry.policy.store.notification.handlers";
+ }
+
+ private PolicyStoreConstants() {
+ // Make constructor private to avoid instantiation
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
new file mode 100644
index 0000000..aa68299
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/utils/ThriftUtil.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.utils;
+
+import com.google.common.net.HostAndPort;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class for Thrift clients and servers
+ */
+public final class ThriftUtil {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ThriftUtil.class);
+
+ public static void setImpersonator(final TProtocol in) {
+ try {
+ TTransport transport = in.getTransport();
+ if (transport instanceof TSaslServerTransport) {
+ String impersonator = ((TSaslServerTransport) transport).getSaslServer()
+ .getAuthorizationID();
+ setImpersonator(impersonator);
+ }
+ } catch (Exception e) {
+ // If there has exception when get impersonator info, log the error information.
+ LOGGER.warn("There is an error when get the impersonator:" + e.getMessage());
+ }
+ }
+
+ public static void setIpAddress(final TProtocol in) {
+ try {
+ TTransport transport = in.getTransport();
+ TSocket tSocket = getUnderlyingSocketFromTransport(transport);
+ if (tSocket != null) {
+ setIpAddress(tSocket.getSocket().getInetAddress().toString());
+ } else {
+ LOGGER.warn("Unknown Transport, cannot determine ipAddress");
+ }
+ } catch (Exception e) {
+ // If there has exception when get impersonator info, log the error information.
+ LOGGER.warn("There is an error when get the client's ip address:" + e.getMessage());
+ }
+ }
+
+ /**
+ * Returns the underlying TSocket from the transport, or null of the transport type is unknown.
+ */
+ private static TSocket getUnderlyingSocketFromTransport(TTransport transport) {
+ Preconditions.checkNotNull(transport);
+ if (transport instanceof TSaslServerTransport) {
+ return (TSocket) ((TSaslServerTransport) transport).getUnderlyingTransport();
+ } else if (transport instanceof TSaslClientTransport) {
+ return (TSocket) ((TSaslClientTransport) transport).getUnderlyingTransport();
+ } else if (transport instanceof TSocket) {
+ return (TSocket) transport;
+ }
+ return null;
+ }
+
+ private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() {
+ @Override
+ protected synchronized String initialValue() {
+ return "";
+ }
+ };
+
+ public static void setIpAddress(String ipAddress) {
+ threadLocalIpAddress.set(ipAddress);
+ }
+
+ public static String getIpAddress() {
+ return threadLocalIpAddress.get();
+ }
+
+ private static ThreadLocal<String> threadLocalImpersonator = new ThreadLocal<String>() {
+ @Override
+ protected synchronized String initialValue() {
+ return "";
+ }
+ };
+
+ public static void setImpersonator(String impersonator) {
+ threadLocalImpersonator.set(impersonator);
+ }
+
+ public static String getImpersonator() {
+ return threadLocalImpersonator.get();
+ }
+
+ private ThriftUtil() {
+ // Make constructor private to avoid instantiation
+ }
+
+ /**
+ * Utility function for parsing host and port strings. Expected form should be
+ * (host:port). The hostname could be in ipv6 style. If port is not specified,
+ * defaultPort will be used.
+ */
+ public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort) {
+ HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length];
+ for (int i = 0; i < hostsAndPorts.length; i++) {
+ hostsAndPorts[i] =
+ HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort);
+ }
+ return hostsAndPorts;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index 23552c2..a311b5b 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -79,12 +79,9 @@ public class ServiceConstants {
public static final String PRINCIPAL = "sentry.hdfs.service.server.principal";
public static final String SERVER_RPC_PORT = "sentry.hdfs.service.client.server.rpc-port";
- public static final int SERVER_RPC_PORT_DEFAULT = 8038;
public static final String SERVER_RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address";
- public static final String SERVER_RPC_CONN_TIMEOUT = "sentry.hdfs.service.client.server.rpc-connection-timeout";
- public static final int SERVER_RPC_CONN_TIMEOUT_DEFAULT = 200000;
public static final String USE_COMPACT_TRANSPORT = "sentry.hdfs.service.client.compact.transport";
public static final boolean USE_COMPACT_TRANSPORT_DEFAULT = false;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
index ab12bf4..faac053 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
@@ -17,7 +17,10 @@
*/
package org.apache.sentry.hdfs;
-public interface SentryHDFSServiceClient {
+import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
+import org.apache.sentry.core.common.transport.SentryServiceClient;
+
+public interface SentryHDFSServiceClient extends SentryServiceClient {
String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";
void notifyHMSUpdate(PathsUpdate update)
@@ -27,7 +30,5 @@ public interface SentryHDFSServiceClient {
SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
throws SentryHdfsServiceException;
-
- void close();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
index 28b1224..d337319 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,161 +19,70 @@ package org.apache.sentry.hdfs;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
-import java.util.Map;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.Sasl;
-
-import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.core.common.exception.MissingConfigurationException;
+import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
+import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate;
-import org.apache.sentry.hdfs.ServiceConstants.ClientConfig;
-import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
-public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClient {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
-
- /**
- * This transport wraps the Sasl transports to set up the right UGI context for open().
- */
- public static class UgiSaslClientTransport extends TSaslClientTransport {
- protected UserGroupInformation ugi = null;
+/*
+ Sentry HDFS Service Client
- public UgiSaslClientTransport(String mechanism, String authorizationId,
- String protocol, String serverName, Map<String, String> props,
- CallbackHandler cbh, TTransport transport, boolean wrapUgi)
- throws IOException {
- super(mechanism, authorizationId, protocol, serverName, props, cbh,
- transport);
- if (wrapUgi) {
- ugi = UserGroupInformation.getLoginUser();
- }
- }
+ The public implementation of SentryHDFSServiceClient.
+ A Sentry Client in which all the operations are synchronized for thread safety
+ Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state.
+ So it is important to close and re-open the transport so that new socket is used.
+*/
- // open the SASL transport with using the current UserGroupInformation
- // This is needed to get the current login context stored
- @Override
- public void open() throws TTransportException {
- if (ugi == null) {
- baseOpen();
- } else {
- try {
- // ensure that the ticket is valid before connecting to service. Note that
- // checkTGTAndReloginFromKeytab() renew the ticket only when more than 80%
- // of ticket lifetime has passed.
- if (ugi.isFromKeytab()) {
- ugi.checkTGTAndReloginFromKeytab();
- }
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws TTransportException {
- baseOpen();
- return null;
- }
- });
- } catch (IOException e) {
- throw new TTransportException("Failed to open SASL transport", e);
- } catch (InterruptedException e) {
- throw new TTransportException(
- "Interrupted while opening underlying transport", e);
- }
- }
- }
+public class SentryHDFSServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryHDFSServiceClient {
- private void baseOpen() throws TTransportException {
- super.open();
- }
- }
-
- private final Configuration conf;
- private final InetSocketAddress serverAddress;
- private final int connectionTimeout;
- private boolean kerberos;
- private TTransport transport;
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
- private String[] serverPrincipalParts;
private Client client;
- private final SentryHDFSClientTransportConfig transportConfig = new SentryHDFSClientTransportConfig();
- private static final ImmutableMap<String, String> SASL_PROPERTIES =
- ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
public SentryHDFSServiceClientDefaultImpl(Configuration conf) throws IOException {
- this.conf = conf;
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- try {
- this.serverAddress = NetUtils.createSocketAddr(
- transportConfig.getSentryServerRpcAddress(conf),
- transportConfig.getServerRpcPort(conf));
- this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
- kerberos = transportConfig.isKerberosEnabled(conf);
- transport = new TSocket(serverAddress.getHostName(),
- serverAddress.getPort(), connectionTimeout);
- if (kerberos) {
- String serverPrincipal = transportConfig.getSentryPrincipal(conf);
- // Resolve server host in the same way as we are doing on server side
- serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
- LOGGER.info("Using server kerberos principal: " + serverPrincipal);
-
- serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
- Preconditions.checkArgument(serverPrincipalParts.length == 3,
- "Kerberos principal should have 3 parts: " + serverPrincipal);
- boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
- transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(),
- null, serverPrincipalParts[0], serverPrincipalParts[1],
- SASL_PROPERTIES, null, transport, wrapUgi);
- } else {
- serverPrincipalParts = null;
- }
+ super(conf, sentryClientType.HDFS_CLIENT);
+ }
- transport.open();
- } catch (TTransportException e) {
- throw new IOException("Transport exception while opening transport: " + e.getMessage(), e);
- } catch (MissingConfigurationException e) {
- throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
- }
- LOGGER.info("Successfully opened transport: " + transport + " to " + serverAddress);
+ /**
+ * Connect to the specified socket address and then use the new socket
+ * to construct new thrift client.
+ *
+ * @param serverAddress: socket address to which the client should connect.
+ * @throws IOException
+ */
+ public void connect(InetSocketAddress serverAddress) throws IOException {
TProtocol tProtocol = null;
+ super.connect(serverAddress);
long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
- ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
- if (conf.getBoolean(ClientConfig.USE_COMPACT_TRANSPORT,
- ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
+ ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT,
+ ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize);
} else {
tProtocol = new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true);
}
TMultiplexedProtocol protocol = new TMultiplexedProtocol(
- tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
+ tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
client = new SentryHDFSService.Client(protocol);
LOGGER.info("Successfully created client");
}
public synchronized void notifyHMSUpdate(PathsUpdate update)
- throws SentryHdfsServiceException {
+ throws SentryHdfsServiceException {
try {
client.handle_hms_notification(update.toThrift());
} catch (Exception e) {
@@ -182,7 +91,7 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie
}
public synchronized long getLastSeenHMSPathSeqNum()
- throws SentryHdfsServiceException {
+ throws SentryHdfsServiceException {
try {
return client.check_hms_seq_num(-1);
} catch (Exception e) {
@@ -191,7 +100,7 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie
}
public synchronized SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
- throws SentryHdfsServiceException {
+ throws SentryHdfsServiceException {
SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>());
try {
TAuthzUpdateResponse sentryUpdates = client.get_all_authz_updates_from(permSeqNum, pathSeqNum);
@@ -210,10 +119,4 @@ public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClie
}
return retVal;
}
-
- public void close() {
- if (transport != null) {
- transport.close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
index 2a18b15..59ac360 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
@@ -17,7 +17,10 @@
*/
package org.apache.sentry.hdfs;
+import java.lang.reflect.Proxy;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
/**
* Client factory to create normal client or proxy with HA invocation handler
@@ -27,10 +30,13 @@ public class SentryHDFSServiceClientFactory {
private SentryHDFSServiceClientFactory() {
// Make constructor private to avoid instantiation
}
-
+
public static SentryHDFSServiceClient create(Configuration conf)
throws Exception {
- return new SentryHDFSServiceClientDefaultImpl(conf);
+ return (SentryHDFSServiceClient) Proxy
+ .newProxyInstance(SentryHDFSServiceClientDefaultImpl.class.getClassLoader(),
+ SentryHDFSServiceClientDefaultImpl.class.getInterfaces(),
+ new RetryClientInvocationHandler(conf,
+ new SentryHDFSServiceClientDefaultImpl(conf)));
}
-
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
index 4dc99a2..1ad9a02 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Iface;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
+import org.apache.sentry.core.common.utils.ThriftUtil;
import org.apache.sentry.service.thrift.ProcessorFactory;
import org.apache.thrift.TException;
import org.apache.thrift.TMultiplexedProcessor;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java
deleted file mode 100644
index 307d8c3..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.hdfs;
-
-public class SentryHdfsServiceException extends RuntimeException {
- private static final long serialVersionUID = 1511645864949767378L;
-
- public SentryHdfsServiceException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public SentryHdfsServiceException(String message) {
- super(message);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java
index 919b81b..0c51cc6 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessor.java
@@ -46,7 +46,7 @@ import org.apache.sentry.provider.db.log.entity.JsonLogEntityFactory;
import org.apache.sentry.provider.db.log.util.Constants;
import org.apache.sentry.provider.db.service.model.MSentryGMPrivilege;
import org.apache.sentry.provider.db.service.model.MSentryRole;
-import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants;
+import org.apache.sentry.core.common.utils.PolicyStoreConstants;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.apache.sentry.service.thrift.ServiceConstants.ThriftConstants;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java
index d320d0f..a0fc2cc 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericPolicyProcessorWrapper.java
@@ -18,7 +18,7 @@
package org.apache.sentry.provider.db.generic.service.thrift;
-import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
+import org.apache.sentry.core.common.utils.ThriftUtil;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
index 11cdee7..c832706 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
@@ -24,8 +24,9 @@ import java.util.Set;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.transport.SentryServiceClient;
-public interface SentryGenericServiceClient {
+public interface SentryGenericServiceClient extends SentryServiceClient {
/**
* Create a sentry role
@@ -191,6 +192,4 @@ public interface SentryGenericServiceClient {
Map<String, TSentryPrivilegeMap> listPrivilegsbyAuthorizable(String component,
String serviceName, String requestorUserName, Set<String> authorizablesSet,
Set<String> groups, ActiveRoleSet roleSet) throws SentryUserException;
-
- void close();
}
[5/5] sentry git commit: Client Failover reorg prototype
Posted by ak...@apache.org.
Client Failover reorg prototype
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c9c0119f
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c9c0119f
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c9c0119f
Branch: refs/heads/SENTRY-1593-akolb
Commit: c9c0119fc9e61615b9445d989dd63c9395a1b21c
Parents: e3d859a
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Thu Apr 6 14:35:46 2017 -0700
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Thu Apr 6 14:35:46 2017 -0700
----------------------------------------------------------------------
.../transport/RetryClientInvocationHandler.java | 22 +-
.../SentryClientTransportConfigInterface.java | 2 +-
.../common/transport/SentryServiceClient.java | 48 ---
...SentryServiceClientTransportDefaultImpl.java | 342 -------------------
.../core/common/transport/SentrySocket.java | 32 ++
.../transport/SentryTransportFactory.java | 234 +++++++++++++
.../sentry/hdfs/SentryHDFSServiceClient.java | 5 +-
.../SentryHDFSServiceClientDefaultImpl.java | 43 ++-
.../hdfs/SentryHDFSServiceClientFactory.java | 11 +-
.../thrift/SentryGenericServiceClient.java | 5 +-
.../SentryGenericServiceClientDefaultImpl.java | 50 ++-
.../SentryGenericServiceClientFactory.java | 8 +-
.../thrift/SentryPolicyServiceClient.java | 5 +-
.../SentryPolicyServiceClientDefaultImpl.java | 46 ++-
.../thrift/SentryServiceClientFactory.java | 25 +-
.../thrift/SentryServiceClientPoolFactory.java | 27 +-
16 files changed, 428 insertions(+), 477 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
index b01cb37..86569c9 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java
@@ -49,16 +49,20 @@ import java.lang.reflect.Method;
* TODO(kalyan) allow multiple client connections using <code>PoolClientInvocationHandler</code>
*/
-public class RetryClientInvocationHandler extends SentryClientInvocationHandler {
+public final class RetryClientInvocationHandler extends SentryClientInvocationHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(RetryClientInvocationHandler.class);
- private SentryServiceClient client = null;
+ private final int retries;
+ private final SentrySocket client;
/**
* Initialize the sentry configurations, including rpc retry count and client connection
* configs for SentryPolicyServiceClientDefaultImpl
*/
- public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject) {
+ public RetryClientInvocationHandler(Configuration conf,
+ SentryClientTransportConfigInterface transportConfig,
+ SentrySocket clientObject) {
+ retries = transportConfig.getSentryRpcRetryTotal(conf);
Preconditions.checkNotNull(conf, "Configuration object cannot be null");
client = clientObject;
}
@@ -77,18 +81,17 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler
synchronized public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
int retryCount = 0;
Exception lastExc = null;
- boolean tryAlternateServer = false;
- while (retryCount < client.getRetryCount()) {
+ while (retryCount < retries) {
// Connect to a sentry server if not connected yet.
try {
- client.connectWithRetry(tryAlternateServer);
+ client.connect();
} catch (IOException e) {
// Increase the retry num
// Retry when the exception is caused by connection problem.
retryCount++;
lastExc = e;
- close();
+ client.close();
continue;
}
@@ -108,7 +111,6 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler
// Retry when the exception is caused by connection problem.
lastExc = new TTransportException(sentryTargetException);
LOGGER.error("Got TTransportException when do the thrift call ", lastExc);
- tryAlternateServer = true;
// Closing the thrift client on TTransportException. New client object is
// created using new socket when an attempt to reconnect is made.
close();
@@ -131,9 +133,9 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler
}
// Throw the exception as reaching the max rpc retry num.
- LOGGER.error(String.format("failed after %d retries ", client.getRetryCount()), lastExc);
+ LOGGER.error(String.format("failed after %d retries ", retries), lastExc);
throw new SentryUserException(
- String.format("failed after %d retries ", client.getRetryCount()), lastExc);
+ String.format("failed after %d retries ", retries), lastExc);
}
@Override
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
index 24192fd..3ea36a1 100644
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java
@@ -28,7 +28,7 @@ import org.apache.sentry.core.common.exception.MissingConfigurationException;
* This Configuration interface should be implemented for all the sentry clients to get
* the transport configuration.
*/
-interface SentryClientTransportConfigInterface {
+public interface SentryClientTransportConfigInterface {
/**
* @param conf configuration
* @return number of times client retry logic should iterate through all
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
deleted file mode 100644
index dc93fb7..0000000
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.core.common.transport;
-
-import java.io.Closeable;
-
-/**
- * Client interface for Proxy Invocation handlers
- * <p>
- * Defines interface that Sentry client's should expose to the Invocation handlers like
- * <code>RetryClientInvocationHandler</code> used to proxy the method invocation on sentry
- * client instances .
- * <p>
- * All the sentry clients that need retrying and failover capabilities should implement
- * this interface.
- */
-public interface SentryServiceClient extends Closeable {
- /**
- * This is a no-op when already connected.
- * When there is a connection error, it will retry with another sentry server. It will
- * first cycle through all the available sentry servers, and then retry the whole server
- * list no more than connectionFullRetryTotal times. In this case, it won't introduce
- * more latency when some server fails. Also to prevent all clients connecting to the
- * same server, it will reorder the endpoints randomly after a full retry.
- * <p>
- * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
- */
- void connectWithRetry(boolean tryAlternateServer) throws Exception;
-
- int getRetryCount();
-
- void close();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
deleted file mode 100644
index 4c126fb..0000000
--- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClientTransportDefaultImpl.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.core.common.transport;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.net.HostAndPort;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.core.common.exception.MissingConfigurationException;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.sentry.core.common.utils.ThriftUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-
-/**
- * Implements the transport functionality for sentry clients.
- * All the sentry clients should extend this class for transport implementation.
- */
-
-public abstract class SentryServiceClientTransportDefaultImpl {
- protected final Configuration conf;
- protected final boolean kerberos;
- private String[] serverPrincipalParts;
-
- protected TTransport transport;
- private final int connectionTimeout;
- private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientTransportDefaultImpl.class);
- // configs for connection retry
- private final int connectionFullRetryTotal;
- private final int rpcRetryTotal;
- private final ArrayList<InetSocketAddress> endpoints;
- protected InetSocketAddress serverAddress;
- private final SentryClientTransportConfigInterface transportConfig;
- private static final ImmutableMap<String, String> SASL_PROPERTIES =
- ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
-
- /**
- * Defines various client types.
- */
- protected enum sentryClientType {
- POLICY_CLIENT,
- HDFS_CLIENT,
- }
-
- /**
- * This transport wraps the Sasl transports to set up the right UGI context for open().
- */
- public static class UgiSaslClientTransport extends TSaslClientTransport {
- UserGroupInformation ugi = null;
-
- public UgiSaslClientTransport(String mechanism, String protocol,
- String serverName, TTransport transport,
- boolean wrapUgi, Configuration conf)
- throws IOException, SaslException {
- super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
- transport);
- if (wrapUgi) {
- // If we don't set the configuration, the UGI will be created based on
- // what's on the classpath, which may lack the kerberos changes we require
- UserGroupInformation.setConfiguration(conf);
- ugi = UserGroupInformation.getLoginUser();
- }
- }
-
- // open the SASL transport with using the current UserGroupInformation
- // This is needed to get the current login context stored
- @Override
- public void open() throws TTransportException {
- if (ugi == null) {
- baseOpen();
- } else {
- try {
- if (ugi.isFromKeytab()) {
- ugi.checkTGTAndReloginFromKeytab();
- }
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws TTransportException {
- baseOpen();
- return null;
- }
- });
- } catch (IOException e) {
- throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
- } catch (InterruptedException e) {
- throw new TTransportException(
- "Interrupted while opening underlying transport: " + e.getMessage(), e);
- }
- }
- }
-
- private void baseOpen() throws TTransportException {
- super.open();
- }
- }
-
- /**
- * Initialize the object based on the sentry configuration provided.
- * List of configured servers are reordered randomly preventing all
- * clients connecting to the same server.
- *
- * @param conf Sentry configuration
- * @param type Type indicates the service type
- */
- public SentryServiceClientTransportDefaultImpl(Configuration conf,
- sentryClientType type) throws IOException {
-
- this.conf = conf;
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- serverPrincipalParts = null;
- if (type == sentryClientType.POLICY_CLIENT) {
- transportConfig = new SentryPolicyClientTransportConfig();
- } else {
- transportConfig = new SentryHDFSClientTransportConfig();
- }
-
- try {
- String hostsAndPortsStr;
- this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
- this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
- this.rpcRetryTotal = transportConfig.getSentryRpcRetryTotal(conf);
- this.kerberos = transportConfig.isKerberosEnabled(conf);
-
- hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
-
- int serverPort = transportConfig.getServerRpcPort(conf);
-
- String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
- HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
-
- this.endpoints = new ArrayList(hostsAndPortsStrArr.length);
- for (HostAndPort endpoint : hostsAndPorts) {
- this.endpoints.add(
- new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
- LOGGER.debug("Added server endpoint: " + endpoint.toString());
- }
-
- // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
- // at the same time after a node failure.
- Collections.shuffle(endpoints);
- serverAddress = null;
- connectWithRetry(false);
- } catch (Exception e) {
- throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
- }
- }
-
- /**
- * Initialize object based on the parameters provided provided.
- *
- * @param addr Host address which the client needs to connect
- * @param port Host Port which the client needs to connect
- * @param conf Sentry configuration
- * @param type Type indicates the service type
- */
- public SentryServiceClientTransportDefaultImpl(String addr, int port, Configuration conf,
- sentryClientType type) throws IOException {
- // copy the configuration because we may make modifications to it.
- this.conf = new Configuration(conf);
- serverPrincipalParts = null;
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- if (type == sentryClientType.POLICY_CLIENT) {
- transportConfig = new SentryPolicyClientTransportConfig();
- } else {
- transportConfig = new SentryHDFSClientTransportConfig();
- }
-
- try {
- InetSocketAddress serverAddress = NetUtils.createSocketAddr(addr, port);
- this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
- this.rpcRetryTotal = transportConfig.getSentryRpcRetryTotal(conf);
- this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
- this.kerberos = transportConfig.isKerberosEnabled(conf);
- connect(serverAddress);
- } catch (MissingConfigurationException e) {
- throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
- }
- endpoints = null;
- }
-
-
- /**
- * no-op when already connected.
- * On connection error, Iterates through all the configured servers and tries to connect.
- * On successful connection, control returns
- * On connection failure, continues iterating through all the configured sentry servers,
- * and then retries the whole server list no more than connectionFullRetryTotal times.
- * In this case, it won't introduce more latency when some server fails.
- * <p>
- * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
- */
- public synchronized void connectWithRetry(boolean tryAlternateServer) throws IOException {
- if (isConnected() && (!tryAlternateServer)) {
- return;
- }
-
- IOException currentException = null;
- for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
- try {
- connectToAvailableServer();
- return;
- } catch (IOException e) {
- currentException = e;
- LOGGER.error(
- String.format("Failed to connect to all the configured sentry servers, " +
- "Retrying again"));
- }
- }
- // Throw exception as reaching the max full connectWithRetry number.
- LOGGER.error(
- String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
- currentException);
- throw currentException;
- }
-
- /**
- * Iterates through all the configured servers and tries to connect.
- * On connection error, tries to connect to next server.
- * Control returns on successful connection OR it's done trying to all the
- * configured servers.
- *
- * @throws IOException
- */
- private void connectToAvailableServer() throws IOException {
- IOException currentException = null;
- if (endpoints.size() == 1) {
- connect(endpoints.get(0));
- return;
- }
-
- for (InetSocketAddress addr : endpoints) {
- try {
- serverAddress = addr;
- connect(serverAddress);
- LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString()));
- return;
- } catch (IOException e) {
- LOGGER.error(String.format("Failed connection to %s: %s",
- addr.toString(), e.getMessage()), e);
- currentException = e;
- }
- }
- throw currentException;
- }
-
- /**
- * Connect to the specified socket address and throw IOException if failed.
- *
- * @param serverAddress Address client needs to connect
- * @throws Exception if there is failure in establishing the connection.
- */
- protected void connect(InetSocketAddress serverAddress) throws IOException {
- try {
- transport = createTransport(serverAddress);
- transport.open();
- } catch (TTransportException e) {
- throw new IOException("Failed to open transport: " + e.getMessage(), e);
- } catch (MissingConfigurationException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
-
- LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress);
- }
-
- /**
- * New socket is is created
- *
- * @param serverAddress
- * @return
- * @throws TTransportException
- * @throws MissingConfigurationException
- * @throws IOException
- */
- private TTransport createTransport(InetSocketAddress serverAddress)
- throws TTransportException, MissingConfigurationException, IOException {
- TTransport socket = new TSocket(serverAddress.getHostName(),
- serverAddress.getPort(), connectionTimeout);
-
- if (kerberos) {
- String serverPrincipal = transportConfig.getSentryPrincipal(conf);
- serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
- LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
- if (serverPrincipalParts == null) {
- serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
- Preconditions.checkArgument(serverPrincipalParts.length == 3,
- "Kerberos principal should have 3 parts: " + serverPrincipal);
- }
-
- boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
- return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
- serverPrincipalParts[0], serverPrincipalParts[1],
- socket, wrapUgi, conf);
- } else {
- return socket;
- }
- }
-
- private boolean isConnected() {
- return transport != null && transport.isOpen();
- }
-
- public synchronized void close() {
- if (isConnected()) {
- transport.close();
- }
- }
-
- public int getRetryCount() {
- return rpcRetryTotal;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java
new file mode 100644
index 0000000..3374489
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentrySocket.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import java.io.IOException;
+
+/**
+ * General representation of transport connection to Sentry
+ */
+public interface SentrySocket extends AutoCloseable {
+ /**
+ * Connect to the Sentry server
+ * @throws IOException
+ */
+ void connect() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
new file mode 100644
index 0000000..74ac92d
--- /dev/null
+++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.core.common.transport;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.core.common.exception.MissingConfigurationException;
+import org.apache.sentry.core.common.utils.ThriftUtil;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Generate Thrift transports suitable for talking to Sentry
+ */
+public final class SentryTransportFactory {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class);
+
+ private final Configuration conf;
+ private final SentryClientTransportConfigInterface transportConfig;
+ private final ArrayList<InetSocketAddress> endpoints;
+
+ public SentryTransportFactory(Configuration conf,
+ SentryClientTransportConfigInterface configInterface) {
+ this.conf = conf;
+ this.transportConfig = configInterface;
+ String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
+ int serverPort = transportConfig.getServerRpcPort(conf);
+
+ String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
+ HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
+ this.endpoints = new ArrayList<>(hostsAndPortsStrArr.length);
+ for (HostAndPort endpoint : hostsAndPorts) {
+ this.endpoints.add(
+ new InetSocketAddress(endpoint.getHostText(), endpoint.getPort()));
+ LOGGER.debug("Added server endpoint: " + endpoint.toString());
+ }
+ // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
+ // at the same time after a node failure.
+ if (endpoints.size() > 1) {
+ Collections.shuffle(endpoints);
+ }
+ }
+
+ /**
+ * This transport wraps the Sasl transports to set up the right UGI context for open().
+ */
+ private static final class UgiSaslClientTransport extends TSaslClientTransport {
+ private static final ImmutableMap<String, String> SASL_PROPERTIES =
+ ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
+
+ private UserGroupInformation ugi = null;
+
+ private UgiSaslClientTransport(String mechanism, String protocol,
+ String serverName, TTransport transport,
+ boolean wrapUgi, Configuration conf)
+ throws IOException, SaslException {
+ super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null,
+ transport);
+ if (wrapUgi) {
+ // If we don't set the configuration, the UGI will be created based on
+ // what's on the classpath, which may lack the kerberos changes we require
+ UserGroupInformation.setConfiguration(conf);
+ ugi = UserGroupInformation.getLoginUser();
+ }
+ }
+
+ // open the SASL transport with using the current UserGroupInformation
+ // This is needed to get the current login context stored
+ @Override
+ public void open() throws TTransportException {
+ if (ugi == null) {
+ baseOpen();
+ } else {
+ try {
+ if (ugi.isFromKeytab()) {
+ ugi.checkTGTAndReloginFromKeytab();
+ }
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws TTransportException {
+ baseOpen();
+ return null;
+ }
+ });
+ } catch (IOException e) {
+ throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
+ } catch (InterruptedException e) {
+ throw new TTransportException(
+ "Interrupted while opening underlying transport: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private void baseOpen() throws TTransportException {
+ super.open();
+ }
+ }
+
+ /**
+ * On connection error, Iterates through all the configured servers and tries to connect.
+ * On successful connection, control returns
+ * On connection failure, continues iterating through all the configured sentry servers,
+ * and then retries the whole server list no more than connectionFullRetryTotal times.
+ * In this case, it won't introduce more latency when some server fails.
+ * <p>
+ * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
+ */
+ public TTransport connect() throws IOException {
+ int connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
+ IOException currentException = null;
+ for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
+ try {
+ return connectToAvailableServer();
+ } catch (IOException e) {
+ currentException = e;
+ LOGGER.error(
+ String.format("Failed to connect to all the configured sentry servers, " +
+ "Retrying again"));
+ }
+ }
+ // Throw exception as reaching the max full connectWithRetry number.
+ LOGGER.error(
+ String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
+ currentException);
+ throw currentException;
+ }
+
+ /**
+ * Iterates through all the configured servers and tries to connect.
+ * On connection error, tries to connect to next server.
+ * Control returns on successful connection OR it's done trying to all the
+ * configured servers.
+ *
+ * @throws IOException
+ */
+ private TTransport connectToAvailableServer() throws IOException {
+ IOException currentException = null;
+ for (InetSocketAddress addr : endpoints) {
+ try {
+ return connect(addr);
+ } catch (IOException e) {
+ LOGGER.error(String.format("Failed connection to %s: %s",
+ addr.toString(), e.getMessage()), e);
+ currentException = e;
+ }
+ }
+ if (currentException != null) {
+ throw currentException;
+ }
+ return null;
+ }
+
+ /**
+ * Connect to the specified socket address and throw IOException if failed.
+ *
+ * @param serverAddress Address client needs to connect
+ * @throws Exception if there is failure in establishing the connection.
+ */
+ protected TTransport connect(InetSocketAddress serverAddress) throws IOException {
+ try {
+ TTransport transport = createTransport(serverAddress);
+ transport.open();
+ LOGGER.info(String.format("Connected to SentryServer: %s", serverAddress));
+ return transport;
+ } catch (TTransportException e) {
+ throw new IOException("Failed to open transport: " + e.getMessage(), e);
+ } catch (MissingConfigurationException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * New socket is is created
+ *
+ * @param serverAddress
+ * @return
+ * @throws TTransportException
+ * @throws MissingConfigurationException
+ * @throws IOException
+ */
+ private TTransport createTransport(InetSocketAddress serverAddress)
+ throws TTransportException, MissingConfigurationException, IOException {
+ TTransport socket = new TSocket(serverAddress.getHostName(),
+ serverAddress.getPort(), transportConfig.getServerRpcConnTimeoutInMs(conf));
+
+ if (!transportConfig.isKerberosEnabled(conf)) {
+ return socket;
+ }
+
+ String serverPrincipal = transportConfig.getSentryPrincipal(conf);
+ serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+ LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
+ String[] serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
+ Preconditions.checkArgument(serverPrincipalParts.length == 3,
+ "Kerberos principal should have 3 parts: " + serverPrincipal);
+
+ boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
+ return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
+ serverPrincipalParts[0], serverPrincipalParts[1],
+ socket, wrapUgi, conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
index faac053..11f6894 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
@@ -18,9 +18,8 @@
package org.apache.sentry.hdfs;
import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-public interface SentryHDFSServiceClient extends SentryServiceClient {
+public interface SentryHDFSServiceClient {
String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";
void notifyHMSUpdate(PathsUpdate update)
@@ -30,5 +29,7 @@ public interface SentryHDFSServiceClient extends SentryServiceClient {
SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
throws SentryHdfsServiceException;
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
index d337319..794aded 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
@@ -18,12 +18,13 @@
package org.apache.sentry.hdfs;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryHdfsServiceException;
-import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentrySocket;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
@@ -34,6 +35,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,28 +49,41 @@ import org.slf4j.LoggerFactory;
*/
-public class SentryHDFSServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryHDFSServiceClient {
+public class SentryHDFSServiceClientDefaultImpl
+ implements SentryHDFSServiceClient, SentrySocket {
private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
+ private final Configuration conf;
private Client client;
+ private SentryTransportFactory transportFactory;
+ private TTransport transport;
- public SentryHDFSServiceClientDefaultImpl(Configuration conf) throws IOException {
- super(conf, sentryClientType.HDFS_CLIENT);
+
+
+ SentryHDFSServiceClientDefaultImpl(Configuration conf,
+ SentryClientTransportConfigInterface transportConfig)
+ throws IOException {
+ this.conf = conf;
+ transportFactory = new SentryTransportFactory(conf, transportConfig);
}
/**
* Connect to the specified socket address and then use the new socket
* to construct new thrift client.
*
- * @param serverAddress: socket address to which the client should connect.
* @throws IOException
*/
- public void connect(InetSocketAddress serverAddress) throws IOException {
- TProtocol tProtocol = null;
- super.connect(serverAddress);
+ @Override
+ public void connect() throws IOException {
+ if (isOpen()) {
+ return;
+ }
+
+ transport = transportFactory.connect();
long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE,
ServiceConstants.ClientConfig.SENTRY_HDFS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ TProtocol tProtocol = null;
if (conf.getBoolean(ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT,
ServiceConstants.ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
tProtocol = new TCompactProtocol(transport, maxMessageSize, maxMessageSize);
@@ -119,4 +134,14 @@ public class SentryHDFSServiceClientDefaultImpl extends SentryServiceClientTrans
}
return retVal;
}
+
+ @Override
+ public void close() {
+ transport.close();
+ transport = null;
+ }
+
+ private boolean isOpen() {
+ return ((transport != null) && transport.isOpen());
+ }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
index 59ac360..174da4f 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
@@ -21,12 +21,16 @@ import java.lang.reflect.Proxy;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentryHDFSClientTransportConfig;
/**
* Client factory to create normal client or proxy with HA invocation handler
*/
public class SentryHDFSServiceClientFactory {
-
+ private static final SentryClientTransportConfigInterface transportConfig =
+ new SentryHDFSClientTransportConfig();
+
private SentryHDFSServiceClientFactory() {
// Make constructor private to avoid instantiation
}
@@ -36,7 +40,8 @@ public class SentryHDFSServiceClientFactory {
return (SentryHDFSServiceClient) Proxy
.newProxyInstance(SentryHDFSServiceClientDefaultImpl.class.getClassLoader(),
SentryHDFSServiceClientDefaultImpl.class.getInterfaces(),
- new RetryClientInvocationHandler(conf,
- new SentryHDFSServiceClientDefaultImpl(conf)));
+ new RetryClientInvocationHandler(conf, transportConfig,
+ new SentryHDFSServiceClientDefaultImpl(conf,
+ transportConfig)));
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
index c832706..11cdee7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClient.java
@@ -24,9 +24,8 @@ import java.util.Set;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-public interface SentryGenericServiceClient extends SentryServiceClient {
+public interface SentryGenericServiceClient {
/**
* Create a sentry role
@@ -192,4 +191,6 @@ public interface SentryGenericServiceClient extends SentryServiceClient {
Map<String, TSentryPrivilegeMap> listPrivilegsbyAuthorizable(String component,
String serviceName, String requestorUserName, Set<String> authorizablesSet,
Set<String> groups, ActiveRoleSet roleSet) throws SentryUserException;
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
index 9bbd736..c9d0357 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
@@ -18,18 +18,16 @@
package org.apache.sentry.provider.db.generic.service.thrift;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
-import org.apache.sentry.core.common.utils.SentryConstants;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentrySocket;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
import org.apache.sentry.core.model.db.AccessConstants;
import org.apache.sentry.service.thrift.ServiceConstants;
import org.apache.sentry.service.thrift.Status;
@@ -38,6 +36,7 @@ import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
+import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,30 +51,48 @@ import com.google.common.collect.Lists;
So it is important to close and re-open the transport so that new socket is used.
*/
-public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryGenericServiceClient {
+public class SentryGenericServiceClientDefaultImpl
+ implements SentryGenericServiceClient, SentrySocket {
+ private final SentryTransportFactory transportFactory;
+ private final Configuration conf;
+ private TTransport transport;
+
+
private SentryGenericPolicyService.Client client;
private static final Logger LOGGER = LoggerFactory
.getLogger(SentryGenericServiceClientDefaultImpl.class);
private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occured ";
- public SentryGenericServiceClientDefaultImpl(Configuration conf) throws IOException {
- super(conf, sentryClientType.POLICY_CLIENT);
+ public SentryGenericServiceClientDefaultImpl(Configuration conf,
+ SentryClientTransportConfigInterface transportConfig)
+ throws IOException {
+ this.conf = conf;
+ transportFactory = new SentryTransportFactory(conf, transportConfig);
+
+ // TODO - do it correctly
+ /*
if (kerberos) {
// since the client uses hadoop-auth, we need to set kerberos in
// hadoop-auth if we plan to use kerberos
conf.set(HADOOP_SECURITY_AUTHENTICATION, SentryConstants.KERBEROS_MODE);
}
+ */
}
/**
* Connect to the specified socket address and then use the new socket
* to construct new thrift client.
*
- * @param serverAddress: socket address to which the client should connect.
* @throws IOException
*/
- public void connect(InetSocketAddress serverAddress) throws IOException {
- super.connect(serverAddress);
+ @Override
+ public void connect() throws IOException {
+ if (isOpen()) {
+ return;
+ }
+
+ transport = transportFactory.connect();
+
long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
TMultiplexedProtocol protocol = new TMultiplexedProtocol(
@@ -84,6 +101,12 @@ public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTr
client = new SentryGenericPolicyService.Client(protocol);
LOGGER.debug("Successfully created client");
}
+
+ private boolean isOpen() {
+ return ((transport != null) && transport.isOpen());
+ }
+
+
/**
* Create a sentry role
*
@@ -506,4 +529,9 @@ public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTr
throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e);
}
}
+
+ @Override
+ public void close() {
+ transport.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
index 1c582f0..9132449 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
@@ -19,6 +19,8 @@ package org.apache.sentry.provider.db.generic.service.thrift;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
import java.lang.reflect.Proxy;
@@ -26,6 +28,8 @@ import java.lang.reflect.Proxy;
* SentryGenericServiceClientFactory is a public class for the components which using Generic Model to create sentry client.
*/
public final class SentryGenericServiceClientFactory {
+ private static final SentryClientTransportConfigInterface transportConfig =
+ new SentryPolicyClientTransportConfig();
private SentryGenericServiceClientFactory() {
}
@@ -34,8 +38,8 @@ public final class SentryGenericServiceClientFactory {
return (SentryGenericServiceClient) Proxy
.newProxyInstance(SentryGenericServiceClientDefaultImpl.class.getClassLoader(),
SentryGenericServiceClientDefaultImpl.class.getInterfaces(),
- new RetryClientInvocationHandler(conf,
- new SentryGenericServiceClientDefaultImpl(conf)));
+ new RetryClientInvocationHandler(conf, transportConfig,
+ new SentryGenericServiceClientDefaultImpl(conf, transportConfig)));
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
index 28c3e35..3b25db7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
@@ -25,9 +25,8 @@ import java.util.Set;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryServiceClient;
-public interface SentryPolicyServiceClient extends SentryServiceClient {
+public interface SentryPolicyServiceClient {
void createRole(String requestorUserName, String roleName) throws SentryUserException;
@@ -216,4 +215,6 @@ public interface SentryPolicyServiceClient extends SentryServiceClient {
// export the sentry mapping data with map structure
Map<String, Map<String, Set<String>>> exportPolicy(String requestorUserName, String objectPath)
throws SentryUserException;
+
+ void close();
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index b4c1a5f..9eb60cc 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -19,7 +19,6 @@
package org.apache.sentry.provider.db.service.thrift;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -31,6 +30,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentrySocket;
+import org.apache.sentry.core.common.transport.SentryTransportFactory;
import org.apache.sentry.core.model.db.AccessConstants;
import org.apache.sentry.core.model.db.DBModelAuthorizable;
import org.apache.sentry.core.common.utils.PolicyFileConstants;
@@ -42,7 +44,7 @@ import org.apache.sentry.service.thrift.Status;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
-import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
+import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,35 +67,43 @@ import com.google.common.collect.Sets;
server this is configured.
*/
-public class SentryPolicyServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryPolicyServiceClient {
+public class SentryPolicyServiceClientDefaultImpl
+ implements SentryPolicyServiceClient, SentrySocket {
+
+ private final Configuration conf;
private SentryPolicyService.Client client;
private static final Logger LOGGER = LoggerFactory
.getLogger(SentryPolicyServiceClient.class);
+ private SentryTransportFactory transportFactory;
+ private TTransport transport;
+
+
private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred ";
/**
* Initialize the sentry configurations.
*/
- public SentryPolicyServiceClientDefaultImpl(Configuration conf)
+ public SentryPolicyServiceClientDefaultImpl(Configuration conf,
+ SentryClientTransportConfigInterface transportConfig)
throws IOException {
- super(conf, sentryClientType.POLICY_CLIENT);
- }
-
- public SentryPolicyServiceClientDefaultImpl(String addr, int port,
- Configuration conf) throws IOException {
- super(addr, port, conf, sentryClientType.POLICY_CLIENT);
+ this.conf = conf;
+ this.transportFactory = new SentryTransportFactory(conf, transportConfig);
}
/**
* Connect to the specified socket address and then use the new socket
* to construct new thrift client.
*
- * @param serverAddress: socket address to which the client should connect.
* @throws IOException
*/
- public void connect(InetSocketAddress serverAddress) throws IOException {
- super.connect(serverAddress);
+ @Override
+ public void connect() throws IOException {
+ if (isOpen()) {
+ return;
+ }
+ transport = transportFactory.connect();
+
long maxMessageSize = conf.getLong(
ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
@@ -1008,4 +1018,14 @@ public class SentryPolicyServiceClientDefaultImpl extends SentryServiceClientTra
}
return rolePrivilegesMapForFile;
}
+
+ @Override
+ public void close() {
+ transport.close();
+ transport = null;
+ }
+
+ private boolean isOpen() {
+ return ((transport != null) && transport.isOpen());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
index 745dc4c..55c51d3 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
@@ -23,29 +23,24 @@ import java.lang.reflect.Proxy;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+import org.apache.sentry.core.common.transport.SentryClientTransportConfigInterface;
+import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
public final class SentryServiceClientFactory {
+ private static final SentryClientTransportConfigInterface transportConfig =
+ new SentryPolicyClientTransportConfig();
+
private SentryServiceClientFactory() {
}
public static SentryPolicyServiceClient create(Configuration conf) throws Exception {
- boolean pooled = conf.getBoolean(
- ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT);
- if (pooled) {
- return (SentryPolicyServiceClient) Proxy
- .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
- SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
- new PoolClientInvocationHandler(conf));
- } else {
- return (SentryPolicyServiceClient) Proxy
- .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
- SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
- new RetryClientInvocationHandler(conf,
- new SentryPolicyServiceClientDefaultImpl(conf)));
- }
+ return (SentryPolicyServiceClient) Proxy
+ .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
+ SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
+ new RetryClientInvocationHandler(conf, transportConfig,
+ new SentryPolicyServiceClientDefaultImpl(conf, transportConfig)));
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c9c0119f/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
index 0164fa6..dd13e0d 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java
@@ -23,9 +23,9 @@ import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
/**
* SentryServiceClientPoolFactory is for connection pool to manage the object. Implement the related
@@ -36,21 +36,21 @@ public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<Sent
private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class);
- private final String addr;
- private final int port;
- private final Configuration conf;
+ //private final String addr;
+ //private final int port;
+ //private final Configuration conf;
public SentryServiceClientPoolFactory(String addr, int port,
Configuration conf) {
- this.addr = addr;
- this.port = port;
- this.conf = conf;
+ LOGGER.debug("addr = " + addr + "port = " + String.valueOf(port) + " conf = ", conf.toString());
+ //this.addr = addr;
+ //this.port = port;
+ //this.conf = conf;
}
@Override
public SentryPolicyServiceClient create() throws Exception {
- LOGGER.debug("Creating Sentry Service Client...");
- return new SentryPolicyServiceClientDefaultImpl(addr, port, conf);
+ throw new NotImplementedException();
}
@Override
@@ -60,13 +60,6 @@ public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<Sent
@Override
public void destroyObject(PooledObject<SentryPolicyServiceClient> pooledObject) {
- SentryPolicyServiceClient client = pooledObject.getObject();
- LOGGER.debug("Destroying Sentry Service Client: " + client);
- if (client != null) {
- // The close() of TSocket or TSaslClientTransport is called actually, and there has no
- // exception even there has some problems, eg, the client is closed already.
- // The close here is just try to close the socket and the client will be destroyed soon.
- client.close();
- }
+ throw new NotImplementedException();
}
}
[2/5] sentry git commit: SENTRY-1593
Posted by ak...@apache.org.
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index 4284b53..b4c1a5f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,25 +20,14 @@ package org.apache.sentry.provider.db.service.thrift;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Collections;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.Sasl;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.net.HostAndPort;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.core.common.exception.MissingConfigurationException;
+
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
@@ -53,16 +42,11 @@ import org.apache.sentry.service.thrift.Status;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -70,211 +54,58 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/*
+ Sentry Policy Service Client
+
+ The public implementation of SentryPolicyServiceClient.
A Sentry Client in which all the operations are synchronized for thread safety
Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state.
- So it is important to recreate the client, which uses a new socket.
- */
-public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyServiceClient {
+ So it is important to close and re-open the transport so that new socket is used.
+
+ When an class is instantiated, there will be transport created connecting with first available
+ server this is configured.
+*/
+
+public class SentryPolicyServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryPolicyServiceClient {
- private final Configuration conf;
- private final boolean kerberos;
- private String[] serverPrincipalParts;
private SentryPolicyService.Client client;
- private TTransport transport;
- private int connectionTimeout;
private static final Logger LOGGER = LoggerFactory
- .getLogger(SentryPolicyServiceClient.class);
+ .getLogger(SentryPolicyServiceClient.class);
private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred ";
- // configs for connection retry
- private int connectionFullRetryTotal;
- private List<InetSocketAddress> endpoints;
- final SentryPolicyClientTransportConfig transportConfig = new SentryPolicyClientTransportConfig();
- private static final ImmutableMap<String, String> SASL_PROPERTIES =
- ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
-
- /**
- * This transport wraps the Sasl transports to set up the right UGI context for open().
- */
- public static class UgiSaslClientTransport extends TSaslClientTransport {
- protected UserGroupInformation ugi = null;
-
- public UgiSaslClientTransport(String mechanism, String authorizationId,
- String protocol, String serverName, Map<String, String> props,
- CallbackHandler cbh, TTransport transport, boolean wrapUgi)
- throws IOException {
- super(mechanism, authorizationId, protocol, serverName, props, cbh,
- transport);
- if (wrapUgi) {
- ugi = UserGroupInformation.getLoginUser();
- }
- }
-
- // open the SASL transport with using the current UserGroupInformation
- // This is needed to get the current login context stored
- @Override
- public synchronized void open() throws TTransportException {
- if (ugi == null) {
- baseOpen();
- } else {
- try {
- if (ugi.isFromKeytab()) {
- ugi.checkTGTAndReloginFromKeytab();
- }
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws TTransportException {
- baseOpen();
- return null;
- }
- });
- } catch (IOException e) {
- throw new TTransportException("Failed to open SASL transport", e);
- } catch (InterruptedException e) {
- throw new TTransportException(
- "Interrupted while opening underlying transport", e);
- }
- }
- }
-
- private void baseOpen() throws TTransportException {
- super.open();
- }
- }
/**
* Initialize the sentry configurations.
*/
public SentryPolicyServiceClientDefaultImpl(Configuration conf)
- throws IOException {
- this.conf = conf;
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- try {
- String hostsAndPortsStr;
- this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
- this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf);
-
- this.kerberos = transportConfig.isKerberosEnabled(conf);
- hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf);
-
- int serverPort = transportConfig.getServerRpcPort(conf);
-
- String[] hostsAndPortsStrArr = hostsAndPortsStr.split(",");
- HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort);
- this.endpoints = new ArrayList(hostsAndPortsStrArr.length);
- for (int i = hostsAndPortsStrArr.length - 1; i >= 0; i--) {
- this.endpoints.add(
- new InetSocketAddress(hostsAndPorts[i].getHostText(), hostsAndPorts[i].getPort()));
- LOGGER.debug("Added server endpoint: " + hostsAndPorts[i].toString());
- }
- } catch (MissingConfigurationException e) {
- throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
- }
+ throws IOException {
+ super(conf, sentryClientType.POLICY_CLIENT);
}
public SentryPolicyServiceClientDefaultImpl(String addr, int port,
Configuration conf) throws IOException {
- this.conf = conf;
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- try {
- InetSocketAddress serverAddress = NetUtils.createSocketAddr(addr, port);
- this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
- this.kerberos = transportConfig.isKerberosEnabled(conf);
- connect(serverAddress);
- } catch (MissingConfigurationException e) {
- throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
- }
-
+ super(addr, port, conf, sentryClientType.POLICY_CLIENT);
}
/**
- * This is a no-op when already connected.
- * When there is a connection error, it will retry with another sentry server. It will
- * first cycle through all the available sentry servers, and then retry the whole server
- * list no more than connectionFullRetryTotal times. In this case, it won't introduce
- * more latency when some server fails. Also to prevent all clients connecting to the
- * same server, it will reorder the endpoints randomly after a full retry.
- * <p>
- * TODO: Have a small random sleep after a full retry to prevent all clients connecting to the same server.
- * <p>
- * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries.
- * @throws Exception if client fails to connect to all servers for a configured
- * number of times
- */
- public synchronized void connectWithRetry() throws Exception {
- if (isConnected()) {
- return;
- }
- Exception currentException = null;
- // Here for each full connectWithRetry it will cycle through all available sentry
- // servers. Before each full connectWithRetry, it will shuffle the server list.
- for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) {
- // Reorder endpoints randomly to prevent all clients connecting to the same endpoint
- // at the same time after a node failure.
- Collections.shuffle(endpoints);
- for (InetSocketAddress addr : endpoints) {
- try {
- connect(addr);
- LOGGER.info(String.format("Connected to SentryServer: %s", addr.toString()));
- return;
- } catch (Exception e) {
- LOGGER.debug(String.format("Failed connection to %s: %s",
- addr.toString(), e.getMessage()), e);
- currentException = e;
- }
- }
- }
-
- // Throw exception as reaching the max full connectWithRetry number.
- LOGGER.error(
- String.format("Reach the max connection retry num %d ", connectionFullRetryTotal),
- currentException);
- throw currentException;
- }
-
- /**
- * Connect to the specified socket address and throw Exception if failed.
+ * Connect to the specified socket address and then use the new socket
+ * to construct new thrift client.
+ *
+ * @param serverAddress: socket address to which the client should connect.
+ * @throws IOException
*/
- private void connect(InetSocketAddress serverAddress) throws IOException {
- transport = new TSocket(serverAddress.getHostName(),
- serverAddress.getPort(), connectionTimeout);
- try {
- if (kerberos) {
- String serverPrincipal = transportConfig.getSentryPrincipal(conf);
-
- // Resolve server host in the same way as we are doing on server side
- serverPrincipal =
- SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
- LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
-
- serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
- Preconditions.checkArgument(serverPrincipalParts.length == 3,
- "Kerberos principal should have 3 parts: " + serverPrincipal);
- boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
- transport = new SentryPolicyServiceClientDefaultImpl.UgiSaslClientTransport(
- SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(),
- null, serverPrincipalParts[0], serverPrincipalParts[1],
- SASL_PROPERTIES, null, transport, wrapUgi);
- } else {
- serverPrincipalParts = null;
- }
-
- transport.open();
- } catch (TTransportException e) {
- throw new IOException("Transport exception while opening transport: " + e.getMessage(), e);
- }
-
- LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress);
+ public void connect(InetSocketAddress serverAddress) throws IOException {
+ super.connect(serverAddress);
long maxMessageSize = conf.getLong(
- ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
- ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
+ ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
TMultiplexedProtocol protocol = new TMultiplexedProtocol(
- new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true),
- SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME);
+ new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true),
+ SentryPolicyStoreProcessor.SENTRY_POLICY_SERVICE_NAME);
client = new SentryPolicyService.Client(protocol);
LOGGER.debug("Successfully created client");
}
public synchronized void createRole(String requestorUserName, String roleName)
- throws SentryUserException {
+ throws SentryUserException {
TCreateSentryRoleRequest request = new TCreateSentryRoleRequest();
request.setProtocol_version(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT);
request.setRequestorUserName(requestorUserName);
@@ -288,20 +119,20 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
public synchronized void dropRole(String requestorUserName,
- String roleName)
- throws SentryUserException {
+ String roleName)
+ throws SentryUserException {
dropRole(requestorUserName, roleName, false);
}
public synchronized void dropRoleIfExists(String requestorUserName,
- String roleName)
- throws SentryUserException {
+ String roleName)
+ throws SentryUserException {
dropRole(requestorUserName, roleName, true);
}
private synchronized void dropRole(String requestorUserName,
- String roleName, boolean ifExists)
- throws SentryUserException {
+ String roleName, boolean ifExists)
+ throws SentryUserException {
TDropSentryRoleRequest request = new TDropSentryRoleRequest();
request.setProtocol_version(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT);
request.setRequestorUserName(requestorUserName);
@@ -320,15 +151,16 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
/**
* Gets sentry role objects for a given groupName using the Sentry service
+ *
* @param requestorUserName : user on whose behalf the request is issued
- * @param groupName : groupName to look up ( if null returns all roles for all groups)
+ * @param groupName : groupName to look up ( if null returns all roles for all groups)
* @return Set of thrift sentry role objects
* @throws SentryUserException
*/
public synchronized Set<TSentryRole> listRolesByGroupName(
- String requestorUserName,
- String groupName)
- throws SentryUserException {
+ String requestorUserName,
+ String groupName)
+ throws SentryUserException {
TListSentryRolesRequest request = new TListSentryRolesRequest();
request.setProtocol_version(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT);
request.setRequestorUserName(requestorUserName);
@@ -350,15 +182,13 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
/**
* Gets sentry role objects for a given userName using the Sentry service
*
- * @param requestorUserName
- * : user on whose behalf the request is issued
- * @param userName
- * : userName to look up (can't be empty)
+ * @param requestorUserName : user on whose behalf the request is issued
+ * @param userName : userName to look up (can't be empty)
* @return Set of thrift sentry role objects
* @throws SentryUserException
*/
public Set<TSentryRole> listRolesByUserName(String requestorUserName, String userName)
- throws SentryUserException {
+ throws SentryUserException {
TListSentryRolesForUserRequest request = new TListSentryRolesForUserRequest();
request.setProtocol_version(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT);
request.setRequestorUserName(requestorUserName);
@@ -374,22 +204,23 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
public synchronized Set<TSentryPrivilege> listAllPrivilegesByRoleName(String requestorUserName,
- String roleName)
- throws SentryUserException {
+ String roleName)
+ throws SentryUserException {
return listPrivilegesByRoleName(requestorUserName, roleName, null);
}
/**
* Gets sentry privilege objects for a given roleName using the Sentry service
+ *
* @param requestorUserName : user on whose behalf the request is issued
- * @param roleName : roleName to look up
- * @param authorizable : authorizable Hierarchy (server->db->table etc)
+ * @param roleName : roleName to look up
+ * @param authorizable : authorizable Hierarchy (server->db->table etc)
* @return Set of thrift sentry privilege objects
* @throws SentryUserException
*/
public synchronized Set<TSentryPrivilege> listPrivilegesByRoleName(String requestorUserName,
- String roleName, List<? extends Authorizable> authorizable)
- throws SentryUserException {
+ String roleName, List<? extends Authorizable> authorizable)
+ throws SentryUserException {
TListSentryPrivilegesRequest request = new TListSentryPrivilegesRequest();
request.setProtocol_version(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT);
request.setRequestorUserName(requestorUserName);
@@ -409,12 +240,12 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
public synchronized Set<TSentryRole> listRoles(String requestorUserName)
- throws SentryUserException {
+ throws SentryUserException {
return listRolesByGroupName(requestorUserName, null);
}
public synchronized Set<TSentryRole> listUserRoles(String requestorUserName)
- throws SentryUserException {
+ throws SentryUserException {
Set<TSentryRole> tSentryRoles = Sets.newHashSet();
tSentryRoles.addAll(listRolesByGroupName(requestorUserName, AccessConstants.ALL));
tSentryRoles.addAll(listRolesByUserName(requestorUserName, requestorUserName));
@@ -422,22 +253,22 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
public synchronized TSentryPrivilege grantURIPrivilege(String requestorUserName,
- String roleName, String server, String uri)
- throws SentryUserException {
+ String roleName, String server, String uri)
+ throws SentryUserException {
return grantPrivilege(requestorUserName, roleName,
- PrivilegeScope.URI, server, uri, null, null, null, AccessConstants.ALL);
+ PrivilegeScope.URI, server, uri, null, null, null, AccessConstants.ALL);
}
public synchronized TSentryPrivilege grantURIPrivilege(String requestorUserName,
- String roleName, String server, String uri, Boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, String uri, Boolean grantOption)
+ throws SentryUserException {
return grantPrivilege(requestorUserName, roleName,
- PrivilegeScope.URI, server, uri, null, null, null, AccessConstants.ALL, grantOption);
+ PrivilegeScope.URI, server, uri, null, null, null, AccessConstants.ALL, grantOption);
}
public synchronized void grantServerPrivilege(String requestorUserName,
- String roleName, String server, String action)
- throws SentryUserException {
+ String roleName, String server, String action)
+ throws SentryUserException {
// "ALL" and "*" should be synonyms for action and need to be unified with grantServerPrivilege without
// action explicitly specified.
@@ -446,7 +277,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
grantPrivilege(requestorUserName, roleName,
- PrivilegeScope.SERVER, server, null, null, null, null, action);
+ PrivilegeScope.SERVER, server, null, null, null, null, action);
}
@Deprecated
@@ -455,14 +286,14 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
* String roleName, String server, String action, Boolean grantOption)
*/
public synchronized TSentryPrivilege grantServerPrivilege(String requestorUserName,
- String roleName, String server, Boolean grantOption) throws SentryUserException {
+ String roleName, String server, Boolean grantOption) throws SentryUserException {
return grantServerPrivilege(requestorUserName, roleName, server,
- AccessConstants.ALL, grantOption);
+ AccessConstants.ALL, grantOption);
}
public synchronized TSentryPrivilege grantServerPrivilege(String requestorUserName,
- String roleName, String server, String action, Boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, String action, Boolean grantOption)
+ throws SentryUserException {
// "ALL" and "*" should be synonyms for action and need to be unified with grantServerPrivilege without
// action explicitly specified.
@@ -471,72 +302,72 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
return grantPrivilege(requestorUserName, roleName,
- PrivilegeScope.SERVER, server, null, null, null, null, action, grantOption);
+ PrivilegeScope.SERVER, server, null, null, null, null, action, grantOption);
}
public synchronized TSentryPrivilege grantDatabasePrivilege(String requestorUserName,
- String roleName, String server, String db, String action)
- throws SentryUserException {
+ String roleName, String server, String db, String action)
+ throws SentryUserException {
return grantPrivilege(requestorUserName, roleName,
- PrivilegeScope.DATABASE, server, null, db, null, null, action);
+ PrivilegeScope.DATABASE, server, null, db, null, null, action);
}
public synchronized TSentryPrivilege grantDatabasePrivilege(String requestorUserName,
- String roleName, String server, String db, String action, Boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, String db, String action, Boolean grantOption)
+ throws SentryUserException {
return grantPrivilege(requestorUserName, roleName,
- PrivilegeScope.DATABASE, server, null, db, null, null, action, grantOption);
+ PrivilegeScope.DATABASE, server, null, db, null, null, action, grantOption);
}
public synchronized TSentryPrivilege grantTablePrivilege(String requestorUserName,
- String roleName, String server, String db, String table, String action)
- throws SentryUserException {
+ String roleName, String server, String db, String table, String action)
+ throws SentryUserException {
return grantPrivilege(requestorUserName, roleName, PrivilegeScope.TABLE, server,
- null,
- db, table, null, action);
+ null,
+ db, table, null, action);
}
public synchronized TSentryPrivilege grantTablePrivilege(String requestorUserName,
- String roleName, String server, String db, String table, String action, Boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, String db, String table, String action, Boolean grantOption)
+ throws SentryUserException {
return grantPrivilege(requestorUserName, roleName, PrivilegeScope.TABLE, server,
- null, db, table, null, action, grantOption);
+ null, db, table, null, action, grantOption);
}
public synchronized TSentryPrivilege grantColumnPrivilege(String requestorUserName,
- String roleName, String server, String db, String table, String columnName, String action)
- throws SentryUserException {
+ String roleName, String server, String db, String table, String columnName, String action)
+ throws SentryUserException {
return grantPrivilege(requestorUserName, roleName, PrivilegeScope.COLUMN, server,
- null,
- db, table, columnName, action);
+ null,
+ db, table, columnName, action);
}
public synchronized TSentryPrivilege grantColumnPrivilege(String requestorUserName,
- String roleName, String server, String db, String table, String columnName, String action, Boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, String db, String table, String columnName, String action, Boolean grantOption)
+ throws SentryUserException {
return grantPrivilege(requestorUserName, roleName, PrivilegeScope.COLUMN, server,
- null, db, table, columnName, action, grantOption);
+ null, db, table, columnName, action, grantOption);
}
public synchronized Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName,
- String roleName, String server, String db, String table, List<String> columnNames, String action)
- throws SentryUserException {
+ String roleName, String server, String db, String table, List<String> columnNames, String action)
+ throws SentryUserException {
return grantPrivileges(requestorUserName, roleName, PrivilegeScope.COLUMN, server,
null,
db, table, columnNames, action);
}
public synchronized Set<TSentryPrivilege> grantColumnsPrivileges(String requestorUserName,
- String roleName, String server, String db, String table, List<String> columnNames, String action, Boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, String db, String table, List<String> columnNames, String action, Boolean grantOption)
+ throws SentryUserException {
return grantPrivileges(requestorUserName, roleName, PrivilegeScope.COLUMN,
- server,
- null, db, table, columnNames, action, grantOption);
+ server,
+ null, db, table, columnNames, action, grantOption);
}
public synchronized Set<TSentryPrivilege> grantPrivileges(
- String requestorUserName, String roleName,
- Set<TSentryPrivilege> privileges) throws SentryUserException {
+ String requestorUserName, String roleName,
+ Set<TSentryPrivilege> privileges) throws SentryUserException {
return grantPrivilegesCore(requestorUserName, roleName, privileges);
}
@@ -548,7 +379,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
private TSentryPrivilege grantPrivilegeCore(String requestorUserName, String roleName,
TSentryPrivilege privilege) throws SentryUserException {
Set<TSentryPrivilege> results =
- grantPrivilegesCore(requestorUserName, roleName, ImmutableSet.of(privilege));
+ grantPrivilegesCore(requestorUserName, roleName, ImmutableSet.of(privilege));
if (results != null && results.size() > 0) {
return results.iterator().next();
} else {
@@ -565,7 +396,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
request.setPrivileges(privileges);
try {
TAlterSentryRoleGrantPrivilegeResponse response =
- client.alter_sentry_role_grant_privilege(request);
+ client.alter_sentry_role_grant_privilege(request);
Status.throwIfNotOk(response.getStatus());
return response.getPrivileges();
} catch (TException e) {
@@ -575,24 +406,24 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
@VisibleForTesting
public static TSentryAuthorizable setupSentryAuthorizable(
- List<? extends Authorizable> authorizable) {
+ List<? extends Authorizable> authorizable) {
TSentryAuthorizable tSentryAuthorizable = new TSentryAuthorizable();
for (Authorizable authzble : authorizable) {
if (authzble.getTypeName().equalsIgnoreCase(
- DBModelAuthorizable.AuthorizableType.Server.toString())) {
+ DBModelAuthorizable.AuthorizableType.Server.toString())) {
tSentryAuthorizable.setServer(authzble.getName());
} else if (authzble.getTypeName().equalsIgnoreCase(
- DBModelAuthorizable.AuthorizableType.URI.toString())) {
+ DBModelAuthorizable.AuthorizableType.URI.toString())) {
tSentryAuthorizable.setUri(authzble.getName());
} else if (authzble.getTypeName().equalsIgnoreCase(
- DBModelAuthorizable.AuthorizableType.Db.toString())) {
+ DBModelAuthorizable.AuthorizableType.Db.toString())) {
tSentryAuthorizable.setDb(authzble.getName());
} else if (authzble.getTypeName().equalsIgnoreCase(
- DBModelAuthorizable.AuthorizableType.Table.toString())) {
+ DBModelAuthorizable.AuthorizableType.Table.toString())) {
tSentryAuthorizable.setTable(authzble.getName());
} else if (authzble.getTypeName().equalsIgnoreCase(
- DBModelAuthorizable.AuthorizableType.Column.toString())) {
+ DBModelAuthorizable.AuthorizableType.Column.toString())) {
tSentryAuthorizable.setColumn(authzble.getName());
}
}
@@ -600,44 +431,46 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
private TSentryPrivilege grantPrivilege(String requestorUserName,
- String roleName,
- PrivilegeScope scope, String serverName, String uri, String db,
- String table, String column, String action) throws SentryUserException {
+ String roleName,
+ PrivilegeScope scope, String serverName, String uri, String db,
+ String table, String column, String action) throws SentryUserException {
return grantPrivilege(requestorUserName, roleName, scope, serverName, uri,
- db, table, column, action, false);
+ db, table, column, action, false);
}
private TSentryPrivilege grantPrivilege(String requestorUserName,
- String roleName, PrivilegeScope scope, String serverName, String uri, String db, String table,
- String column, String action, Boolean grantOption)
- throws SentryUserException {
+
+ String roleName, PrivilegeScope scope, String serverName, String uri, String db, String table,
+ String column, String action, Boolean grantOption)
+ throws SentryUserException {
TSentryPrivilege privilege =
- convertToTSentryPrivilege(scope, serverName, uri, db, table, column, action, grantOption);
+ convertToTSentryPrivilege(scope, serverName, uri, db, table, column, action, grantOption);
return grantPrivilegeCore(requestorUserName, roleName, privilege);
}
private Set<TSentryPrivilege> grantPrivileges(String requestorUserName,
- String roleName,
- PrivilegeScope scope, String serverName, String uri, String db,
- String table, List<String> columns, String action) throws SentryUserException {
+ String roleName,
+ PrivilegeScope scope, String serverName, String uri, String db,
+ String table, List<String> columns, String action) throws SentryUserException {
return grantPrivileges(requestorUserName, roleName, scope, serverName, uri,
- db, table, columns, action, false);
+ db, table, columns, action, false);
}
private Set<TSentryPrivilege> grantPrivileges(String requestorUserName,
- String roleName, PrivilegeScope scope, String serverName, String uri, String db, String table,
- List<String> columns, String action, Boolean grantOption)
- throws SentryUserException {
+ String roleName, PrivilegeScope scope, String serverName, String uri, String db, String
+ table,
+ List<String> columns, String action, Boolean grantOption)
+ throws SentryUserException {
Set<TSentryPrivilege> privileges = convertColumnPrivileges(scope,
- serverName, uri, db, table, columns, action, grantOption);
+ serverName, uri, db, table, columns, action, grantOption);
return grantPrivilegesCore(requestorUserName, roleName, privileges);
}
- public synchronized void revokePrivileges(String requestorUserName, String roleName, Set<TSentryPrivilege> privileges) throws SentryUserException {
+ public synchronized void revokePrivileges(String requestorUserName, String roleName, Set<TSentryPrivilege> privileges) throws SentryUserException {
this.revokePrivilegesCore(requestorUserName, roleName, privileges);
}
- public synchronized void revokePrivilege(String requestorUserName, String roleName, TSentryPrivilege privilege) throws SentryUserException {
+ public synchronized void revokePrivilege(String requestorUserName, String roleName, TSentryPrivilege privilege) throws SentryUserException {
this.revokePrivilegeCore(requestorUserName, roleName, privilege);
}
@@ -654,7 +487,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
request.setPrivileges(privileges);
try {
TAlterSentryRoleRevokePrivilegeResponse response = client.alter_sentry_role_revoke_privilege(
- request);
+ request);
Status.throwIfNotOk(response.getStatus());
} catch (TException e) {
throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e);
@@ -662,22 +495,22 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
public synchronized void revokeURIPrivilege(String requestorUserName,
- String roleName, String server, String uri)
- throws SentryUserException {
+ String roleName, String server, String uri)
+ throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.URI, server, uri, null, null, null, AccessConstants.ALL);
+ PrivilegeScope.URI, server, uri, null, null, null, AccessConstants.ALL);
}
public synchronized void revokeURIPrivilege(String requestorUserName,
- String roleName, String server, String uri, Boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, String uri, Boolean grantOption)
+ throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.URI, server, uri, null, null, null, AccessConstants.ALL, grantOption);
+ PrivilegeScope.URI, server, uri, null, null, null, AccessConstants.ALL, grantOption);
}
public synchronized void revokeServerPrivilege(String requestorUserName,
- String roleName, String server, String action)
- throws SentryUserException {
+ String roleName, String server, String action)
+ throws SentryUserException {
// "ALL" and "*" should be synonyms for action and need to be unified with revokeServerPrivilege without
// action explicitly specified.
@@ -686,12 +519,12 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.SERVER, server, null, null, null, null, action);
+ PrivilegeScope.SERVER, server, null, null, null, null, action);
}
public synchronized void revokeServerPrivilege(String requestorUserName,
- String roleName, String server, String action, Boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, String action, Boolean grantOption)
+ throws SentryUserException {
// "ALL" and "*" should be synonyms for action and need to be unified with revokeServerPrivilege without
// action explicitly specified.
@@ -700,7 +533,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.SERVER, server, null, null, null, null, action, grantOption);
+ PrivilegeScope.SERVER, server, null, null, null, null, action, grantOption);
}
@Deprecated
@@ -709,97 +542,98 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
* String roleName, String server, String action, Boolean grantOption)
*/
public synchronized void revokeServerPrivilege(String requestorUserName,
- String roleName, String server, boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, boolean grantOption)
+ throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.SERVER, server, null, null, null, null, AccessConstants.ALL, grantOption);
+ PrivilegeScope.SERVER, server, null, null, null, null, AccessConstants.ALL, grantOption);
}
public synchronized void revokeDatabasePrivilege(String requestorUserName,
- String roleName, String server, String db, String action)
- throws SentryUserException {
+ String roleName, String server, String db, String action)
+ throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.DATABASE, server, null, db, null, null, action);
+ PrivilegeScope.DATABASE, server, null, db, null, null, action);
}
public synchronized void revokeDatabasePrivilege(String requestorUserName,
- String roleName, String server, String db, String action, Boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, String db, String action, Boolean grantOption)
+ throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.DATABASE, server, null, db, null, null, action, grantOption);
+ PrivilegeScope.DATABASE, server, null, db, null, null, action, grantOption);
}
public synchronized void revokeTablePrivilege(String requestorUserName,
- String roleName, String server, String db, String table, String action)
- throws SentryUserException {
+ String roleName, String server, String db, String table, String action)
+ throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.TABLE, server, null,
- db, table, null, action);
+ PrivilegeScope.TABLE, server, null,
+ db, table, null, action);
}
public synchronized void revokeTablePrivilege(String requestorUserName,
- String roleName, String server, String db, String table, String action, Boolean grantOption)
- throws SentryUserException {
+ String roleName, String server, String db, String table, String action, Boolean grantOption)
+ throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.TABLE, server, null,
- db, table, null, action, grantOption);
+ PrivilegeScope.TABLE, server, null,
+ db, table, null, action, grantOption);
}
public synchronized void revokeColumnPrivilege(String requestorUserName, String roleName,
- String server, String db, String table, String columnName, String action)
- throws SentryUserException {
+ String server, String db, String table, String columnName, String action)
+ throws SentryUserException {
ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
listBuilder.add(columnName);
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.COLUMN, server, null,
- db, table, listBuilder.build(), action);
+ PrivilegeScope.COLUMN, server, null,
+ db, table, listBuilder.build(), action);
}
public synchronized void revokeColumnPrivilege(String requestorUserName, String roleName,
- String server, String db, String table, String columnName, String action, Boolean grantOption)
- throws SentryUserException {
+ String server, String db, String table, String columnName, String action, Boolean grantOption)
+ throws SentryUserException {
ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
listBuilder.add(columnName);
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.COLUMN, server, null,
- db, table, listBuilder.build(), action, grantOption);
+ PrivilegeScope.COLUMN, server, null,
+ db, table, listBuilder.build(), action, grantOption);
}
public synchronized void revokeColumnsPrivilege(String requestorUserName, String roleName,
- String server, String db, String table, List<String> columns, String action)
- throws SentryUserException {
+ String server, String db, String table, List<String> columns, String action)
+ throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.COLUMN, server, null,
- db, table, columns, action);
+ PrivilegeScope.COLUMN, server, null,
+ db, table, columns, action);
}
public synchronized void revokeColumnsPrivilege(String requestorUserName, String roleName,
- String server, String db, String table, List<String> columns, String action, Boolean grantOption)
- throws SentryUserException {
+ String server, String db, String table, List<String> columns, String action, Boolean grantOption)
+ throws SentryUserException {
revokePrivilege(requestorUserName, roleName,
- PrivilegeScope.COLUMN, server, null,
- db, table, columns, action, grantOption);
+ PrivilegeScope.COLUMN, server, null,
+ db, table, columns, action, grantOption);
}
private void revokePrivilege(String requestorUserName,
- String roleName, PrivilegeScope scope, String serverName, String uri,
- String db, String table, List<String> columns, String action)
- throws SentryUserException {
+ String roleName, PrivilegeScope scope, String serverName, String uri,
+ String db, String table, List<String> columns, String action)
+ throws SentryUserException {
this.revokePrivilege(requestorUserName, roleName, scope, serverName, uri, db, table, columns, action, false);
}
private void revokePrivilege(String requestorUserName, String roleName,
- PrivilegeScope scope, String serverName, String uri, String db, String table, List<String> columns,
- String action, Boolean grantOption)
- throws SentryUserException {
+ PrivilegeScope scope, String serverName, String uri, String db, String table, List<String> columns,
+ String action, Boolean grantOption)
+ throws SentryUserException {
Set<TSentryPrivilege> privileges = convertColumnPrivileges(scope,
- serverName, uri, db, table, columns, action, grantOption);
+ serverName, uri, db, table, columns, action, grantOption);
this.revokePrivilegesCore(requestorUserName, roleName, privileges);
}
private Set<TSentryPrivilege> convertColumnPrivileges(
- PrivilegeScope scope, String serverName, String uri, String db, String table, List<String> columns,
- String action, Boolean grantOption) {
+ PrivilegeScope scope, String serverName, String uri, String db, String
+ table, List<String> columns,
+ String action, Boolean grantOption) {
ImmutableSet.Builder<TSentryPrivilege> setBuilder = ImmutableSet.builder();
if (columns == null || columns.isEmpty()) {
TSentryPrivilege privilege = new TSentryPrivilege();
@@ -832,8 +666,9 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
private TSentryPrivilege convertToTSentryPrivilege(
- PrivilegeScope scope, String serverName, String uri, String db, String table, String column,
- String action, Boolean grantOption) {
+ PrivilegeScope scope, String serverName, String uri, String db, String table, String
+ column,
+ String action, Boolean grantOption) {
TSentryPrivilege privilege = new TSentryPrivilege();
privilege.setPrivilegeScope(scope.toString());
privilege.setServerName(serverName);
@@ -858,15 +693,16 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
return TSentryGrantOption.FALSE;
}
- public synchronized Set<String> listPrivilegesForProvider(Set<String> groups, Set<String> users,
- ActiveRoleSet roleSet, Authorizable... authorizable) throws SentryUserException {
+ public synchronized Set<String> listPrivilegesForProvider
+ (Set<String> groups, Set<String> users,
+ ActiveRoleSet roleSet, Authorizable... authorizable) throws SentryUserException {
TSentryActiveRoleSet thriftRoleSet = new TSentryActiveRoleSet(roleSet.isAll(), roleSet.getRoles());
TListSentryPrivilegesForProviderRequest request =
- new TListSentryPrivilegesForProviderRequest(ThriftConstants.
- TSENTRY_SERVICE_VERSION_CURRENT, groups, thriftRoleSet);
+ new TListSentryPrivilegesForProviderRequest(ThriftConstants.
+ TSENTRY_SERVICE_VERSION_CURRENT, groups, thriftRoleSet);
if (authorizable != null && authorizable.length > 0) {
TSentryAuthorizable tSentryAuthorizable = setupSentryAuthorizable(Lists
- .newArrayList(authorizable));
+ .newArrayList(authorizable));
request.setAuthorizableHierarchy(tSentryAuthorizable);
}
if (users != null) {
@@ -883,25 +719,25 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
@Override
public synchronized void grantRoleToGroup(String requestorUserName,
- String groupName, String roleName)
- throws SentryUserException {
+ String groupName, String roleName)
+ throws SentryUserException {
grantRoleToGroups(requestorUserName, roleName, Sets.newHashSet(groupName));
}
@Override
public synchronized void revokeRoleFromGroup(String requestorUserName,
- String groupName, String roleName)
- throws SentryUserException {
+ String groupName, String roleName)
+ throws SentryUserException {
revokeRoleFromGroups(requestorUserName, roleName, Sets.newHashSet(groupName));
}
@Override
public synchronized void grantRoleToGroups(String requestorUserName,
- String roleName, Set<String> groups)
- throws SentryUserException {
+ String roleName, Set<String> groups)
+ throws SentryUserException {
TAlterSentryRoleAddGroupsRequest request = new TAlterSentryRoleAddGroupsRequest(
- ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName,
- roleName, convert2TGroups(groups));
+ ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName,
+ roleName, convert2TGroups(groups));
try {
TAlterSentryRoleAddGroupsResponse response = client.alter_sentry_role_add_groups(request);
Status.throwIfNotOk(response.getStatus());
@@ -912,11 +748,11 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
@Override
public synchronized void revokeRoleFromGroups(String requestorUserName,
- String roleName, Set<String> groups)
- throws SentryUserException {
+ String roleName, Set<String> groups)
+ throws SentryUserException {
TAlterSentryRoleDeleteGroupsRequest request = new TAlterSentryRoleDeleteGroupsRequest(
- ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName,
- roleName, convert2TGroups(groups));
+ ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName,
+ roleName, convert2TGroups(groups));
try {
TAlterSentryRoleDeleteGroupsResponse response = client.alter_sentry_role_delete_groups(request);
Status.throwIfNotOk(response.getStatus());
@@ -927,21 +763,21 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
@Override
public synchronized void grantRoleToUser(String requestorUserName, String userName,
- String roleName) throws SentryUserException {
+ String roleName) throws SentryUserException {
grantRoleToUsers(requestorUserName, roleName, Sets.newHashSet(userName));
}
@Override
public synchronized void revokeRoleFromUser(String requestorUserName, String userName,
- String roleName) throws SentryUserException {
+ String roleName) throws SentryUserException {
revokeRoleFromUsers(requestorUserName, roleName, Sets.newHashSet(userName));
}
@Override
public synchronized void grantRoleToUsers(String requestorUserName, String roleName,
- Set<String> users) throws SentryUserException {
+ Set<String> users) throws SentryUserException {
TAlterSentryRoleAddUsersRequest request = new TAlterSentryRoleAddUsersRequest(
- ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, roleName, users);
+ ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, roleName, users);
try {
TAlterSentryRoleAddUsersResponse response = client.alter_sentry_role_add_users(request);
Status.throwIfNotOk(response.getStatus());
@@ -952,9 +788,9 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
@Override
public synchronized void revokeRoleFromUsers(String requestorUserName, String roleName,
- Set<String> users) throws SentryUserException {
+ Set<String> users) throws SentryUserException {
TAlterSentryRoleDeleteUsersRequest request = new TAlterSentryRoleDeleteUsersRequest(
- ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, roleName, users);
+ ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, roleName, users);
try {
TAlterSentryRoleDeleteUsersResponse response = client.alter_sentry_role_delete_users(request);
Status.throwIfNotOk(response.getStatus());
@@ -974,13 +810,13 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
public synchronized void dropPrivileges(String requestorUserName,
- List<? extends Authorizable> authorizableObjects)
- throws SentryUserException {
+ List<? extends Authorizable> authorizableObjects)
+ throws SentryUserException {
TSentryAuthorizable tSentryAuthorizable = setupSentryAuthorizable(authorizableObjects);
TDropPrivilegesRequest request = new TDropPrivilegesRequest(
- ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName,
- tSentryAuthorizable);
+ ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName,
+ tSentryAuthorizable);
try {
TDropPrivilegesResponse response = client.drop_sentry_privilege(request);
Status.throwIfNotOk(response.getStatus());
@@ -990,35 +826,36 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
public synchronized void renamePrivileges(String requestorUserName,
- List<? extends Authorizable> oldAuthorizables,
- List<? extends Authorizable> newAuthorizables) throws SentryUserException {
+ List<? extends Authorizable> oldAuthorizables,
+ List<? extends Authorizable> newAuthorizables) throws SentryUserException {
TSentryAuthorizable tOldSentryAuthorizable = setupSentryAuthorizable(oldAuthorizables);
TSentryAuthorizable tNewSentryAuthorizable = setupSentryAuthorizable(newAuthorizables);
TRenamePrivilegesRequest request = new TRenamePrivilegesRequest(
- ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName,
- tOldSentryAuthorizable, tNewSentryAuthorizable);
+ ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName,
+ tOldSentryAuthorizable, tNewSentryAuthorizable);
try {
TRenamePrivilegesResponse response = client
- .rename_sentry_privilege(request);
+ .rename_sentry_privilege(request);
Status.throwIfNotOk(response.getStatus());
} catch (TException e) {
throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e);
}
}
- public synchronized Map<TSentryAuthorizable, TSentryPrivilegeMap> listPrivilegsbyAuthorizable(
- String requestorUserName,
- Set<List<? extends Authorizable>> authorizables, Set<String> groups,
- ActiveRoleSet roleSet) throws SentryUserException {
+ public synchronized Map<TSentryAuthorizable, TSentryPrivilegeMap> listPrivilegsbyAuthorizable
+ (
+ String requestorUserName,
+ Set<List<? extends Authorizable>> authorizables, Set<String> groups,
+ ActiveRoleSet roleSet) throws SentryUserException {
Set<TSentryAuthorizable> authSet = Sets.newTreeSet();
for (List<? extends Authorizable> authorizableHierarchy : authorizables) {
authSet.add(setupSentryAuthorizable(authorizableHierarchy));
}
TListSentryPrivilegesByAuthRequest request = new TListSentryPrivilegesByAuthRequest(
- ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName,
- authSet);
+ ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName,
+ authSet);
if (groups != null) {
request.setGroups(groups);
}
@@ -1028,7 +865,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
try {
TListSentryPrivilegesByAuthResponse response = client
- .list_sentry_privileges_by_authorizable(request);
+ .list_sentry_privileges_by_authorizable(request);
Status.throwIfNotOk(response.getStatus());
return response.getPrivilegesMapByAuth();
} catch (TException e) {
@@ -1041,6 +878,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
* propertyName, or if propertyName does not exist, the defaultValue.
* There is no "requestorUserName" because this is regarded as an
* internal interface.
+ *
* @param propertyName Config attribute to search for
* @param defaultValue String to return if not found
* @return The value of the propertyName
@@ -1062,44 +900,33 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
}
- public synchronized void close() {
- if (isConnected()) {
- transport.close();
- }
- }
-
- private boolean isConnected() {
- return transport != null && transport.isOpen();
- }
-
/**
* Import the sentry mapping data, convert the mapping data from map structure to
* TSentryMappingData, and call the import API.
- *
- * @param policyFileMappingData
- * Include 2 maps to save the mapping data, the following is the example of the data
- * structure:
- * for the following mapping data:
- * group1=role1,role2
- * group2=role2,role3
- * role1=server=server1->db=db1
- * role2=server=server1->db=db1->table=tbl1,server=server1->db=db1->table=tbl2
- * role3=server=server1->url=hdfs://localhost/path
- *
- * The policyFileMappingData will be inputed as:
- * {
- * groups={[group1={role1, role2}], group2=[role2, role3]},
- * roles={role1=[server=server1->db=db1],
- * role2=[server=server1->db=db1->table=tbl1,server=server1->db=db1->table=tbl2],
- * role3=[server=server1->url=hdfs://localhost/path]
- * }
- * }
- * @param requestorUserName
- * The name of the request user
+ *
+ * @param policyFileMappingData Include 2 maps to save the mapping data, the following is the example of the data
+ * structure:
+ * for the following mapping data:
+ * group1=role1,role2
+ * group2=role2,role3
+ * role1=server=server1->db=db1
+ * role2=server=server1->db=db1->table=tbl1,server=server1->db=db1->table=tbl2
+ * role3=server=server1->url=hdfs://localhost/path
+ * <p>
+ * The policyFileMappingData will be inputed as:
+ * {
+ * groups={[group1={role1, role2}], group2=[role2, role3]},
+ * roles={role1=[server=server1->db=db1],
+ * role2=[server=server1->db=db1->table=tbl1,server=server1->db=db1->table=tbl2],
+ * role3=[server=server1->url=hdfs://localhost/path]
+ * }
+ * }
+ * @param requestorUserName The name of the request user
*/
- public synchronized void importPolicy(Map<String, Map<String, Set<String>>> policyFileMappingData,
- String requestorUserName, boolean isOverwriteRole)
- throws SentryUserException {
+ public synchronized void importPolicy
+ (Map<String, Map<String, Set<String>>> policyFileMappingData,
+ String requestorUserName, boolean isOverwriteRole)
+ throws SentryUserException {
try {
TSentryMappingData tSentryMappingData = new TSentryMappingData();
// convert the mapping data for [group,role] from map structure to
@@ -1109,11 +936,11 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
// convert the mapping data for [role,privilege] from map structure to
// TSentryMappingData.RolePrivilegesMap
tSentryMappingData
- .setRolePrivilegesMap(convertRolePrivilegesMapForSentryDB(policyFileMappingData
- .get(PolicyFileConstants.ROLES)));
+ .setRolePrivilegesMap(convertRolePrivilegesMapForSentryDB(policyFileMappingData
+ .get(PolicyFileConstants.ROLES)));
TSentryImportMappingDataRequest request = new TSentryImportMappingDataRequest(
- ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, isOverwriteRole,
- tSentryMappingData);
+ ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName, isOverwriteRole,
+ tSentryMappingData);
TSentryImportMappingDataResponse response = client.import_sentry_mapping_data(request);
Status.throwIfNotOk(response.getStatus());
} catch (TException e) {
@@ -1124,7 +951,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
// convert the mapping data for [role,privilege] from map structure to
// TSentryMappingData.RolePrivilegesMap
private Map<String, Set<TSentryPrivilege>> convertRolePrivilegesMapForSentryDB(
- Map<String, Set<String>> rolePrivilegesMap) {
+ Map<String, Set<String>> rolePrivilegesMap) {
Map<String, Set<TSentryPrivilege>> rolePrivilegesMapResult = Maps.newHashMap();
if (rolePrivilegesMap != null) {
for (Map.Entry<String, Set<String>> entry : rolePrivilegesMap.entrySet()) {
@@ -1140,10 +967,11 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
}
// export the sentry mapping data with map structure
- public synchronized Map<String, Map<String, Set<String>>> exportPolicy(String requestorUserName,
- String objectPath) throws SentryUserException {
+ public synchronized Map<String, Map<String, Set<String>>> exportPolicy(String
+ requestorUserName,
+ String objectPath) throws SentryUserException {
TSentryExportMappingDataRequest request = new TSentryExportMappingDataRequest(
- ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName);
+ ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, requestorUserName);
request.setObjectPath(objectPath);
try {
TSentryExportMappingDataResponse response = client.export_sentry_mapping_data(request);
@@ -1162,7 +990,7 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
// convert the mapping data for [roleName,privilege] from TSentryMappingData.RolePrivilegesMap to
// map structure
private Map<String, Set<String>> convertRolePrivilegesMapForPolicyFile(
- Map<String, Set<TSentryPrivilege>> rolePrivilegesMap) {
+ Map<String, Set<TSentryPrivilege>> rolePrivilegesMap) {
Map<String, Set<String>> rolePrivilegesMapForFile = Maps.newHashMap();
if (rolePrivilegesMap != null) {
for (Map.Entry<String, Set<TSentryPrivilege>> entry : rolePrivilegesMap.entrySet()) {
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index ee2a466..20ebb2c 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -47,7 +47,7 @@ import org.apache.sentry.provider.db.log.entity.JsonLogEntity;
import org.apache.sentry.provider.db.log.entity.JsonLogEntityFactory;
import org.apache.sentry.provider.db.log.util.Constants;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants.PolicyStoreServerConfig;
+import org.apache.sentry.core.common.utils.PolicyStoreConstants.PolicyStoreServerConfig;
import org.apache.sentry.service.thrift.SentryServiceUtil;
import org.apache.sentry.service.thrift.ServiceConstants;
import org.apache.sentry.service.thrift.ServiceConstants.ConfUtilties;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryProcessorWrapper.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryProcessorWrapper.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryProcessorWrapper.java
index a5f11a9..5e26486 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryProcessorWrapper.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryProcessorWrapper.java
@@ -18,6 +18,7 @@
package org.apache.sentry.provider.db.service.thrift;
+import org.apache.sentry.core.common.utils.ThriftUtil;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java
deleted file mode 100644
index 5fed04a..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/ThriftUtil.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.provider.db.service.thrift;
-
-import com.google.common.net.HostAndPort;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSaslServerTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public final class ThriftUtil {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ThriftUtil.class);
-
- public static void setImpersonator(final TProtocol in) {
- try {
- TTransport transport = in.getTransport();
- if (transport instanceof TSaslServerTransport) {
- String impersonator = ((TSaslServerTransport) transport).getSaslServer()
- .getAuthorizationID();
- setImpersonator(impersonator);
- }
- } catch (Exception e) {
- // If there has exception when get impersonator info, log the error information.
- LOGGER.warn("There is an error when get the impersonator:" + e.getMessage());
- }
- }
-
- public static void setIpAddress(final TProtocol in) {
- try {
- TTransport transport = in.getTransport();
- TSocket tSocket = getUnderlyingSocketFromTransport(transport);
- if (tSocket != null) {
- setIpAddress(tSocket.getSocket().getInetAddress().toString());
- } else {
- LOGGER.warn("Unknown Transport, cannot determine ipAddress");
- }
- } catch (Exception e) {
- // If there has exception when get impersonator info, log the error information.
- LOGGER.warn("There is an error when get the client's ip address:" + e.getMessage());
- }
- }
-
- /**
- * Returns the underlying TSocket from the transport, or null of the transport type is unknown.
- */
- private static TSocket getUnderlyingSocketFromTransport(TTransport transport) {
- Preconditions.checkNotNull(transport);
- if (transport instanceof TSaslServerTransport) {
- return (TSocket) ((TSaslServerTransport) transport).getUnderlyingTransport();
- } else if (transport instanceof TSaslClientTransport) {
- return (TSocket) ((TSaslClientTransport) transport).getUnderlyingTransport();
- } else if (transport instanceof TSocket) {
- return (TSocket) transport;
- }
- return null;
- }
-
- private static ThreadLocal<String> threadLocalIpAddress = new ThreadLocal<String>() {
- @Override
- protected synchronized String initialValue() {
- return "";
- }
- };
-
- public static void setIpAddress(String ipAddress) {
- threadLocalIpAddress.set(ipAddress);
- }
-
- public static String getIpAddress() {
- return threadLocalIpAddress.get();
- }
-
- private static ThreadLocal<String> threadLocalImpersonator = new ThreadLocal<String>() {
- @Override
- protected synchronized String initialValue() {
- return "";
- }
- };
-
- public static void setImpersonator(String impersonator) {
- threadLocalImpersonator.set(impersonator);
- }
-
- public static String getImpersonator() {
- return threadLocalImpersonator.get();
- }
-
- private ThriftUtil() {
- // Make constructor private to avoid instantiation
- }
-
- /**
- * Utility function for parsing host and port strings. Expected form should be
- * (host:port). The hostname could be in ipv6 style. If port is not specified,
- * defaultPort will be used.
- */
- public static HostAndPort[] parseHostPortStrings(String[] hostsAndPortsArr, int defaultPort) {
- HostAndPort[] hostsAndPorts = new HostAndPort[hostsAndPortsArr.length];
- for (int i = 0; i < hostsAndPorts.length; i++) {
- hostsAndPorts[i] =
- HostAndPort.fromString(hostsAndPortsArr[i]).withDefaultPort(defaultPort);
- }
- return hostsAndPorts;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
index d5f4fcb..acf9b05 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java
@@ -28,8 +28,9 @@ import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.core.common.transport.SentryClientInvocationHandler;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
-import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
+import org.apache.sentry.core.common.utils.ThriftUtil;
import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java
deleted file mode 100644
index 2f38198..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/RetryClientInvocationHandler.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.service.thrift;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.core.common.exception.SentryUserException;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-/**
- * The RetryClientInvocationHandler is a proxy class for handling thrift calls for non-pool
- * model. Currently only one client connection is allowed, and it's using lazy connection.
- * The client is not connected to the sentry server until there is any rpc call.
- * <p>
- * For every rpc call, if the client is not connected, it will first connect to a sentry
- * server, and then do the thrift call to the connected sentry server, which will execute
- * the requested method and return back the response. If it is failed with connection
- * problem, it will close the current connection and retry (reconnect and resend the
- * thrift call) no more than rpcRetryTotal times. If the client is already connected, it
- * will reuse the existing connection, and do the thrift call.
- * <p>
- * During reconnection, it will first cycle through all the available sentry servers, and
- * then retry the whole server list no more than connectionFullRetryTotal times. In this
- * case, it won't introduce more latency when some server fails. Also to prevent all
- * clients connecting to the same server, it will reorder the endpoints randomly after a
- * full retry.
- * <p>
- * TODO: allow multiple client connections
- */
-class RetryClientInvocationHandler extends SentryClientInvocationHandler{
- private static final Logger LOGGER =
- LoggerFactory.getLogger(RetryClientInvocationHandler.class);
- private final Configuration conf;
- private SentryPolicyServiceClientDefaultImpl client = null;
- private final int rpcRetryTotal;
-
- /**
- * Initialize the sentry configurations, including rpc retry count and client connection
- * configs for SentryPolicyServiceClientDefaultImpl
- */
- RetryClientInvocationHandler(Configuration conf) throws Exception {
- this.conf = conf;
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- this.rpcRetryTotal = conf.getInt(ServiceConstants.ClientConfig.SENTRY_RPC_RETRY_TOTAL,
- ServiceConstants.ClientConfig.SENTRY_RPC_RETRY_TOTAL_DEFAULT);
- client = new SentryPolicyServiceClientDefaultImpl(conf);
- }
-
- /**
- * For every rpc call, if the client is not connected, it will first connect to a sentry
- * server, and then do the thrift call to the connected sentry server, which will
- * execute the requested method and return back the response. If it is failed with
- * connection problem, it will close the current connection, and retry (reconnect and
- * resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException
- * if failed retry after rpcRetryTotal times.
- * if it is failed with other exception, method would just re-throw the exception.
- * Synchronized it for thread safety.
- */
- @Override
- synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception {
- int retryCount = 0;
- Exception lastExc = null;
-
- while (retryCount < rpcRetryTotal) {
- // Connect to a sentry server if not connected yet.
- try {
- client.connectWithRetry();
- } catch (IOException e) {
- // Increase the retry num
- // Retry when the exception is caused by connection problem.
- retryCount++;
- lastExc = e;
- close();
- continue;
- } catch (Exception e) {
- close();
- throw e;
- }
-
- // do the thrift call
- try {
- return method.invoke(client, args);
- } catch (InvocationTargetException e) {
- // Get the target exception, check if SentryUserException or TTransportException is wrapped.
- // TTransportException means there has connection problem with the pool.
- Throwable targetException = e.getCause();
- if (targetException instanceof SentryUserException) {
- Throwable sentryTargetException = targetException.getCause();
- // If there has connection problem, eg, invalid connection if the service restarted,
- // sentryTargetException instanceof TTransportException = true.
- if (sentryTargetException instanceof TTransportException) {
- // Retry when the exception is caused by connection problem.
- lastExc = new TTransportException(sentryTargetException);
- LOGGER.debug("Got TTransportException when do the thrift call ", lastExc);
- } else {
- // The exception is thrown by thrift call, eg, SentryAccessDeniedException.
- // Do not need to reconnect to the sentry server.
- throw (SentryUserException) targetException;
- }
- } else {
- throw e;
- }
- }
-
- // Increase the retry num
- retryCount++;
-
- // For connection problem, it will close the current connection, and reconnect to
- // an available sentry server and redo the thrift call.
- close();
- }
- // Throw the exception as reaching the max rpc retry num.
- LOGGER.error(String.format("failed after %d retries ", rpcRetryTotal), lastExc);
- throw new SentryUserException(
- String.format("failed after %d retries ", rpcRetryTotal), lastExc);
- }
-
- @Override
- public void close() {
- client.close();
- LOGGER.debug("Close the current client connection");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java
deleted file mode 100644
index b8c7f23..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryClientInvocationHandler.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.service.thrift;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-
-/**
- * SentryClientInvocationHandler is the base interface for all the InvocationHandler in SENTRY
- */
-public abstract class SentryClientInvocationHandler implements InvocationHandler {
-
- /**
- * Close the InvocationHandler: An InvocationHandler may create some contexts,
- * these contexts should be close when the method "close()" of client be called.
- */
- @Override
- public final Object invoke(Object proxy, Method method, Object[] args) throws Exception {
- // close() doesn't throw exception we supress that in case of connection
- // loss. Changing SentryPolicyServiceClient#close() to throw an
- // exception would be a backward incompatible change for Sentry clients.
- if ("close".equals(method.getName()) && null == args) {
- close();
- return null;
- }
- return invokeImpl(proxy, method, args);
- }
-
- /**
- * Subclass should implement this method for special function
- */
- abstract Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception;
-
- /**
- * An abstract method "close", an invocationHandler should close its contexts at here.
- */
- public abstract void close();
-
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
index f822497..745dc4c 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Proxy;
import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
@@ -43,7 +44,8 @@ public final class SentryServiceClientFactory {
return (SentryPolicyServiceClient) Proxy
.newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(),
SentryPolicyServiceClientDefaultImpl.class.getInterfaces(),
- new RetryClientInvocationHandler(conf));
+ new RetryClientInvocationHandler(conf,
+ new SentryPolicyServiceClientDefaultImpl(conf)));
}
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index d3a68c9..834ed41 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -25,7 +25,6 @@ import javax.security.sasl.Sasl;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
-import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl;
public class ServiceConstants {
@@ -227,12 +226,10 @@ public class ServiceConstants {
}
public static class ClientConfig {
- public static final ImmutableMap<String, String> SASL_PROPERTIES = ServiceConstants.SASL_PROPERTIES;
public static final String SERVER_RPC_PORT = "sentry.service.client.server.rpc-port";
public static final int SERVER_RPC_PORT_DEFAULT = ServerConfig.RPC_PORT_DEFAULT;
public static final String SERVER_RPC_ADDRESS = "sentry.service.client.server.rpc-address";
public static final String SERVER_RPC_CONN_TIMEOUT = "sentry.service.client.server.rpc-connection-timeout";
- public static final int SERVER_RPC_CONN_TIMEOUT_DEFAULT = 200000;
// HA configuration
public static final String SENTRY_HA_ENABLED = "sentry.ha.enabled";
@@ -256,20 +253,6 @@ public class ServiceConstants {
public static final String SENTRY_POOL_RETRY_TOTAL = "sentry.service.client.connection.pool.retry-total";
public static final int SENTRY_POOL_RETRY_TOTAL_DEFAULT = 3;
- /**
- * full retry num for getting the connection in non-pool model
- * In a full retry, it will cycle through all available sentry servers
- * {@link SentryPolicyServiceClientDefaultImpl#connectWithRetry()}
- */
- public static final String SENTRY_FULL_RETRY_TOTAL = "sentry.service.client.connection.full.retry-total";
- public static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = 2;
- /**
- * max retry num for client rpc
- * {@link RetryClientInvocationHandler#invokeImpl(Object, Method, Object[])}
- */
- public static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total";
- public static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3;
-
// max message size for thrift messages
public static final String SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE = "sentry.policy.client.thrift.max.message.size";
public static final long SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = 100 * 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/thrift/TestSentryGenericPolicyProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/thrift/TestSentryGenericPolicyProcessor.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/thrift/TestSentryGenericPolicyProcessor.java
index d4bf435..ac93e25 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/thrift/TestSentryGenericPolicyProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/generic/service/thrift/TestSentryGenericPolicyProcessor.java
@@ -40,7 +40,7 @@ import org.apache.sentry.provider.db.generic.service.persistent.SentryStoreLayer
import org.apache.sentry.provider.db.generic.service.persistent.PrivilegeObject.Builder;
import org.apache.sentry.provider.db.service.model.MSentryGMPrivilege;
import org.apache.sentry.provider.db.service.model.MSentryRole;
-import org.apache.sentry.provider.db.service.thrift.PolicyStoreConstants;
+import org.apache.sentry.core.common.utils.PolicyStoreConstants;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.apache.sentry.service.thrift.Status;
import org.apache.sentry.service.thrift.TSentryResponseStatus;
[3/5] sentry git commit: SENTRY-1593
Posted by ak...@apache.org.
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
index 075983e..9bbd736 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientDefaultImpl.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,25 +19,16 @@ package org.apache.sentry.provider.db.generic.service.thrift;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
import java.util.*;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.Sasl;
-
-import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.core.common.exception.MissingConfigurationException;
+
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig;
+import org.apache.sentry.core.common.transport.SentryServiceClientTransportDefaultImpl;
import org.apache.sentry.core.common.utils.SentryConstants;
import org.apache.sentry.core.model.db.AccessConstants;
import org.apache.sentry.service.thrift.ServiceConstants;
@@ -46,146 +37,63 @@ import org.apache.sentry.service.thrift.sentry_common_serviceConstants;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class SentryGenericServiceClientDefaultImpl implements SentryGenericServiceClient {
- private final Configuration conf;
- private final InetSocketAddress serverAddress;
- private final boolean kerberos;
- private final String[] serverPrincipalParts;
+/*
+ Sentry Generic Service Client
+
+ The public implementation of SentryGenericServiceClient.
+ TODO(kalyan) A Sentry Client in which all the operations are synchronized for thread safety
+ Note: When using this client, if there is an exception in RPC, socket can get into an inconsistent state.
+ So it is important to close and re-open the transport so that new socket is used.
+ */
+
+public class SentryGenericServiceClientDefaultImpl extends SentryServiceClientTransportDefaultImpl implements SentryGenericServiceClient {
private SentryGenericPolicyService.Client client;
- private TTransport transport;
- private int connectionTimeout;
private static final Logger LOGGER = LoggerFactory
- .getLogger(SentryGenericServiceClientDefaultImpl.class);
+ .getLogger(SentryGenericServiceClientDefaultImpl.class);
private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occured ";
- private final SentryPolicyClientTransportConfig transportConfig = new SentryPolicyClientTransportConfig();
-
- private static final ImmutableMap<String, String> SASL_PROPERTIES =
- ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf");
-
- /**
- * This transport wraps the Sasl transports to set up the right UGI context for open().
- */
- public static class UgiSaslClientTransport extends TSaslClientTransport {
- protected UserGroupInformation ugi = null;
-
- public UgiSaslClientTransport(String mechanism, String authorizationId,
- String protocol, String serverName, Map<String, String> props,
- CallbackHandler cbh, TTransport transport, boolean wrapUgi, Configuration conf)
- throws IOException {
- super(mechanism, authorizationId, protocol, serverName, props, cbh,
- transport);
- if (wrapUgi) {
- // If we don't set the configuration, the UGI will be created based on
- // what's on the classpath, which may lack the kerberos changes we require
- UserGroupInformation.setConfiguration(conf);
- ugi = UserGroupInformation.getLoginUser();
- }
- }
-
- // open the SASL transport with using the current UserGroupInformation
- // This is needed to get the current login context stored
- @Override
- public void open() throws TTransportException {
- if (ugi == null) {
- baseOpen();
- } else {
- try {
- if (ugi.isFromKeytab()) {
- ugi.checkTGTAndReloginFromKeytab();
- }
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws TTransportException {
- baseOpen();
- return null;
- }
- });
- } catch (IOException e) {
- throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e);
- } catch (InterruptedException e) {
- throw new TTransportException(
- "Interrupted while opening underlying transport: " + e.getMessage(), e);
- }
- }
- }
- private void baseOpen() throws TTransportException {
- super.open();
+ public SentryGenericServiceClientDefaultImpl(Configuration conf) throws IOException {
+ super(conf, sentryClientType.POLICY_CLIENT);
+ if (kerberos) {
+ // since the client uses hadoop-auth, we need to set kerberos in
+ // hadoop-auth if we plan to use kerberos
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, SentryConstants.KERBEROS_MODE);
}
}
- public SentryGenericServiceClientDefaultImpl(Configuration conf) throws Exception {
- // copy the configuration because we may make modifications to it.
- this.conf = new Configuration(conf);
-
- Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
- try {
- this.serverAddress = NetUtils.createSocketAddr(
- transportConfig.getSentryServerRpcAddress(conf),
- transportConfig.getServerRpcPort(conf));
-
-
- this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf);
- kerberos = transportConfig.isKerberosEnabled(conf);
- transport = new TSocket(serverAddress.getHostName(),
- serverAddress.getPort(), connectionTimeout);
- if (kerberos) {
- String serverPrincipal = transportConfig.getSentryPrincipal(conf);
- // since the client uses hadoop-auth, we need to set kerberos in
- // hadoop-auth if we plan to use kerberos
- conf.set(HADOOP_SECURITY_AUTHENTICATION, SentryConstants.KERBEROS_MODE);
-
- // Resolve server host in the same way as we are doing on server side
- serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
- LOGGER.debug("Using server kerberos principal: " + serverPrincipal);
-
- serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
- Preconditions.checkArgument(serverPrincipalParts.length == 3,
- "Kerberos principal should have 3 parts: " + serverPrincipal);
- boolean wrapUgi = transportConfig.useUserGroupInformation(conf);
- transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(),
- null, serverPrincipalParts[0], serverPrincipalParts[1],
- SASL_PROPERTIES, null, transport, wrapUgi, conf);
- } else {
- serverPrincipalParts = null;
- }
- transport.open();
- } catch (TTransportException e) {
- throw new IOException("Transport exception while opening transport: " + e.getMessage(), e);
- } catch (MissingConfigurationException e) {
- throw new RuntimeException("Client Creation Failed: " + e.getMessage(), e);
- }
-
- LOGGER.debug("Successfully opened transport: " + transport + " to " + serverAddress);
+ /**
+ * Connect to the specified socket address and then use the new socket
+ * to construct new thrift client.
+ *
+ * @param serverAddress: socket address to which the client should connect.
+ * @throws IOException
+ */
+ public void connect(InetSocketAddress serverAddress) throws IOException {
+ super.connect(serverAddress);
long maxMessageSize = conf.getLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE,
- ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
+ ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
TMultiplexedProtocol protocol = new TMultiplexedProtocol(
- new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true),
- SentryGenericPolicyProcessor.SENTRY_GENERIC_SERVICE_NAME);
+ new TBinaryProtocol(transport, maxMessageSize, maxMessageSize, true, true),
+ SentryGenericPolicyProcessor.SENTRY_GENERIC_SERVICE_NAME);
client = new SentryGenericPolicyService.Client(protocol);
LOGGER.debug("Successfully created client");
}
-
-
-
/**
* Create a sentry role
+ *
* @param requestorUserName: user on whose behalf the request is issued
- * @param roleName: Name of the role
- * @param component: The request is issued to which component
+ * @param roleName: Name of the role
+ * @param component: The request is issued to which component
* @throws SentryUserException
*/
public synchronized void createRole(String requestorUserName, String roleName, String component)
- throws SentryUserException {
+ throws SentryUserException {
TCreateSentryRoleRequest request = new TCreateSentryRoleRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
request.setRequestorUserName(requestorUserName);
@@ -219,26 +127,27 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
/**
* Drop a sentry role
+ *
* @param requestorUserName: user on whose behalf the request is issued
- * @param roleName: Name of the role
- * @param component: The request is issued to which component
+ * @param roleName: Name of the role
+ * @param component: The request is issued to which component
* @throws SentryUserException
*/
public void dropRole(String requestorUserName,
- String roleName, String component)
- throws SentryUserException {
+ String roleName, String component)
+ throws SentryUserException {
dropRole(requestorUserName, roleName, component, false);
}
public void dropRoleIfExists(String requestorUserName,
- String roleName, String component)
- throws SentryUserException {
+ String roleName, String component)
+ throws SentryUserException {
dropRole(requestorUserName, roleName, component, true);
}
private void dropRole(String requestorUserName,
- String roleName, String component , boolean ifExists)
- throws SentryUserException {
+ String roleName, String component, boolean ifExists)
+ throws SentryUserException {
TDropSentryRoleRequest request = new TDropSentryRoleRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
request.setRequestorUserName(requestorUserName);
@@ -258,14 +167,15 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
/**
* add a sentry role to groups.
+ *
* @param requestorUserName: user on whose behalf the request is issued
- * @param roleName: Name of the role
- * @param component: The request is issued to which component
- * @param groups: The name of groups
+ * @param roleName: Name of the role
+ * @param component: The request is issued to which component
+ * @param groups: The name of groups
* @throws SentryUserException
*/
public void addRoleToGroups(String requestorUserName, String roleName,
- String component, Set<String> groups) throws SentryUserException {
+ String component, Set<String> groups) throws SentryUserException {
TAlterSentryRoleAddGroupsRequest request = new TAlterSentryRoleAddGroupsRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
request.setRequestorUserName(requestorUserName);
@@ -283,14 +193,15 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
/**
* delete a sentry role from groups.
+ *
* @param requestorUserName: user on whose behalf the request is issued
- * @param roleName: Name of the role
- * @param component: The request is issued to which component
- * @param groups: The name of groups
+ * @param roleName: Name of the role
+ * @param component: The request is issued to which component
+ * @param groups: The name of groups
* @throws SentryUserException
*/
public void deleteRoleToGroups(String requestorUserName, String roleName,
- String component, Set<String> groups) throws SentryUserException {
+ String component, Set<String> groups) throws SentryUserException {
TAlterSentryRoleDeleteGroupsRequest request = new TAlterSentryRoleDeleteGroupsRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
request.setRequestorUserName(requestorUserName);
@@ -308,14 +219,15 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
/**
* grant privilege
+ *
* @param requestorUserName: user on whose behalf the request is issued
- * @param roleName: Name of the role
- * @param component: The request is issued to which component
+ * @param roleName: Name of the role
+ * @param component: The request is issued to which component
* @param privilege
* @throws SentryUserException
*/
public void grantPrivilege(String requestorUserName, String roleName,
- String component, TSentryPrivilege privilege) throws SentryUserException {
+ String component, TSentryPrivilege privilege) throws SentryUserException {
TAlterSentryRoleGrantPrivilegeRequest request = new TAlterSentryRoleGrantPrivilegeRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
request.setComponent(component);
@@ -333,14 +245,15 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
/**
* revoke privilege
+ *
* @param requestorUserName: user on whose behalf the request is issued
- * @param roleName: Name of the role
- * @param component: The request is issued to which component
+ * @param roleName: Name of the role
+ * @param component: The request is issued to which component
* @param privilege
* @throws SentryUserException
*/
public void revokePrivilege(String requestorUserName, String roleName,
- String component, TSentryPrivilege privilege) throws SentryUserException {
+ String component, TSentryPrivilege privilege) throws SentryUserException {
TAlterSentryRoleRevokePrivilegeRequest request = new TAlterSentryRoleRevokePrivilegeRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
request.setComponent(component);
@@ -358,13 +271,14 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
/**
* drop privilege
+ *
* @param requestorUserName: user on whose behalf the request is issued
- * @param component: The request is issued to which component
+ * @param component: The request is issued to which component
* @param privilege
* @throws SentryUserException
*/
- public void dropPrivilege(String requestorUserName,String component,
- TSentryPrivilege privilege) throws SentryUserException {
+ public void dropPrivilege(String requestorUserName, String component,
+ TSentryPrivilege privilege) throws SentryUserException {
TDropPrivilegesRequest request = new TDropPrivilegesRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
request.setComponent(component);
@@ -381,18 +295,19 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
/**
* rename privilege
+ *
* @param requestorUserName: user on whose behalf the request is issued
- * @param component: The request is issued to which component
- * @param serviceName: The Authorizable belongs to which service
+ * @param component: The request is issued to which component
+ * @param serviceName: The Authorizable belongs to which service
* @param oldAuthorizables
* @param newAuthorizables
* @throws SentryUserException
*/
public void renamePrivilege(String requestorUserName, String component,
- String serviceName, List<? extends Authorizable> oldAuthorizables,
- List<? extends Authorizable> newAuthorizables) throws SentryUserException {
+ String serviceName, List<? extends Authorizable> oldAuthorizables,
+ List<? extends Authorizable> newAuthorizables) throws SentryUserException {
if (oldAuthorizables == null || oldAuthorizables.isEmpty()
- || newAuthorizables == null || newAuthorizables.isEmpty()) {
+ || newAuthorizables == null || newAuthorizables.isEmpty()) {
throw new SentryUserException("oldAuthorizables or newAuthorizables can not be null or empty");
}
@@ -423,17 +338,18 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
/**
* Gets sentry role objects for a given groupName using the Sentry service
+ *
* @param requestorUserName : user on whose behalf the request is issued
- * @param groupName : groupName to look up ( if null returns all roles for groups related to requestorUserName)
- * @param component: The request is issued to which component
+ * @param groupName : groupName to look up ( if null returns all roles for groups related to requestorUserName)
+ * @param component: The request is issued to which component
* @return Set of thrift sentry role objects
* @throws SentryUserException
*/
public synchronized Set<TSentryRole> listRolesByGroupName(
- String requestorUserName,
- String groupName,
- String component)
- throws SentryUserException {
+ String requestorUserName,
+ String groupName,
+ String component)
+ throws SentryUserException {
TListSentryRolesRequest request = new TListSentryRolesRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
request.setRequestorUserName(requestorUserName);
@@ -450,29 +366,30 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
}
public Set<TSentryRole> listUserRoles(String requestorUserName, String component)
- throws SentryUserException {
+ throws SentryUserException {
return listRolesByGroupName(requestorUserName, AccessConstants.ALL, component);
}
public Set<TSentryRole> listAllRoles(String requestorUserName, String component)
- throws SentryUserException {
+ throws SentryUserException {
return listRolesByGroupName(requestorUserName, null, component);
}
/**
* Gets sentry privileges for a given roleName and Authorizable Hirerchys using the Sentry service
+ *
* @param requestorUserName: user on whose behalf the request is issued
* @param roleName:
- * @param component: The request is issued to which component
+ * @param component: The request is issued to which component
* @param serviceName
* @param authorizables
* @return
* @throws SentryUserException
*/
public Set<TSentryPrivilege> listPrivilegesByRoleName(
- String requestorUserName, String roleName, String component,
- String serviceName, List<? extends Authorizable> authorizables)
- throws SentryUserException {
+ String requestorUserName, String roleName, String component,
+ String serviceName, List<? extends Authorizable> authorizables)
+ throws SentryUserException {
TListSentryPrivilegesRequest request = new TListSentryPrivilegesRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
request.setComponent(component);
@@ -498,24 +415,25 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
}
public Set<TSentryPrivilege> listPrivilegesByRoleName(
- String requestorUserName, String roleName, String component,
- String serviceName) throws SentryUserException {
+ String requestorUserName, String roleName, String component,
+ String serviceName) throws SentryUserException {
return listPrivilegesByRoleName(requestorUserName, roleName, component, serviceName, null);
}
/**
* get sentry permissions from provider as followings:
+ *
+ * @throws SentryUserException
* @param: component: The request is issued to which component
* @param: serviceName: The privilege belongs to which service
* @param: roleSet
* @param: groupNames
* @param: the authorizables
* @returns the set of permissions
- * @throws SentryUserException
*/
public Set<String> listPrivilegesForProvider(String component,
- String serviceName, ActiveRoleSet roleSet, Set<String> groups,
- List<? extends Authorizable> authorizables) throws SentryUserException {
+ String serviceName, ActiveRoleSet roleSet, Set<String> groups,
+ List<? extends Authorizable> authorizables) throws SentryUserException {
TSentryActiveRoleSet thriftRoleSet = new TSentryActiveRoleSet(roleSet.isAll(), roleSet.getRoles());
TListSentryPrivilegesForProviderRequest request = new TListSentryPrivilegesForProviderRequest();
request.setProtocol_version(sentry_common_serviceConstants.TSENTRY_SERVICE_V2);
@@ -548,20 +466,19 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
* Get sentry privileges based on valid active roles and the authorize objects. Note that
* it is client responsibility to ensure the requestor username, etc. is not impersonated.
*
- * @param component: The request respond to which component.
- * @param serviceName: The name of service.
+ * @param component: The request respond to which component.
+ * @param serviceName: The name of service.
* @param requestorUserName: The requestor user name.
- * @param authorizablesSet: The set of authorize objects. One authorize object is represented
- * as a string. e.g resourceType1=resourceName1->resourceType2=resourceName2->resourceType3=resourceName3.
- * @param groups: The requested groups.
- * @param roleSet: The active roles set.
- *
- * @returns The mapping of authorize objects and TSentryPrivilegeMap(<role, set<privileges>).
+ * @param authorizablesSet: The set of authorize objects. One authorize object is represented
+ * as a string. e.g resourceType1=resourceName1->resourceType2=resourceName2->resourceType3=resourceName3.
+ * @param groups: The requested groups.
+ * @param roleSet: The active roles set.
* @throws SentryUserException
+ * @returns The mapping of authorize objects and TSentryPrivilegeMap(<role, set<privileges>).
*/
public Map<String, TSentryPrivilegeMap> listPrivilegsbyAuthorizable(String component,
- String serviceName, String requestorUserName, Set<String> authorizablesSet,
- Set<String> groups, ActiveRoleSet roleSet) throws SentryUserException {
+ String serviceName, String requestorUserName, Set<String> authorizablesSet,
+ Set<String> groups, ActiveRoleSet roleSet) throws SentryUserException {
TListSentryPrivilegesByAuthRequest request = new TListSentryPrivilegesByAuthRequest();
@@ -589,12 +506,4 @@ public class SentryGenericServiceClientDefaultImpl implements SentryGenericServi
throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e);
}
}
-
- @Override
- public void close() {
- if (transport != null) {
- transport.close();
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
index 980d930..1c582f0 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/thrift/SentryGenericServiceClientFactory.java
@@ -18,6 +18,9 @@
package org.apache.sentry.provider.db.generic.service.thrift;
import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.transport.RetryClientInvocationHandler;
+
+import java.lang.reflect.Proxy;
/**
* SentryGenericServiceClientFactory is a public class for the components which using Generic Model to create sentry client.
@@ -28,7 +31,11 @@ public final class SentryGenericServiceClientFactory {
}
public static SentryGenericServiceClient create(Configuration conf) throws Exception {
- return new SentryGenericServiceClientDefaultImpl(conf);
+ return (SentryGenericServiceClient) Proxy
+ .newProxyInstance(SentryGenericServiceClientDefaultImpl.class.getClassLoader(),
+ SentryGenericServiceClientDefaultImpl.class.getInterfaces(),
+ new RetryClientInvocationHandler(conf,
+ new SentryGenericServiceClientDefaultImpl(conf)));
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/log/entity/JsonLogEntityFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/log/entity/JsonLogEntityFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/log/entity/JsonLogEntityFactory.java
index f6bb8a5..09f7d13 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/log/entity/JsonLogEntityFactory.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/log/entity/JsonLogEntityFactory.java
@@ -46,7 +46,7 @@ import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest;
import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleResponse;
import org.apache.sentry.provider.db.service.thrift.TSentryGroup;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
-import org.apache.sentry.provider.db.service.thrift.ThriftUtil;
+import org.apache.sentry.core.common.utils.ThriftUtil;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.apache.sentry.service.thrift.Status;
import org.apache.sentry.service.thrift.TSentryResponseStatus;
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
index fb7c40a..366df85 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
@@ -187,7 +187,7 @@ public class TransactionManager {
} catch (Exception e) {
retryNum++;
if (retryNum >= transactionRetryMax) {
- String message = "The transaction has reached max retry numbe, r"
+ String message = "The transaction has reached max retry number, "
+ e.getMessage();
LOGGER.error(message, e);
throw new Exception(message, e);
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PolicyStoreConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PolicyStoreConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PolicyStoreConstants.java
deleted file mode 100644
index 8cf1c1a..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/PolicyStoreConstants.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.provider.db.service.thrift;
-
-public final class PolicyStoreConstants {
- public static final String SENTRY_GENERIC_POLICY_NOTIFICATION = "sentry.generic.policy.notification";
- public static final String SENTRY_GENERIC_POLICY_STORE = "sentry.generic.policy.store";
- public static final String SENTRY_GENERIC_POLICY_STORE_DEFAULT =
- "org.apache.sentry.provider.db.generic.service.persistent.DelegateSentryStore";
- public static class PolicyStoreServerConfig {
- public static final String NOTIFICATION_HANDLERS = "sentry.policy.store.notification.handlers";
- }
-
- private PolicyStoreConstants() {
- // Make constructor private to avoid instantiation
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/e3d859a9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
index c2b03e5..28c3e35 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
@@ -25,8 +25,9 @@ import java.util.Set;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.core.common.ActiveRoleSet;
import org.apache.sentry.core.common.Authorizable;
+import org.apache.sentry.core.common.transport.SentryServiceClient;
-public interface SentryPolicyServiceClient {
+public interface SentryPolicyServiceClient extends SentryServiceClient {
void createRole(String requestorUserName, String roleName) throws SentryUserException;
@@ -208,8 +209,6 @@ public interface SentryPolicyServiceClient {
*/
String getConfigValue(String propertyName, String defaultValue) throws SentryUserException;
- void close();
-
// Import the sentry mapping data with map structure
void importPolicy(Map<String, Map<String, Set<String>>> policyFileMappingData,
String requestorUserName, boolean isOverwriteRole) throws SentryUserException;