You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/11/07 01:16:23 UTC

hive git commit: HIVE-16917: HiveServer2 guard rails - Limit concurrent connections from user (Prasanth Jayachandran reviewed by Thejas, S, Sergey)

Repository: hive
Updated Branches:
  refs/heads/master d7d96658c -> 7195aee93


HIVE-16917: HiveServer2 guard rails - Limit concurrent connections from user (Prasanth Jayachandran reviewed by Thejas, S, Sergey)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7195aee9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7195aee9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7195aee9

Branch: refs/heads/master
Commit: 7195aee937f5d6d33497cbf19dd70bc38ed2a92b
Parents: d7d9665
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Nov 6 17:15:42 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Nov 6 17:15:42 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  11 +
 .../cli/session/TestHiveSessionImpl.java        |   4 +-
 .../service/cli/session/HiveSessionBase.java    |   5 +
 .../service/cli/session/HiveSessionImpl.java    |  15 +-
 .../cli/session/HiveSessionImplwithUGI.java     |   6 +-
 .../service/cli/session/SessionManager.java     | 120 ++++++-
 .../service/cli/thrift/ThriftCLIService.java    |   2 -
 .../cli/TestCLIServiceConnectionLimits.java     | 337 +++++++++++++++++++
 .../session/TestPluggableHiveSessionImpl.java   |  10 +-
 9 files changed, 491 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 10b364a..305e9dc 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2455,6 +2455,17 @@ public class HiveConf extends Configuration {
         "  PERFORMANCE: Execution + Performance logs \n" +
         "  VERBOSE: All logs" ),
 
+    // HS2 connections guard rails
+    HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER("hive.server2.limit.connections.per.user", 0,
+      "Maximum hive server2 connections per user. Any user exceeding this limit will not be allowed to connect. " +
+        "Default=0 does not enforce limits."),
+    HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS("hive.server2.limit.connections.per.ipaddress", 0,
+      "Maximum hive server2 connections per ipaddress. Any ipaddress exceeding this limit will not be allowed " +
+        "to connect. Default=0 does not enforce limits."),
+    HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS("hive.server2.limit.connections.per.user.ipaddress", 0,
+      "Maximum hive server2 connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will " +
+        "not be allowed to connect. Default=0 does not enforce limits."),
+
     // Enable metric collection for HiveServer2
     HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."),
 

http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
index ebcf4a8..1ee3a50 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -50,7 +50,7 @@ public class TestHiveSessionImpl {
     HiveConf serverhiveConf = new HiveConf();
     String ipAddress = null;
     HiveSessionImpl session = new HiveSessionImpl(null, protocol, username, password,
-      serverhiveConf, ipAddress) {
+      serverhiveConf, ipAddress, null) {
       @Override
       protected synchronized void acquire(boolean userAccess, boolean isOperation) {
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
index 9436a25..ac105bf 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java
@@ -19,6 +19,7 @@
 package org.apache.hive.service.cli.session;
 
 import java.io.File;
+import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -84,6 +85,10 @@ public interface HiveSessionBase {
 
   void setIpAddress(String ipAddress);
 
+  List<String> getForwardedAddresses();
+
+  void setForwardedAddresses(List<String> forwardedAddresses);
+
   long getLastAccessTime();
 
   long getCreationTime();

http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 0206fe3..7fbcd13 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -102,6 +102,7 @@ public class HiveSessionImpl implements HiveSession {
   //       2) Some parts of session state, like mrStats and vars, need proper synchronization.
   private SessionState sessionState;
   private String ipAddress;
+  private List<String> forwardedAddresses;
 
   private static final String FETCH_WORK_SERDE_CLASS =
       "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
@@ -122,13 +123,15 @@ public class HiveSessionImpl implements HiveSession {
 
 
   public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol,
-      String username, String password, HiveConf serverConf, String ipAddress) {
+    String username, String password, HiveConf serverConf, String ipAddress,
+    final List<String> forwardedAddresses) {
     this.username = username;
     this.password = password;
     creationTime = System.currentTimeMillis();
     this.sessionHandle = sessionHandle != null ? sessionHandle : new SessionHandle(protocol);
     this.sessionConf = new HiveConf(serverConf);
     this.ipAddress = ipAddress;
+    this.forwardedAddresses = forwardedAddresses;
     this.operationLock = serverConf.getBoolVar(
         ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION) ? null : new Semaphore(1);
     try {
@@ -927,6 +930,16 @@ public class HiveSessionImpl implements HiveSession {
   }
 
   @Override
+  public List<String> getForwardedAddresses() {
+    return forwardedAddresses;
+  }
+
+  @Override
+  public void setForwardedAddresses(final List<String> forwardedAddresses) {
+    this.forwardedAddresses = forwardedAddresses;
+  }
+
+  @Override
   public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer)
       throws HiveSQLException {
     HiveAuthFactory.verifyProxyAccess(getUserName(), owner, getIpAddress(), getHiveConf());

http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
index 8975aee..32598d3 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
@@ -19,6 +19,7 @@
 package org.apache.hive.service.cli.session;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,8 +50,9 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl {
   private HiveSession proxySession = null;
 
   public HiveSessionImplwithUGI(SessionHandle sessionHandle, TProtocolVersion protocol, String username,
-    String password, HiveConf hiveConf, String ipAddress, String delegationToken) throws HiveSQLException {
-    super(sessionHandle, protocol, username, password, hiveConf, ipAddress);
+    String password, HiveConf hiveConf, String ipAddress, String delegationToken,
+    final List<String> forwardedAddresses) throws HiveSQLException {
+    super(sessionHandle, protocol, username, password, hiveConf, ipAddress, forwardedAddresses);
     setSessionUGI(username);
     setDelegationToken(delegationToken);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 9b2ae57..1846c91 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
@@ -43,7 +44,6 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.hooks.HookUtils;
 import org.apache.hadoop.hive.ql.hooks.HooksLoader;
 import org.apache.hive.service.CompositeService;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -68,6 +68,10 @@ public class SessionManager extends CompositeService {
   private HiveConf hiveConf;
   private final Map<SessionHandle, HiveSession> handleToSession =
       new ConcurrentHashMap<SessionHandle, HiveSession>();
+  private final Map<String, LongAdder> connectionsCount = new ConcurrentHashMap<>();
+  private int userLimit;
+  private int ipAddressLimit;
+  private int userIpAddressLimit;
   private final OperationManager operationManager = new OperationManager();
   private ThreadPoolExecutor backgroundOperationPool;
   private boolean isOperationLogEnabled;
@@ -103,6 +107,12 @@ public class SessionManager extends CompositeService {
       registerOpenSesssionMetrics(metrics);
       registerActiveSesssionMetrics(metrics);
     }
+
+    userLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER);
+    ipAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS);
+    userIpAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS);
+    LOG.info("Connections limit are user: {} ipaddress: {} user-ipaddress: {}", userLimit, ipAddressLimit,
+      userIpAddressLimit);
     super.init(hiveConf);
   }
 
@@ -368,6 +378,10 @@ public class SessionManager extends CompositeService {
     String delegationToken)
     throws HiveSQLException {
 
+    // if client proxies connection, use forwarded ip-addresses instead of just the gateway
+    final List<String> forwardedAddresses = getForwardedAddresses();
+    incrementConnections(username, ipAddress, forwardedAddresses);
+
     HiveSession session;
     // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
     // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs
@@ -375,16 +389,16 @@ public class SessionManager extends CompositeService {
       HiveSessionImplwithUGI hiveSessionUgi;
       if (sessionImplWithUGIclassName == null) {
         hiveSessionUgi = new HiveSessionImplwithUGI(sessionHandle, protocol, username, password,
-            hiveConf, ipAddress, delegationToken);
+            hiveConf, ipAddress, delegationToken, forwardedAddresses);
       } else {
         try {
           Class<?> clazz = Class.forName(sessionImplWithUGIclassName);
           Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class, String.class,
-            String.class, HiveConf.class, String.class, String.class);
+            String.class, HiveConf.class, String.class, String.class, List.class);
           hiveSessionUgi = (HiveSessionImplwithUGI) constructor.newInstance(sessionHandle,
-              protocol, username, password, hiveConf, ipAddress, delegationToken);
+              protocol, username, password, hiveConf, ipAddress, delegationToken, forwardedAddresses);
         } catch (Exception e) {
-          throw new HiveSQLException("Cannot initilize session class:" + sessionImplWithUGIclassName);
+          throw new HiveSQLException("Cannot initialize session class:" + sessionImplWithUGIclassName);
         }
       }
       session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
@@ -392,14 +406,14 @@ public class SessionManager extends CompositeService {
     } else {
       if (sessionImplclassName == null) {
         session = new HiveSessionImpl(sessionHandle, protocol, username, password, hiveConf,
-          ipAddress);
+          ipAddress, forwardedAddresses);
       } else {
         try {
         Class<?> clazz = Class.forName(sessionImplclassName);
         Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class,
-          String.class, String.class, HiveConf.class, String.class);
+          String.class, String.class, HiveConf.class, String.class, List.class);
         session = (HiveSession) constructor.newInstance(sessionHandle, protocol, username, password,
-          hiveConf, ipAddress);
+          hiveConf, ipAddress, forwardedAddresses);
         } catch (Exception e) {
           throw new HiveSQLException("Cannot initilize session class:" + sessionImplclassName, e);
         }
@@ -439,6 +453,95 @@ public class SessionManager extends CompositeService {
     return session;
   }
 
+  /**
+   * Enforces the configured connection limits for a session that is about to be created and,
+   * when no limit is violated, counts the connection against every tracked key (user,
+   * ipaddress and user:ipaddress).
+   *
+   * @param username user opening the session
+   * @param ipAddress address the connection was received from
+   * @param forwardedAddresses forwarded addresses of a proxied connection; may be null or empty
+   * @throws HiveSQLException if an enabled limit has already been reached; no counter is
+   *         incremented in that case
+   */
+  private void incrementConnections(final String username, final String ipAddress,
+    final List<String> forwardedAddresses) throws HiveSQLException {
+    final String clientIpAddress = getOriginClientIpAddress(ipAddress, forwardedAddresses);
+
+    final String violation;
+    // The limit check and the increments have to be a single atomic step. Done separately,
+    // sessions opened concurrently can all pass the check before any of them is counted and
+    // push a counter past its limit. Decrements do not take this lock: they only lower a
+    // count, so a racing close can at worst reject a connection that would just have fit.
+    synchronized (connectionsCount) {
+      violation = anyViolations(username, clientIpAddress);
+      // increment the counters only when there are no violations
+      if (violation == null) {
+        if (trackConnectionsPerUser(username)) {
+          connectionsCount.computeIfAbsent(username, k -> new LongAdder()).increment();
+        }
+
+        if (trackConnectionsPerIpAddress(clientIpAddress)) {
+          connectionsCount.computeIfAbsent(clientIpAddress, k -> new LongAdder()).increment();
+        }
+
+        if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) {
+          connectionsCount.computeIfAbsent(username + ":" + clientIpAddress, k -> new LongAdder()).increment();
+        }
+      }
+    }
+
+    // report outside the lock so that logging never delays other connection attempts
+    if (violation != null) {
+      LOG.error(violation);
+      throw new HiveSQLException(violation);
+    }
+  }
+
+  /**
+   * Picks the address that connection limits are counted against.
+   *
+   * @param ipAddress address the connection was received from
+   * @param forwardedAddresses forwarded addresses of a proxied connection; may be null or empty
+   * @return the first forwarded address when there is one, otherwise {@code ipAddress}
+   */
+  private String getOriginClientIpAddress(final String ipAddress, final List<String> forwardedAddresses) {
+    final boolean proxied = forwardedAddresses != null && !forwardedAddresses.isEmpty();
+    // order of forwarded ips per X-Forwarded-For http spec (client, proxy1, proxy2)
+    return proxied ? forwardedAddresses.get(0) : ipAddress;
+  }
+
+  /**
+   * Releases the connection counted for a session that is being closed. The keys are
+   * recomputed from the session's current user name, ip address and forwarded addresses.
+   *
+   * @param session session being closed
+   */
+  private void decrementConnections(final HiveSession session) {
+    final String username = session.getUserName();
+    final String clientIpAddress = getOriginClientIpAddress(session.getIpAddress(), session.getForwardedAddresses());
+    if (trackConnectionsPerUser(username)) {
+      decrementIfPresent(username);
+    }
+
+    if (trackConnectionsPerIpAddress(clientIpAddress)) {
+      decrementIfPresent(clientIpAddress);
+    }
+
+    if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) {
+      decrementIfPresent(username + ":" + clientIpAddress);
+    }
+  }
+
+  /**
+   * Decrements the counter stored under {@code track}, doing nothing when there is none.
+   * The lookup result is checked because a missing key yields null (for example when the
+   * session's address was changed through its setter after the connection was counted), and
+   * dereferencing it would throw a NullPointerException while the session is being closed.
+   */
+  private void decrementIfPresent(final String track) {
+    final LongAdder counter = connectionsCount.get(track);
+    if (counter != null) {
+      counter.decrement();
+    }
+  }
+
+  /**
+   * Compares the tracked counters with their limits, checking the user limit first, then the
+   * ipaddress limit and finally the user:ipaddress limit.
+   *
+   * @param username user opening the session
+   * @param ipAddress address the connection is counted against
+   * @return message describing the first limit that has been reached, or null when the
+   *         connection is within all limits
+   */
+  private String anyViolations(final String username, final String ipAddress) {
+    String violation = null;
+    if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) {
+      violation = "Connection limit per user reached (user: " + username + " limit: " + userLimit + ")";
+    } else if (trackConnectionsPerIpAddress(ipAddress) && !withinLimits(ipAddress, ipAddressLimit)) {
+      violation = "Connection limit per ipaddress reached (ipaddress: " + ipAddress + " limit: " + ipAddressLimit
+        + ")";
+    } else {
+      final String userAndIpAddress = username + ":" + ipAddress;
+      if (trackConnectionsPerUserIpAddress(username, ipAddress)
+        && !withinLimits(userAndIpAddress, userIpAddressLimit)) {
+        violation = "Connection limit per user:ipaddress reached (user:ipaddress: " + userAndIpAddress + " limit: "
+          + userIpAddressLimit + ")";
+      }
+    }
+    return violation;
+  }
+
+  /**
+   * Tells whether connections are counted per user:ipaddress pair: the limit has to be
+   * positive and both the user name and the ip address have to be non-null and non-empty.
+   */
+  private boolean trackConnectionsPerUserIpAddress(final String username, final String ipAddress) {
+    if (userIpAddressLimit <= 0) {
+      return false;
+    }
+    final boolean hasUsername = username != null && !username.isEmpty();
+    final boolean hasIpAddress = ipAddress != null && !ipAddress.isEmpty();
+    return hasUsername && hasIpAddress;
+  }
+
+  /**
+   * Tells whether connections are counted per ip address: the limit has to be positive and
+   * the ip address has to be non-null and non-empty.
+   */
+  private boolean trackConnectionsPerIpAddress(final String ipAddress) {
+    if (ipAddressLimit <= 0) {
+      return false;
+    }
+    return ipAddress != null && !ipAddress.isEmpty();
+  }
+
+  /**
+   * Tells whether connections are counted per user: the limit has to be positive and the
+   * user name has to be non-null and non-empty.
+   */
+  private boolean trackConnectionsPerUser(final String username) {
+    if (userLimit <= 0) {
+      return false;
+    }
+    return username != null && !username.isEmpty();
+  }
+
+  /**
+   * Tells whether one more connection may be counted under {@code track}.
+   *
+   * @param track key of the counter to inspect
+   * @param limit maximum number of connections allowed for that key
+   * @return true when nothing has been counted for the key yet or its count is below the limit
+   */
+  private boolean withinLimits(final String track, final int limit) {
+    final LongAdder counted = connectionsCount.get(track);
+    return counted == null || counted.intValue() < limit;
+  }
+
   public synchronized void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
     HiveSession session = handleToSession.remove(sessionHandle);
     if (session == null) {
@@ -448,6 +551,7 @@ public class SessionManager extends CompositeService {
     try {
       session.close();
     } finally {
+      decrementConnections(session);
       // Shutdown HiveServer2 if it has been deregistered from ZooKeeper and has no active sessions
       if (!(hiveServer2 == null) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY))
           && (hiveServer2.isDeregisteredWithZooKeeper())) {

http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 6354c8c..fc9e6b2 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -320,8 +320,6 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
     LOG.info("Client protocol version: " + req.getClient_protocol());
     TOpenSessionResp resp = new TOpenSessionResp();
     try {
-      Map<String, String> openConf = req.getConfiguration();
-
       SessionHandle sessionHandle = getSessionHandle(req, resp);
       resp.setSessionHandle(sessionHandle.toTSessionHandle());
       Map<String, String> configurationMap = new HashMap<String, String>();

http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
new file mode 100644
index 0000000..5ecea9a
--- /dev/null
+++ b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests the HiveServer2 connection limits per user, per ipaddress and per user:ipaddress
+ * combination. Every test starts its own {@link CLIService}, opens sessions through it and
+ * always closes them and stops the service again.
+ */
+public class TestCLIServiceConnectionLimits {
+  @org.junit.Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private final int limit = 10;
+  private final HiveConf conf = new HiveConf();
+
+  @Test
+  public void testNoLimit() throws HiveSQLException {
+    setLimits(0, 0, 0);
+    openAndCloseSessions(limit + 1, "foo", "127.0.0.1");
+  }
+
+  @Test
+  public void testIncrementAndDecrementConnectionsUser() throws HiveSQLException {
+    setLimits(10, 0, 0);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      // open 5 connections
+      openSessions(service, sessionHandles, limit / 2, "foo", "127.0.0.1");
+
+      // close them all
+      closeSessions(service, sessionHandles);
+
+      // open till limit but not exceed; this has to be the same user as above, otherwise a
+      // fresh counter is used and the test passes even if closing never decrements
+      openSessions(service, sessionHandles, limit, "foo", "127.0.0.1");
+    } finally {
+      closeSessionsAndStop(service, sessionHandles);
+    }
+  }
+
+  @Test
+  public void testInvalidUserName() throws HiveSQLException {
+    setLimits(10, 0, 0);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      // null and empty user names are not tracked, so the user limit must not apply
+      openSessions(service, sessionHandles, limit + 1, null, "127.0.0.1");
+      openSessions(service, sessionHandles, limit + 1, "", "127.0.0.1");
+    } finally {
+      closeSessionsAndStop(service, sessionHandles);
+    }
+  }
+
+  @Test
+  public void testInvalidIpaddress() throws HiveSQLException {
+    setLimits(0, 10, 0);
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      // null and empty ip addresses are not tracked, so the ipaddress limit must not apply
+      openSessions(service, sessionHandles, limit + 1, "foo", null);
+      openSessions(service, sessionHandles, limit + 1, "foo", "");
+    } finally {
+      closeSessionsAndStop(service, sessionHandles);
+    }
+  }
+
+  @Test
+  public void testInvalidUserIpaddress() throws HiveSQLException {
+    // the user:ipaddress limit is enabled as well, so that the combination named by this
+    // test is actually exercised; without an ip address neither limit may apply
+    setLimits(0, 10, 10);
+    openAndCloseSessions(limit + 1, "   ", null);
+  }
+
+  @Test
+  public void testConnectionLimitPerUser() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per user reached (user: foo limit: 10)");
+
+    setLimits(10, 0, 0);
+    openAndCloseSessions(limit + 1, "foo", "127.0.0.1");
+  }
+
+  @Test
+  public void testConnectionLimitPerIpAddress() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per ipaddress reached (ipaddress: 127.0.0.1 limit: 10)");
+
+    setLimits(0, 10, 0);
+    openAndCloseSessions(limit + 1, "foo", "127.0.0.1");
+  }
+
+  @Test
+  public void testConnectionLimitPerUserIpAddress() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per user:ipaddress reached (user:ipaddress: foo:127.0.0.1 limit: 10)");
+
+    setLimits(0, 0, 10);
+    openAndCloseSessions(limit + 1, "foo", "127.0.0.1");
+  }
+
+  @Test
+  public void testConnectionMultipleLimitsUserAndIP() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per user reached (user: foo limit: 5)");
+
+    setLimits(5, 10, 0);
+    openAndCloseSessions(limit + 1, "foo", "127.0.0.1");
+  }
+
+  @Test
+  public void testConnectionMultipleLimitsIPAndUserIP() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per ipaddress reached (ipaddress: 127.0.0.1 limit: 5)");
+
+    setLimits(0, 5, 10);
+    openAndCloseSessions(limit + 1, "foo", "127.0.0.1");
+  }
+
+  @Test
+  public void testConnectionMultipleLimitsUserIPAndUser() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per user:ipaddress reached (user:ipaddress: foo:127.0.0.1 limit: 10)");
+
+    setLimits(15, 0, 10);
+    openAndCloseSessions(limit + 1, "foo", "127.0.0.1");
+  }
+
+  @Test
+  public void testConnectionForwardedIpAddresses() throws HiveSQLException {
+    thrown.expect(HiveSQLException.class);
+    thrown.expectMessage("Connection limit per ipaddress reached (ipaddress: 194.167.0.3 limit: 10)");
+
+    setLimits(0, 10, 10);
+    CLIService service = getService(conf);
+    SessionManager.setForwardedAddresses(Lists.newArrayList("194.167.0.3", "194.167.0.2", "194.167.0.1"));
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      openSessions(service, sessionHandles, limit + 1, "foo", "194.167.0.1");
+    } finally {
+      SessionManager.setForwardedAddresses(Collections.emptyList());
+      closeSessionsAndStop(service, sessionHandles);
+    }
+  }
+
+  /** Sets the three connection limits on the test configuration; 0 does not enforce a limit. */
+  private void setLimits(int perUser, int perIpAddress, int perUserIpAddress) {
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, perUser);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, perIpAddress);
+    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, perUserIpAddress);
+  }
+
+  /**
+   * Starts a service with the current configuration, opens {@code count} sessions for the
+   * given user and ip address, then closes them and stops the service. An exception thrown
+   * while opening is propagated after the cleanup.
+   */
+  private void openAndCloseSessions(int count, String username, String ipAddress) throws HiveSQLException {
+    CLIService service = getService(conf);
+    List<SessionHandle> sessionHandles = new ArrayList<>();
+    try {
+      openSessions(service, sessionHandles, count, username, ipAddress);
+    } finally {
+      closeSessionsAndStop(service, sessionHandles);
+    }
+  }
+
+  /**
+   * Opens {@code count} sessions and appends their handles to {@code sessionHandles}. Each
+   * handle is recorded as soon as it exists, so the sessions opened before a failure can
+   * still be closed by the caller.
+   */
+  private static void openSessions(CLIService service, List<SessionHandle> sessionHandles, int count,
+    String username, String ipAddress) throws HiveSQLException {
+    for (int i = 0; i < count; i++) {
+      SessionHandle session = service.openSession(CLIService.SERVER_VERSION, username, "bar", ipAddress, null);
+      sessionHandles.add(session);
+    }
+  }
+
+  /** Closes every session in {@code sessionHandles} and then empties the list. */
+  private static void closeSessions(CLIService service, List<SessionHandle> sessionHandles)
+    throws HiveSQLException {
+    for (SessionHandle sessionHandle : sessionHandles) {
+      service.closeSession(sessionHandle);
+    }
+    sessionHandles.clear();
+  }
+
+  /** Closes the given sessions and stops the service, even if closing a session fails. */
+  private static void closeSessionsAndStop(CLIService service, List<SessionHandle> sessionHandles)
+    throws HiveSQLException {
+    try {
+      closeSessions(service, sessionHandles);
+    } finally {
+      service.stop();
+    }
+  }
+
+  /** Creates, initializes and starts a {@link CLIService} that uses the given configuration. */
+  private CLIService getService(HiveConf conf) {
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+      "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    CLIService service = new CLIService(null);
+    service.init(conf);
+    service.start();
+    return service;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7195aee9/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
index 47f95c5..90237c0 100644
--- a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
+++ b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java
@@ -19,6 +19,8 @@ package org.apache.hive.service.cli.session;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.List;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.cli.CLIService;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -87,8 +89,8 @@ public class TestPluggableHiveSessionImpl {
     public static final int MAGIC_RETURN_VALUE = 0xbeef0001;
 
     public SampleHiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol,
-        String username, String password, HiveConf serverhiveConf, String ipAddress) {
-      super(sessionHandle, protocol, username, password, serverhiveConf, ipAddress);
+        String username, String password, HiveConf serverhiveConf, String ipAddress, List<String> forwardAddresses) {
+      super(sessionHandle, protocol, username, password, serverhiveConf, ipAddress, forwardAddresses);
     }
 
     @Override
@@ -103,9 +105,9 @@ public class TestPluggableHiveSessionImpl {
 
     public SampleHiveSessionImplWithUGI(SessionHandle sessionHandle, TProtocolVersion protocol,
         String username, String password, HiveConf serverhiveConf, String ipAddress,
-        String delegationToken) throws HiveSQLException {
+        String delegationToken, List<String> forwardedAddresses) throws HiveSQLException {
       super(sessionHandle, protocol, username, password, serverhiveConf, ipAddress,
-          delegationToken);
+          delegationToken, forwardedAddresses);
     }
 
     @Override