You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/11/07 01:16:23 UTC
hive git commit: HIVE-16917: HiveServer2 guard rails - Limit
concurrent connections from user (Prasanth Jayachandran reviewed by Thejas, S,
Sergey)
Repository: hive
Updated Branches:
refs/heads/master d7d96658c -> 7195aee93
HIVE-16917: HiveServer2 guard rails - Limit concurrent connections from user (Prasanth Jayachandran reviewed by Thejas, S, Sergey)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7195aee9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7195aee9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7195aee9
Branch: refs/heads/master
Commit: 7195aee937f5d6d33497cbf19dd70bc38ed2a92b
Parents: d7d9665
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Nov 6 17:15:42 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Nov 6 17:15:42 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 +
.../cli/session/TestHiveSessionImpl.java | 4 +-
.../service/cli/session/HiveSessionBase.java | 5 +
.../service/cli/session/HiveSessionImpl.java | 15 +-
.../cli/session/HiveSessionImplwithUGI.java | 6 +-
.../service/cli/session/SessionManager.java | 120 ++++++-
.../service/cli/thrift/ThriftCLIService.java | 2 -
.../cli/TestCLIServiceConnectionLimits.java | 337 +++++++++++++++++++
.../session/TestPluggableHiveSessionImpl.java | 10 +-
9 files changed, 491 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 10b364a..305e9dc 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2455,6 +2455,17 @@ public class HiveConf extends Configuration {
" PERFORMANCE: Execution + Performance logs \n" +
" VERBOSE: All logs" ),
+ // HS2 connections guard rails
+ HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER("hive.server2.limit.connections.per.user", 0,
+ "Maximum hive server2 connections per user. Any user exceeding this limit will not be allowed to connect. " +
+ "Default=0 does not enforce limits."),
+ HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS("hive.server2.limit.connections.per.ipaddress", 0,
+ "Maximum hive server2 connections per ipaddress. Any ipaddress exceeding this limit will not be allowed " +
+ "to connect. Default=0 does not enforce limits."),
+ HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS("hive.server2.limit.connections.per.user.ipaddress", 0,
+ "Maximum hive server2 connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will " +
+ "not be allowed to connect. Default=0 does not enforce limits."),
+
// Enable metric collection for HiveServer2
HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
index ebcf4a8..1ee3a50 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -50,7 +50,7 @@ public class TestHiveSessionImpl {
HiveConf serverhiveConf = new HiveConf();
String ipAddress = null;
HiveSessionImpl session = new HiveSessionImpl(null, protocol, username, password,
- serverhiveConf, ipAddress) {
+ serverhiveConf, ipAddress, null) {
@Override
protected synchronized void acquire(boolean userAccess, boolean isOperation) {
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
index 9436a25..ac105bf 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
@@ -19,6 +19,7 @@
package org.apache.hive.service.cli.session;
import java.io.File;
+import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -84,6 +85,10 @@ public interface HiveSessionBase {
void setIpAddress(String ipAddress);
+ List<String> getForwardedAddresses();
+
+ void setForwardedAddresses(List<String> forwardedAddresses);
+
long getLastAccessTime();
long getCreationTime();
http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 0206fe3..7fbcd13 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -102,6 +102,7 @@ public class HiveSessionImpl implements HiveSession {
// 2) Some parts of session state, like mrStats and vars, need proper synchronization.
private SessionState sessionState;
private String ipAddress;
+ private List<String> forwardedAddresses;
private static final String FETCH_WORK_SERDE_CLASS =
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
@@ -122,13 +123,15 @@ public class HiveSessionImpl implements HiveSession {
public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol,
- String username, String password, HiveConf serverConf, String ipAddress) {
+ String username, String password, HiveConf serverConf, String ipAddress,
+ final List<String> forwardedAddresses) {
this.username = username;
this.password = password;
creationTime = System.currentTimeMillis();
this.sessionHandle = sessionHandle != null ? sessionHandle : new SessionHandle(protocol);
this.sessionConf = new HiveConf(serverConf);
this.ipAddress = ipAddress;
+ this.forwardedAddresses = forwardedAddresses;
this.operationLock = serverConf.getBoolVar(
ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION) ? null : new Semaphore(1);
try {
@@ -927,6 +930,16 @@ public class HiveSessionImpl implements HiveSession {
}
@Override
+ public List<String> getForwardedAddresses() {
+ // Returns the stored list reference itself (no copy); it is whatever the constructor or setter was given.
+ return forwardedAddresses;
+ }
+
+ @Override
+ public void setForwardedAddresses(final List<String> forwardedAddresses) {
+ // Replaces the stored list with the caller's reference as-is; no defensive copy is made.
+ this.forwardedAddresses = forwardedAddresses;
+ }
+
+ @Override
public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer)
throws HiveSQLException {
HiveAuthFactory.verifyProxyAccess(getUserName(), owner, getIpAddress(), getHiveConf());
http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
index 8975aee..32598d3 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
@@ -19,6 +19,7 @@
package org.apache.hive.service.cli.session;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,8 +50,9 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl {
private HiveSession proxySession = null;
public HiveSessionImplwithUGI(SessionHandle sessionHandle, TProtocolVersion protocol, String username,
- String password, HiveConf hiveConf, String ipAddress, String delegationToken) throws HiveSQLException {
- super(sessionHandle, protocol, username, password, hiveConf, ipAddress);
+ String password, HiveConf hiveConf, String ipAddress, String delegationToken,
+ final List<String> forwardedAddresses) throws HiveSQLException {
+ super(sessionHandle, protocol, username, password, hiveConf, ipAddress, forwardedAddresses);
setSessionUGI(username);
setDelegationToken(delegationToken);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 9b2ae57..1846c91 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@@ -43,7 +44,6 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.hooks.HookUtils;
import org.apache.hadoop.hive.ql.hooks.HooksLoader;
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.cli.HiveSQLException;
@@ -68,6 +68,10 @@ public class SessionManager extends CompositeService {
private HiveConf hiveConf;
private final Map<SessionHandle, HiveSession> handleToSession =
new ConcurrentHashMap<SessionHandle, HiveSession>();
+ private final Map<String, LongAdder> connectionsCount = new ConcurrentHashMap<>();
+ private int userLimit;
+ private int ipAddressLimit;
+ private int userIpAddressLimit;
private final OperationManager operationManager = new OperationManager();
private ThreadPoolExecutor backgroundOperationPool;
private boolean isOperationLogEnabled;
@@ -103,6 +107,12 @@ public class SessionManager extends CompositeService {
registerOpenSesssionMetrics(metrics);
registerActiveSesssionMetrics(metrics);
}
+
+ userLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER);
+ ipAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS);
+ userIpAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS);
+ LOG.info("Connections limit are user: {} ipaddress: {} user-ipaddress: {}", userLimit, ipAddressLimit,
+ userIpAddressLimit);
super.init(hiveConf);
}
@@ -368,6 +378,10 @@ public class SessionManager extends CompositeService {
String delegationToken)
throws HiveSQLException {
+ // if client proxies connection, use forwarded ip-addresses instead of just the gateway
+ final List<String> forwardedAddresses = getForwardedAddresses();
+ incrementConnections(username, ipAddress, forwardedAddresses);
+
HiveSession session;
// If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
// Within the proxy object, we wrap the method call in a UserGroupInformation#doAs
@@ -375,16 +389,16 @@ public class SessionManager extends CompositeService {
HiveSessionImplwithUGI hiveSessionUgi;
if (sessionImplWithUGIclassName == null) {
hiveSessionUgi = new HiveSessionImplwithUGI(sessionHandle, protocol, username, password,
- hiveConf, ipAddress, delegationToken);
+ hiveConf, ipAddress, delegationToken, forwardedAddresses);
} else {
try {
Class<?> clazz = Class.forName(sessionImplWithUGIclassName);
Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class, String.class,
- String.class, HiveConf.class, String.class, String.class);
+ String.class, HiveConf.class, String.class, String.class, List.class);
hiveSessionUgi = (HiveSessionImplwithUGI) constructor.newInstance(sessionHandle,
- protocol, username, password, hiveConf, ipAddress, delegationToken);
+ protocol, username, password, hiveConf, ipAddress, delegationToken, forwardedAddresses);
} catch (Exception e) {
- throw new HiveSQLException("Cannot initilize session class:" + sessionImplWithUGIclassName);
+ throw new HiveSQLException("Cannot initialize session class:" + sessionImplWithUGIclassName);
}
}
session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
@@ -392,14 +406,14 @@ public class SessionManager extends CompositeService {
} else {
if (sessionImplclassName == null) {
session = new HiveSessionImpl(sessionHandle, protocol, username, password, hiveConf,
- ipAddress);
+ ipAddress, forwardedAddresses);
} else {
try {
Class<?> clazz = Class.forName(sessionImplclassName);
Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class,
- String.class, String.class, HiveConf.class, String.class);
+ String.class, String.class, HiveConf.class, String.class, List.class);
session = (HiveSession) constructor.newInstance(sessionHandle, protocol, username, password,
- hiveConf, ipAddress);
+ hiveConf, ipAddress, forwardedAddresses);
} catch (Exception e) {
throw new HiveSQLException("Cannot initilize session class:" + sessionImplclassName, e);
}
@@ -439,6 +453,95 @@ public class SessionManager extends CompositeService {
return session;
}
+  /**
+   * Admits a new connection against the configured limits and, when none of them
+   * has been reached, bumps every counter that is being tracked for it.
+   *
+   * @param username user opening the session
+   * @param ipAddress address the connection arrived from
+   * @param forwardedAddresses forwarded address chain; may be null or empty
+   * @throws HiveSQLException carrying the violation message when a tracked limit is already reached
+   */
+  private void incrementConnections(final String username, final String ipAddress,
+      final List<String> forwardedAddresses) throws HiveSQLException {
+    final String originAddress = getOriginClientIpAddress(ipAddress, forwardedAddresses);
+
+    final String limitBreach = anyViolations(username, originAddress);
+    if (limitBreach != null) {
+      // reject the connection before any counter is touched
+      LOG.error(limitBreach);
+      throw new HiveSQLException(limitBreach);
+    }
+
+    if (trackConnectionsPerUser(username)) {
+      connectionsCount.computeIfAbsent(username, key -> new LongAdder()).increment();
+    }
+
+    if (trackConnectionsPerIpAddress(originAddress)) {
+      connectionsCount.computeIfAbsent(originAddress, key -> new LongAdder()).increment();
+    }
+
+    if (trackConnectionsPerUserIpAddress(username, originAddress)) {
+      final String userAtAddress = username + ":" + originAddress;
+      connectionsCount.computeIfAbsent(userAtAddress, key -> new LongAdder()).increment();
+    }
+  }
+
+  /**
+   * Picks the address that connection limits are counted against.
+   *
+   * @param ipAddress address the connection arrived from
+   * @param forwardedAddresses forwarded address chain; may be null or empty
+   * @return the first forwarded address when a chain is present, otherwise {@code ipAddress}
+   */
+  private String getOriginClientIpAddress(final String ipAddress, final List<String> forwardedAddresses) {
+    final boolean hasForwardedChain = forwardedAddresses != null && !forwardedAddresses.isEmpty();
+    // order of forwarded ips per X-Forwarded-For http spec (client, proxy1, proxy2)
+    return hasForwardedChain ? forwardedAddresses.get(0) : ipAddress;
+  }
+
+  /**
+   * Releases the counters held by a session that is being closed.
+   *
+   * The keys are recomputed from the session's current user name and addresses.
+   * Those can differ from the keys used when the session was opened (the session
+   * exposes setters for its ip address and forwarded addresses), so a key may have
+   * no counter; such keys are skipped instead of failing the close.
+   *
+   * @param session the session being closed
+   */
+  private void decrementConnections(final HiveSession session) {
+    final String username = session.getUserName();
+    final String clientIpAddress = getOriginClientIpAddress(session.getIpAddress(), session.getForwardedAddresses());
+    if (trackConnectionsPerUser(username)) {
+      decrementIfTracked(username);
+    }
+
+    if (trackConnectionsPerIpAddress(clientIpAddress)) {
+      decrementIfTracked(clientIpAddress);
+    }
+
+    if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) {
+      decrementIfTracked(username + ":" + clientIpAddress);
+    }
+  }
+
+  /** Decrements the counter stored under {@code key}; does nothing when no counter exists for it. */
+  private void decrementIfTracked(final String key) {
+    // computeIfPresent(...) yields null for an absent key, so chaining .decrement() on it could throw
+    final LongAdder counter = connectionsCount.get(key);
+    if (counter != null) {
+      counter.decrement();
+    }
+  }
+
+  /**
+   * Checks the tracked limits in the order user, ipaddress, user:ipaddress.
+   *
+   * @param username user opening the session
+   * @param ipAddress address the limits are counted against
+   * @return the message for the first limit that is already reached, or null when none is
+   */
+  private String anyViolations(final String username, final String ipAddress) {
+    if (trackConnectionsPerUser(username)) {
+      if (!withinLimits(username, userLimit)) {
+        return "Connection limit per user reached (user: " + username + " limit: " + userLimit + ")";
+      }
+    }
+
+    if (trackConnectionsPerIpAddress(ipAddress)) {
+      if (!withinLimits(ipAddress, ipAddressLimit)) {
+        return "Connection limit per ipaddress reached (ipaddress: " + ipAddress + " limit: " + ipAddressLimit + ")";
+      }
+    }
+
+    if (trackConnectionsPerUserIpAddress(username, ipAddress)) {
+      final String userAtAddress = username + ":" + ipAddress;
+      if (!withinLimits(userAtAddress, userIpAddressLimit)) {
+        return "Connection limit per user:ipaddress reached (user:ipaddress: " + username + ":" + ipAddress + " limit: " +
+            userIpAddressLimit + ")";
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * @return true only when the user:ipaddress limit is positive and both the
+   *         user name and the ip address are non-null and non-empty
+   */
+  private boolean trackConnectionsPerUserIpAddress(final String username, final String ipAddress) {
+    if (userIpAddressLimit <= 0) {
+      return false;
+    }
+    if (username == null || username.isEmpty()) {
+      return false;
+    }
+    return ipAddress != null && !ipAddress.isEmpty();
+  }
+
+  /** @return true only when the ipaddress limit is positive and the address is non-null and non-empty */
+  private boolean trackConnectionsPerIpAddress(final String ipAddress) {
+    if (ipAddressLimit <= 0) {
+      return false;
+    }
+    return ipAddress != null && !ipAddress.isEmpty();
+  }
+
+  /** @return true only when the user limit is positive and the user name is non-null and non-empty */
+  private boolean trackConnectionsPerUser(final String username) {
+    if (userLimit <= 0) {
+      return false;
+    }
+    return username != null && !username.isEmpty();
+  }
+
+  /**
+   * @param track key whose counter is inspected
+   * @param limit number of connections at which the key counts as exhausted
+   * @return false when a counter exists for {@code track} and has reached {@code limit}; true otherwise
+   */
+  private boolean withinLimits(final String track, final int limit) {
+    final LongAdder counter = connectionsCount.get(track);
+    // a key that has never been counted cannot have reached its limit
+    return counter == null || counter.intValue() < limit;
+  }
+
public synchronized void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
HiveSession session = handleToSession.remove(sessionHandle);
if (session == null) {
@@ -448,6 +551,7 @@ public class SessionManager extends CompositeService {
try {
session.close();
} finally {
+ decrementConnections(session);
// Shutdown HiveServer2 if it has been deregistered from ZooKeeper and has no active sessions
if (!(hiveServer2 == null) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY))
&& (hiveServer2.isDeregisteredWithZooKeeper())) {
http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 6354c8c..fc9e6b2 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -320,8 +320,6 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
LOG.info("Client protocol version: " + req.getClient_protocol());
TOpenSessionResp resp = new TOpenSessionResp();
try {
- Map<String, String> openConf = req.getConfiguration();
-
SessionHandle sessionHandle = getSessionHandle(req, resp);
resp.setSessionHandle(sessionHandle.toTSessionHandle());
Map<String, String> configurationMap = new HashMap<String, String>();
http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
new file mode 100644
index 0000000..5ecea9a
--- /dev/null
+++ b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Exercises the HiveServer2 connection guard rails (per-user, per-ipaddress and
+ * per-user:ipaddress limits) by opening sessions through a started {@link CLIService}.
+ */
+public class TestCLIServiceConnectionLimits {
+  @org.junit.Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  // the tests open up to limit + 1 sessions to step just past a configured limit of 10
+  private int limit = 10;
+  private HiveConf conf = new HiveConf();
+
+  /** With every limit set to 0, opening limit + 1 sessions must succeed. */
+  @Test
+  public void testNoLimit() throws HiveSQLException {
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** Closing sessions must release the user's count so the same user can open up to the limit again. */
+  @Test
+  public void testIncrementAndDecrementConnectionsUser() throws HiveSQLException {
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 10);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      // open 5 connections
+      for (int i = 0; i < limit / 2; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+
+      // close them all
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      sessionHandles.clear();
+
+      // open till limit but not exceed; must be the SAME user as above, otherwise a
+      // missing decrement would go unnoticed (5 leaked + 10 new would exceed the limit)
+      for (int i = 0; i < limit; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** Null and empty user names are not tracked, so the per-user limit never triggers for them. */
+  @Test
+  public void testInvalidUserName() throws HiveSQLException {
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 10);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, null, "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "", "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** Null and empty ip addresses are not tracked, so the per-ipaddress limit never triggers for them. */
+  @Test
+  public void testInvalidIpaddress() throws HiveSQLException {
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", null, null);
+        sessionHandles.add(session);
+      }
+
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "", null);
+        sessionHandles.add(session);
+      }
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** A user paired with a null ip address is not tracked under the per-user:ipaddress limit. */
+  @Test
+  public void testInvalidUserIpaddress() throws HiveSQLException {
+    // enable the user:ipaddress limit (not the plain ipaddress one) so that this
+    // test covers the combination its name refers to
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, " ", "bar", null, null);
+        sessionHandles.add(session);
+      }
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** The 11th session for one user must be rejected when the per-user limit is 10. */
+  @Test
+  public void testConnectionLimitPerUser() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per user reached (user: foo limit: 10)");
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 10);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** The 11th session from one ip address must be rejected when the per-ipaddress limit is 10. */
+  @Test
+  public void testConnectionLimitPerIpAddress() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per ipaddress reached (ipaddress: 127.0.0.1 limit: 10)");
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** The 11th session for one user:ipaddress pair must be rejected when that limit is 10. */
+  @Test
+  public void testConnectionLimitPerUserIpAddress() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per user:ipaddress reached (user:ipaddress: foo:127.0.0.1 limit: 10)");
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** With user limit 5 and ipaddress limit 10, the smaller user limit is the one reported. */
+  @Test
+  public void testConnectionMultipleLimitsUserAndIP() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per user reached (user: foo limit: 5)");
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 5);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** With ipaddress limit 5 and user:ipaddress limit 10, the smaller ipaddress limit is the one reported. */
+  @Test
+  public void testConnectionMultipleLimitsIPAndUserIP() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per ipaddress reached (ipaddress: 127.0.0.1 limit: 5)");
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 5);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** With user limit 15 and user:ipaddress limit 10, the smaller user:ipaddress limit is the one reported. */
+  @Test
+  public void testConnectionMultipleLimitsUserIPAndUser() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per user:ipaddress reached (user:ipaddress: foo:127.0.0.1 limit: 10)");
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 15);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null);
+        sessionHandles.add(session);
+      }
+
+    } finally {
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** When forwarded addresses are set, the limit is counted against the first forwarded address. */
+  @Test
+  public void testConnectionForwardedIpAddresses() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per ipaddress reached (ipaddress: 194.167.0.3 limit: 10)");
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10);
+    CLIService service = getService(conf);
+    SessionManager.setForwardedAddresses(Lists.newArrayList("194.167.0.3", "194.167.0.2", "194.167.0.1"));
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      for (int i = 0; i < limit + 1; i++) {
+        SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "194.167.0.1", null);
+        sessionHandles.add(session);
+      }
+
+    } finally {
+      // reset the forwarded addresses before closing the sessions and stopping the service
+      SessionManager.setForwardedAddresses(Collections.emptyList());
+      for (SessionHandle sessionHandle : sessionHandles) {
+        service.closeSession(sessionHandle);
+      }
+      service.stop();
+    }
+  }
+
+  /** Creates, initialises and starts a CLIService that uses the SQL standard authorizer factory. */
+  private CLIService getService(HiveConf conf) {
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    CLIService service = new CLIService(null);
+    service.init(conf);
+    service.start();
+    return service;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
index 47f95c5..90237c0 100644
--- a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
+++ b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
@@ -19,6 +19,8 @@ package org.apache.hive.service.cli.session;
import static org.junit.Assert.assertEquals;
+import java.util.List;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.HiveSQLException;
@@ -87,8 +89,8 @@ public class TestPluggableHiveSessionImpl {
public static final int MAGIC_RETURN_VALUE = 0xbeef0001;
public SampleHiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol,
- String username, String password, HiveConf serverhiveConf, String ipAddress) {
- super(sessionHandle, protocol, username, password, serverhiveConf, ipAddress);
+ String username, String password, HiveConf serverhiveConf, String ipAddress, List<String> forwardAddresses) {
+ super(sessionHandle, protocol, username, password, serverhiveConf, ipAddress, forwardAddresses);
}
@Override
@@ -103,9 +105,9 @@ public class TestPluggableHiveSessionImpl {
public SampleHiveSessionImplWithUGI(SessionHandle sessionHandle, TProtocolVersion protocol,
String username, String password, HiveConf serverhiveConf, String ipAddress,
- String delegationToken) throws HiveSQLException {
+ String delegationToken, List<String> forwardedAddresses) throws HiveSQLException {
super(sessionHandle, protocol, username, password, serverhiveConf, ipAddress,
- delegationToken);
+ delegationToken, forwardedAddresses);
}
@Override