Posted to reviews@livy.apache.org by GitBox <gi...@apache.org> on 2020/03/02 03:10:39 UTC

[GitHub] [incubator-livy] wypoon opened a new pull request #284: [LIVY-752] Fix implementation of limits on connections.

wypoon opened a new pull request #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284
 
 
   ## What changes were proposed in this pull request?
   
   `LivyThriftSessionManager` keeps a `ConcurrentHashMap[String, AtomicLong]` named `connectionsCount` to track the number of connections per user, per IP address, and per user:IP address pair. The `incrementConnectionsCount` and `decrementConnectionsCount` methods check that `connectionsCount` does *not* contain the key (when they should check that it does) before getting the value and incrementing or decrementing the count, which leads to a `NullPointerException`. Even setting the inverted condition aside, they do not use the `ConcurrentHashMap` correctly. There is a race: a thread can read a count, find that it is within the limit, create a new session, and only then increment the count. In the meantime another thread may have incremented the same count, so the limit is actually exceeded.
   
   We increment all relevant counts optimistically before creating a new session, check if any limits are violated, and if so, decrement all incremented counts.
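
   The approach described above can be sketched in isolation as follows. This is a minimal illustration, not the code in this pull request: the class `ConnectionCounter` and its methods `tryAcquire`, `release` and `current` are hypothetical names, and a single shared limit is assumed for simplicity (the real manager has a separate limit per key type).

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.atomic.AtomicLong

   // Hypothetical stand-in for the counting logic; not the actual Livy class.
   class ConnectionCounter(limit: Long) {
     private val connectionsCount = new ConcurrentHashMap[String, AtomicLong]()

     // Atomically create the counter if it is absent, then increment it.
     private def increment(key: String): Long = {
       connectionsCount.putIfAbsent(key, new AtomicLong(0L))
       connectionsCount.get(key).incrementAndGet()
     }

     private def decrement(key: String): Unit = {
       val count = connectionsCount.get(key)
       if (count != null) count.decrementAndGet()
     }

     // Increment every key first, then check the limit. If any count went
     // over the limit, undo all the increments and reject the connection.
     def tryAcquire(keys: List[String]): Boolean = {
       val counts = keys.map(increment)
       val exceeded = counts.exists(_ > limit)
       if (exceeded) keys.foreach(decrement)
       !exceeded
     }

     // Called when an admitted connection is closed.
     def release(keys: List[String]): Unit = keys.foreach(decrement)

     // Current count for a key, or 0 if the key has never been seen.
     def current(key: String): Long =
       Option(connectionsCount.get(key)).map(_.get()).getOrElse(0L)
   }
   ```

   Because each increment returns the updated value atomically, two threads can never both observe a count within the limit and then both push it past the limit. The cost is that a count may briefly sit above the limit before it is rolled back.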
   
   ## How was this patch tested?
   
   Tested by deploying the change on a cluster and setting `livy.server.thrift.limit.connections.per.user`. Verified that the number of connections reaches but does not exceed the limit.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-livy] mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r391789507
 
 

 ##########
 File path: thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
 ##########
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import org.apache.hive.service.cli.HiveSQLException
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito.mock
+
+import org.apache.livy.LivyConf
+import org.apache.livy.server.AccessManager
+import org.apache.livy.server.recovery.{SessionStore, StateStore}
+import org.apache.livy.sessions.InteractiveSessionManager
+
+object ConnectionLimitType extends Enumeration {
+  type ConnectionLimitType = Value
+  val User, IpAddress, UserIpAddress = Value
+}
+
+class TestLivyThriftSessionManager {
+
+  import ConnectionLimitType._
+
+  private def createThriftSessionManager(
+      limitTypes: ConnectionLimitType*): LivyThriftSessionManager = {
+    val conf = new LivyConf()
+    conf.set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+    val limit = 3
+    for (limitType <- limitTypes) {
 
 Review comment:
   nit:
   ```suggestion
      limitTypes.foreach { limitType =>
   ```
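
   For context on why this is only a nit: in Scala, a `for` loop without `yield` is rewritten by the compiler into a `foreach` call, so the two forms behave the same and the choice is stylistic. A small sketch, with the hypothetical object `ForeachEquivalence` used purely for illustration:

   ```scala
   import scala.collection.mutable.ListBuffer

   // Hypothetical helper, not part of the pull request.
   object ForeachEquivalence {
     // A for loop without yield ...
     def viaFor(items: Seq[String]): List[String] = {
       val seen = ListBuffer.empty[String]
       for (item <- items) {
         seen += item
       }
       seen.toList
     }

     // ... visits the same elements in the same order as an explicit foreach.
     def viaForeach(items: Seq[String]): List[String] = {
       val seen = ListBuffer.empty[String]
       items.foreach { item =>
         seen += item
       }
       seen.toList
     }
   }
   ```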


[GitHub] [incubator-livy] mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390569916
 
 

 ##########
 File path: thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
 ##########
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import org.apache.hive.service.cli.HiveSQLException
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito.mock
+
+import org.apache.livy.LivyConf
+import org.apache.livy.server.AccessManager
+import org.apache.livy.server.recovery.{SessionStore, StateStore}
+import org.apache.livy.sessions.InteractiveSessionManager
+
+object ConnectionLimitType extends Enumeration {
+  type ConnectionLimitType = Value
+  val User, IpAddress, UserIpAddress = Value
+}
+
+class TestLivyThriftSessionManager {
+
+  import ConnectionLimitType._
+
+  private def createThriftSessionManager(
+      limitType: ConnectionLimitType): LivyThriftSessionManager = {
+    val conf = new LivyConf()
+    conf.set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+    val limit = 3
+    val entry = limitType match {
+      case User => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_USER
+      case IpAddress => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_IPADDRESS
+      case UserIpAddress => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_USER_IPADDRESS
+    }
+    conf.set(entry, limit)
+    val server = new LivyThriftServer(
+      conf,
+      mock(classOf[InteractiveSessionManager]),
+      mock(classOf[SessionStore]),
+      mock(classOf[AccessManager])
+    )
+    new LivyThriftSessionManager(server, conf)
+  }
+
+  private def testLimit(
+      thriftSessionMgr: LivyThriftSessionManager,
+      user: String,
+      ipAddress: String,
+      forwardedAddresses: java.util.List[String],
+      msg: String): Unit = {
+    val failureMsg = "Should have thrown HiveSQLException"
+    try {
+      thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+      fail(failureMsg)
+    } catch {
+      case e: HiveSQLException =>
+        assertEquals(msg, e.getMessage)
+      case _: Throwable =>
+        fail(failureMsg)
+    }
+  }
+
+  @Test
+  def testLimitConnectionsByUser(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(User)
+    val user = "alice"
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections(user, "10.20.30.40", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.41", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.42", forwardedAddresses)
+    val msg = s"Connection limit per user reached (user: $user limit: 3)"
+    testLimit(thriftSessionMgr, user, "10.20.30.43", forwardedAddresses, msg)
+  }
+
+  @Test
+  def testLimitConnectionsByIpAddress(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(IpAddress)
+    val ipAddress = "10.20.30.40"
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections("alice", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("bob", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("charlie", ipAddress, forwardedAddresses)
+    val msg = s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: 3)"
+    testLimit(thriftSessionMgr, "dan", ipAddress, forwardedAddresses, msg)
+  }
+
+  @Test
+  def testLimitConnectionsByUserAndIpAddress(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(UserIpAddress)
+    val user = "alice"
+    val ipAddress = "10.20.30.40"
+    val userAndAddress = user + ":" + ipAddress
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+
+    // more than 3 connections from the same IP Address is ok if users are different
+    thriftSessionMgr.incrementConnections("bob", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("charlie", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("dan", ipAddress, forwardedAddresses)
+
+    // more than 3 connections from the same user is ok if IP addresses are different
+    thriftSessionMgr.incrementConnections(user, "10.20.30.41", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.42", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.43", forwardedAddresses)
+
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+    val msg =
+      s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress limit: 3)"
+    testLimit(thriftSessionMgr, user, ipAddress, forwardedAddresses, msg)
+  }
+
 
 Review comment:
   Yes, thanks @squito. I agree that the current state of this PR is already an improvement. My comment was a nit, but I think it is a nice-to-have, and since it should not take much effort to add, it would be great to have it.


[GitHub] [incubator-livy] codecov-io edited a comment on issue #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-593585529
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=h1) Report
   > Merging [#284](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-livy/commit/3a2685643670c1cd1530feb7a54516b01784e781?src=pr&el=desc) will **increase** coverage by `0.03%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-livy/pull/284/graphs/tree.svg?width=650&token=0MkVbiUFwE&height=150&src=pr)](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #284      +/-   ##
   ============================================
   + Coverage      68.2%   68.24%   +0.03%     
   + Complexity      965      964       -1     
   ============================================
     Files           104      104              
     Lines          5951     5952       +1     
     Branches        900      900              
   ============================================
   + Hits           4059     4062       +3     
   + Misses         1314     1311       -3     
   - Partials        578      579       +1
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...cala/org/apache/livy/scalaapi/ScalaJobHandle.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2NhbGEtYXBpL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9zY2FsYWFwaS9TY2FsYUpvYkhhbmRsZS5zY2FsYQ==) | `52.94% <0%> (-2.95%)` | `7% <0%> (ø)` | |
   | [...c/main/scala/org/apache/livy/repl/ReplDriver.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-cmVwbC9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2xpdnkvcmVwbC9SZXBsRHJpdmVyLnNjYWxh) | `30.76% <0%> (-2.57%)` | `7% <0%> (ø)` | |
   | [...va/org/apache/livy/client/http/LivyConnection.java](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-Y2xpZW50LWh0dHAvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2xpdnkvY2xpZW50L2h0dHAvTGl2eUNvbm5lY3Rpb24uamF2YQ==) | `82.27% <0%> (+0.22%)` | `15% <0%> (ø)` | :arrow_down: |
   | [...main/scala/org/apache/livy/server/LivyServer.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9zZXJ2ZXIvTGl2eVNlcnZlci5zY2FsYQ==) | `33.48% <0%> (+0.44%)` | `11% <0%> (ø)` | :arrow_down: |
   | [...ain/scala/org/apache/livy/utils/SparkYarnApp.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS91dGlscy9TcGFya1lhcm5BcHAuc2NhbGE=) | `73.75% <0%> (+1.87%)` | `40% <0%> (ø)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=footer). Last update [3a26856...38f26a7](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [incubator-livy] squito commented on issue #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
squito commented on issue #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-595438421
 
 
   I think the helper `withinLimits()` is no longer used.
   
   Without testing for concurrency, could you at least add a basic unit test that the limits are enforced, say on `anyViolations()`? (Given that the logic was entirely backwards before, a unit test seems like it would help here.)


[GitHub] [incubator-livy] codecov-io edited a comment on issue #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-593585529
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=h1) Report
   > Merging [#284](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-livy/commit/3a2685643670c1cd1530feb7a54516b01784e781&el=desc) will **increase** coverage by `0.03%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-livy/pull/284/graphs/tree.svg?width=650&height=150&src=pr&token=0MkVbiUFwE)](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #284      +/-   ##
   ============================================
   + Coverage     68.20%   68.24%   +0.03%     
   + Complexity      965      964       -1     
   ============================================
     Files           104      104              
     Lines          5951     5952       +1     
     Branches        900      900              
   ============================================
   + Hits           4059     4062       +3     
   + Misses         1314     1310       -4     
   - Partials        578      580       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...cala/org/apache/livy/scalaapi/ScalaJobHandle.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2NhbGEtYXBpL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9zY2FsYWFwaS9TY2FsYUpvYkhhbmRsZS5zY2FsYQ==) | `52.94% <0.00%> (-2.95%)` | `7.00% <0.00%> (ø%)` | |
   | [...c/main/scala/org/apache/livy/repl/ReplDriver.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-cmVwbC9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2xpdnkvcmVwbC9SZXBsRHJpdmVyLnNjYWxh) | `30.76% <0.00%> (-2.57%)` | `7.00% <0.00%> (ø%)` | |
   | [...ain/java/org/apache/livy/rsc/driver/RSCDriver.java](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9kcml2ZXIvUlNDRHJpdmVyLmphdmE=) | `80.00% <0.00%> (-0.84%)` | `45.00% <0.00%> (-1.00%)` | |
   | [...va/org/apache/livy/client/http/LivyConnection.java](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-Y2xpZW50LWh0dHAvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2xpdnkvY2xpZW50L2h0dHAvTGl2eUNvbm5lY3Rpb24uamF2YQ==) | `82.27% <0.00%> (+0.22%)` | `15.00% <0.00%> (ø%)` | |
   | [...main/scala/org/apache/livy/server/LivyServer.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9zZXJ2ZXIvTGl2eVNlcnZlci5zY2FsYQ==) | `33.48% <0.00%> (+0.44%)` | `11.00% <0.00%> (ø%)` | |
   | [...ain/scala/org/apache/livy/utils/SparkYarnApp.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS91dGlscy9TcGFya1lhcm5BcHAuc2NhbGE=) | `75.00% <0.00%> (+3.12%)` | `40.00% <0.00%> (ø%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=footer). Last update [3a26856...2252ce1](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [incubator-livy] wypoon commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r388596005
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,17 +457,45 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
   private def anyViolations(username: String, ipAddress: String): Option[String] = {
 
 Review comment:
   I can rename it checkForViolations. I'd rather not have a long name like checkForViolationsAndIncrementCountsAsNeeded. I document the side effect in the comment immediately. I think that is fine.


[GitHub] [incubator-livy] squito commented on issue #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
squito commented on issue #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-595438567
 
 
   those things aside, the change itself lgtm


[GitHub] [incubator-livy] wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390032048
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,21 +443,58 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
-  private def anyViolations(username: String, ipAddress: String): Option[String] = {
-    val userAndAddress = username + ":" + ipAddress
-    if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) {
-      Some(s"Connection limit per user reached (user: $username limit: $userLimit)")
-    } else if (trackConnectionsPerIpAddress(ipAddress) &&
-        !withinLimits(ipAddress, ipAddressLimit)) {
-      Some(s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: " +
-        s"$ipAddressLimit)")
-    } else if (trackConnectionsPerUserIpAddress(username, ipAddress) &&
-        !withinLimits(userAndAddress, userIpAddressLimit)) {
-      Some(s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress " +
-        s"limit: $userIpAddressLimit)")
-    } else {
-      None
+  // Visible for testing
+  @throws[HiveSQLException]
+  private[thriftserver] def incrementConnections(
+      username: String,
+      ipAddress: String,
+      forwardedAddresses: util.List[String]): Unit = {
+    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
+    val userAndAddress = username + ":" + clientIpAddress
+    val trackUser = trackConnectionsPerUser(username)
+    val trackIpAddress = trackConnectionsPerIpAddress(clientIpAddress)
+    val trackUserIpAddress = trackConnectionsPerUserIpAddress(username, clientIpAddress)
+    var userLimitExceeded = false
+    var ipAddressLimitExceeded = false
+    var userIpAddressLimitExceeded = false
+
+    // Optimistically increment the counts while getting them to check for violations.
+    if (trackUser) {
+      val userCount = incrementConnectionsCount(username)
+      if (userCount > userLimit) userLimitExceeded = true
+    }
+    if (trackIpAddress) {
+      val ipAddressCount = incrementConnectionsCount(clientIpAddress)
+      if (ipAddressCount > ipAddressLimit) ipAddressLimitExceeded = true
+    }
+    if (trackUserIpAddress) {
+      val userIpAddressCount = incrementConnectionsCount(userAndAddress)
+      if (userIpAddressCount > userIpAddressLimit) userIpAddressLimitExceeded = true
+    }
+
+    // If any limit has been exceeded, we won't be going ahead with the connection,
+    // so decrement all counts that have been incremented.
+    if (userLimitExceeded || ipAddressLimitExceeded || userIpAddressLimitExceeded) {
+      if (trackUser) decrementConnectionsCount(username)
 
 Review comment:
   I think this is clean and easy to reason about. If I understand you correctly, you are suggesting not calling incrementConnectionsCount on a key if we already have a limit exceeded so that we don't have to call decrementConnectionsCount on that same key. That leads to more complex logic and is harder to reason about.
   
   E.g.,
   ```
       // Optimistically increment the counts while getting them to check for violations.
       // If any limit has been exceeded, we won't be going ahead with the connection,
       // so decrement all counts that have been incremented.
       if (trackUser) {
         val userCount = incrementConnectionsCount(username)
         if (userCount > userLimit) {
           userLimitExceeded = true
           decrementConnectionsCount(username)
         }
       }
       if (trackIpAddress && !userLimitExceeded) {
         val ipAddressCount = incrementConnectionsCount(clientIpAddress)
         if (ipAddressCount > ipAddressLimit) {
           ipAddressLimitExceeded = true
           if (trackUser) decrementConnectionsCount(username)
           decrementConnectionsCount(clientIpAddress)
         }
       }
       if (trackUserIpAddress && !userLimitExceeded && !ipAddressLimitExceeded) {
         val userIpAddressCount = incrementConnectionsCount(userAndAddress)
         if (userIpAddressCount > userIpAddressLimit) {
           userIpAddressLimitExceeded = true
           if (trackUser) decrementConnectionsCount(username)
           if (trackIpAddress) decrementConnectionsCount(clientIpAddress)
           decrementConnectionsCount(userAndAddress)
         }
       }
   ```
   This logic is, imo, more complex and more difficult to reason about (and to prove correct!).
   Therefore I prefer to stick with the existing version.
   


[GitHub] [incubator-livy] codecov-io commented on issue #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-593585529
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=h1) Report
   > Merging [#284](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-livy/commit/3a2685643670c1cd1530feb7a54516b01784e781?src=pr&el=desc) will **increase** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-livy/pull/284/graphs/tree.svg?width=650&token=0MkVbiUFwE&height=150&src=pr)](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #284      +/-   ##
   ============================================
   + Coverage      68.2%   68.22%   +0.02%     
   + Complexity      965      964       -1     
   ============================================
     Files           104      104              
     Lines          5951     5952       +1     
     Branches        900      900              
   ============================================
   + Hits           4059     4061       +2     
   + Misses         1314     1311       -3     
   - Partials        578      580       +2
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...c/main/scala/org/apache/livy/repl/ReplDriver.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-cmVwbC9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2xpdnkvcmVwbC9SZXBsRHJpdmVyLnNjYWxh) | `30.76% <0%> (-2.57%)` | `7% <0%> (ø)` | |
   | [...ain/java/org/apache/livy/rsc/driver/RSCDriver.java](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9kcml2ZXIvUlNDRHJpdmVyLmphdmE=) | `80% <0%> (-0.84%)` | `45% <0%> (-1%)` | |
   | [...va/org/apache/livy/client/http/LivyConnection.java](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-Y2xpZW50LWh0dHAvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2xpdnkvY2xpZW50L2h0dHAvTGl2eUNvbm5lY3Rpb24uamF2YQ==) | `82.27% <0%> (+0.22%)` | `15% <0%> (ø)` | :arrow_down: |
   | [...main/scala/org/apache/livy/server/LivyServer.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9zZXJ2ZXIvTGl2eVNlcnZlci5zY2FsYQ==) | `33.48% <0%> (+0.44%)` | `11% <0%> (ø)` | :arrow_down: |
   | [...ain/scala/org/apache/livy/utils/SparkYarnApp.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS91dGlscy9TcGFya1lhcm5BcHAuc2NhbGE=) | `73.75% <0%> (+1.87%)` | `40% <0%> (ø)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=footer). Last update [3a26856...3261a38](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [incubator-livy] mgaido91 closed pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
mgaido91 closed pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284
 
 
   

----------------------------------------------------------------

[GitHub] [incubator-livy] mgaido91 commented on issue #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
mgaido91 commented on issue #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-599641239
 
 
   Merged to master. Thanks.

----------------------------------------------------------------

[GitHub] [incubator-livy] wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390032685
 
 

 ##########
 File path: thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
 ##########
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import org.apache.hive.service.cli.HiveSQLException
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito.mock
+
+import org.apache.livy.LivyConf
+import org.apache.livy.server.AccessManager
+import org.apache.livy.server.recovery.{SessionStore, StateStore}
+import org.apache.livy.sessions.InteractiveSessionManager
+
+object ConnectionLimitType extends Enumeration {
+  type ConnectionLimitType = Value
+  val User, IpAddress, UserIpAddress = Value
+}
+
+class TestLivyThriftSessionManager {
+
+  import ConnectionLimitType._
+
+  private def createThriftSessionManager(
+      limitType: ConnectionLimitType): LivyThriftSessionManager = {
+    val conf = new LivyConf()
+    conf.set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+    val limit = 3
+    val entry = limitType match {
+      case User => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_USER
+      case IpAddress => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_IPADDRESS
+      case UserIpAddress => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_USER_IPADDRESS
+    }
+    conf.set(entry, limit)
+    val server = new LivyThriftServer(
+      conf,
+      mock(classOf[InteractiveSessionManager]),
+      mock(classOf[SessionStore]),
+      mock(classOf[AccessManager])
+    )
+    new LivyThriftSessionManager(server, conf)
+  }
+
+  private def testLimit(
+      thriftSessionMgr: LivyThriftSessionManager,
+      user: String,
+      ipAddress: String,
+      forwardedAddresses: java.util.List[String],
+      msg: String): Unit = {
+    val failureMsg = "Should have thrown HiveSQLException"
+    try {
+      thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+      fail(failureMsg)
+    } catch {
+      case e: HiveSQLException =>
+        assertEquals(msg, e.getMessage)
+      case _: Throwable =>
+        fail(failureMsg)
+    }
+  }
+
+  @Test
+  def testLimitConnectionsByUser(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(User)
+    val user = "alice"
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections(user, "10.20.30.40", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.41", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.42", forwardedAddresses)
+    val msg = s"Connection limit per user reached (user: $user limit: 3)"
+    testLimit(thriftSessionMgr, user, "10.20.30.43", forwardedAddresses, msg)
+  }
+
+  @Test
+  def testLimitConnectionsByIpAddress(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(IpAddress)
+    val ipAddress = "10.20.30.40"
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections("alice", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("bob", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("charlie", ipAddress, forwardedAddresses)
+    val msg = s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: 3)"
+    testLimit(thriftSessionMgr, "dan", ipAddress, forwardedAddresses, msg)
+  }
+
+  @Test
+  def testLimitConnectionsByUserAndIpAddress(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(UserIpAddress)
+    val user = "alice"
+    val ipAddress = "10.20.30.40"
+    val userAndAddress = user + ":" + ipAddress
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+
+    // more than 3 connections from the same IP Address is ok if users are different
+    thriftSessionMgr.incrementConnections("bob", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("charlie", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("dan", ipAddress, forwardedAddresses)
+
+    // more than 3 connections from the same user is ok if IP addresses are different
+    thriftSessionMgr.incrementConnections(user, "10.20.30.41", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.42", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.43", forwardedAddresses)
+
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+    val msg =
+      s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress limit: 3)"
+    testLimit(thriftSessionMgr, user, ipAddress, forwardedAddresses, msg)
+  }
+
 
 Review comment:
   I can add a case with multiple violations (the error message will be for the first violation, since that is the order in which the limits are checked), but there are a number of permutations, and I don't intend these unit tests to be comprehensive. There were no tests in this area at all, so these, even though basic, are already an improvement.
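   
   One further case that might be worth covering later is the race this PR fixes. The following is only a minimal sketch, assuming a hypothetical `tryAcquire` helper that models a single per-key limit with the same optimistic increment and rollback, rather than the real `LivyThriftSessionManager`. Many threads attempt to connect at once, and the number admitted should equal the limit.
   
   ```scala
   import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
   import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
   
   // Hypothetical stand-in for the session manager's counting logic.
   object LimitRaceCheck {
     private val counts = new ConcurrentHashMap[String, AtomicLong]()
   
     // Optimistically increment; roll back and refuse if the limit is exceeded.
     def tryAcquire(key: String, limit: Long): Boolean = {
       val fresh = new AtomicLong(0L)
       val existing = counts.putIfAbsent(key, fresh)
       val counter = if (existing != null) existing else fresh
       if (counter.incrementAndGet() > limit) {
         counter.decrementAndGet()
         false
       } else {
         true
       }
     }
   
     // Run `attempts` concurrent tryAcquire calls and return how many succeeded.
     def admitted(key: String, limit: Long, attempts: Int): Int = {
       val pool = Executors.newFixedThreadPool(8)
       val start = new CountDownLatch(1)
       val ok = new AtomicInteger(0)
       (1 to attempts).foreach { _ =>
         pool.execute(new Runnable {
           override def run(): Unit = {
             start.await() // hold every task until all have been submitted
             if (tryAcquire(key, limit)) ok.incrementAndGet()
           }
         })
       }
       start.countDown()
       pool.shutdown()
       pool.awaitTermination(10, TimeUnit.SECONDS)
       ok.get()
     }
   }
   ```
   
   Since admitted connections are never released in this sketch, `LimitRaceCheck.admitted("alice", 3L, 64)` should come out as exactly the limit.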

----------------------------------------------------------------

[GitHub] [incubator-livy] bersprockets commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
bersprockets commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r388661897
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,17 +457,45 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
   private def anyViolations(username: String, ipAddress: String): Option[String] = {
 
 Review comment:
   Is there a reason not to rename it incrementConnections and move the remaining few lines of the old incrementConnections into here?
   
   I don't feel super strongly about it. Just thought that it might make it more clear.

----------------------------------------------------------------

[GitHub] [incubator-livy] squito commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
squito commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390042082
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,21 +443,58 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
-  private def anyViolations(username: String, ipAddress: String): Option[String] = {
-    val userAndAddress = username + ":" + ipAddress
-    if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) {
-      Some(s"Connection limit per user reached (user: $username limit: $userLimit)")
-    } else if (trackConnectionsPerIpAddress(ipAddress) &&
-        !withinLimits(ipAddress, ipAddressLimit)) {
-      Some(s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: " +
-        s"$ipAddressLimit)")
-    } else if (trackConnectionsPerUserIpAddress(username, ipAddress) &&
-        !withinLimits(userAndAddress, userIpAddressLimit)) {
-      Some(s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress " +
-        s"limit: $userIpAddressLimit)")
-    } else {
-      None
+  // Visible for testing
+  @throws[HiveSQLException]
+  private[thriftserver] def incrementConnections(
+      username: String,
+      ipAddress: String,
+      forwardedAddresses: util.List[String]): Unit = {
+    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
+    val userAndAddress = username + ":" + clientIpAddress
+    val trackUser = trackConnectionsPerUser(username)
+    val trackIpAddress = trackConnectionsPerIpAddress(clientIpAddress)
+    val trackUserIpAddress = trackConnectionsPerUserIpAddress(username, clientIpAddress)
+    var userLimitExceeded = false
+    var ipAddressLimitExceeded = false
+    var userIpAddressLimitExceeded = false
+
+    // Optimistically increment the counts while getting them to check for violations.
+    if (trackUser) {
+      val userCount = incrementConnectionsCount(username)
+      if (userCount > userLimit) userLimitExceeded = true
+    }
+    if (trackIpAddress) {
+      val ipAddressCount = incrementConnectionsCount(clientIpAddress)
+      if (ipAddressCount > ipAddressLimit) ipAddressLimitExceeded = true
+    }
+    if (trackUserIpAddress) {
+      val userIpAddressCount = incrementConnectionsCount(userAndAddress)
+      if (userIpAddressCount > userIpAddressLimit) userIpAddressLimitExceeded = true
+    }
+
+    // If any limit has been exceeded, we won't be going ahead with the connection,
+    // so decrement all counts that have been incremented.
+    if (userLimitExceeded || ipAddressLimitExceeded || userIpAddressLimitExceeded) {
+      if (trackUser) decrementConnectionsCount(username)
 
 Review comment:
   I think Marco is suggesting that you include the throw in there as well, which decreases the complexity of the conditions and lets you skip the `userLimitExceeded` etc. flags and the entire `val violation` section below (though you still repeat the decrement calls):
   
   ```scala
   def errorAndThrow(msg: String): Unit = {
     error(msg)
     throw new HiveSQLException(msg)
   }
   
   
       if (trackUser) {
         val userCount = incrementConnectionsCount(username)
         if (userCount > userLimit) {
           decrementConnectionsCount(username)
           errorAndThrow(s"Connection limit per user reached (user: $username limit: $userLimit)")        
         }
       }
       if (trackIpAddress) {
         val ipAddressCount = incrementConnectionsCount(clientIpAddress)
         if (ipAddressCount > ipAddressLimit) {
           if (trackUser) decrementConnectionsCount(username)
           decrementConnectionsCount(clientIpAddress)
           errorAndThrow(s"Connection limit per ipaddress reached (ipaddress: $clientIpAddress limit: " +
             s"$ipAddressLimit)")
         }
       }
       if (trackUserIpAddress) {
         val userIpAddressCount = incrementConnectionsCount(userAndAddress)
         if (userIpAddressCount > userIpAddressLimit) {
           if (trackUser) decrementConnectionsCount(username)
           if (trackIpAddress) decrementConnectionsCount(clientIpAddress)
           decrementConnectionsCount(userAndAddress)
           errorAndThrow(s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress " +
             s"limit: $userIpAddressLimit)")
         }
       }
   ```
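   
   If the repeated decrements are a concern, one option is to record which keys have been incremented and roll them back in one place. This is a self-contained sketch for experimenting outside Livy, not the PR's code: `ConnectionLimiter`, `LimitExceeded`, `acquire` and `count` are hypothetical names, the limits are plain constructor parameters, and a limit of 0 is assumed to mean "not tracked".
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.atomic.AtomicLong
   
   // Hypothetical exception standing in for HiveSQLException.
   final class LimitExceeded(msg: String) extends Exception(msg)
   
   // Hypothetical model of the limit checks; a limit of 0 disables that check.
   class ConnectionLimiter(userLimit: Int, ipLimit: Int, userIpLimit: Int) {
     private val counts = new ConcurrentHashMap[String, AtomicLong]()
   
     // Increment the count for `key`, returning the new value.
     private def increment(key: String): Long = {
       val fresh = new AtomicLong(0L)
       val existing = counts.putIfAbsent(key, fresh)
       (if (existing != null) existing else fresh).incrementAndGet()
     }
   
     private def decrement(key: String): Unit = {
       val c = counts.get(key)
       if (c != null) c.decrementAndGet()
     }
   
     def count(key: String): Long = {
       val c = counts.get(key)
       if (c != null) c.get() else 0L
     }
   
     def acquire(user: String, ip: String): Unit = {
       val userIp = user + ":" + ip
       // (key, limit, label), in the order the limits are checked.
       val checks = Seq(
         (user, userLimit, "user"),
         (ip, ipLimit, "ipaddress"),
         (userIp, userIpLimit, "user:ipaddress")
       ).filter(_._2 > 0)
   
       var incremented = List.empty[String]
       for ((key, limit, label) <- checks) {
         val newCount = increment(key)
         incremented = key :: incremented
         if (newCount > limit) {
           // Undo every increment made so far, including the one that tripped the limit.
           incremented.foreach(decrement)
           throw new LimitExceeded(
             s"Connection limit per $label reached ($label: $key limit: $limit)")
         }
       }
     }
   }
   ```
   
   With `new ConnectionLimiter(1, 1, 0)`, a second `acquire("alice", "10.20.30.40")` would violate both the user and the IP address limit; the exception reports the user limit because it is checked first, and both counts stay at 1.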

----------------------------------------------------------------

[GitHub] [incubator-livy] wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390032048
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,21 +443,58 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
-  private def anyViolations(username: String, ipAddress: String): Option[String] = {
-    val userAndAddress = username + ":" + ipAddress
-    if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) {
-      Some(s"Connection limit per user reached (user: $username limit: $userLimit)")
-    } else if (trackConnectionsPerIpAddress(ipAddress) &&
-        !withinLimits(ipAddress, ipAddressLimit)) {
-      Some(s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: " +
-        s"$ipAddressLimit)")
-    } else if (trackConnectionsPerUserIpAddress(username, ipAddress) &&
-        !withinLimits(userAndAddress, userIpAddressLimit)) {
-      Some(s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress " +
-        s"limit: $userIpAddressLimit)")
-    } else {
-      None
+  // Visible for testing
+  @throws[HiveSQLException]
+  private[thriftserver] def incrementConnections(
+      username: String,
+      ipAddress: String,
+      forwardedAddresses: util.List[String]): Unit = {
+    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
+    val userAndAddress = username + ":" + clientIpAddress
+    val trackUser = trackConnectionsPerUser(username)
+    val trackIpAddress = trackConnectionsPerIpAddress(clientIpAddress)
+    val trackUserIpAddress = trackConnectionsPerUserIpAddress(username, clientIpAddress)
+    var userLimitExceeded = false
+    var ipAddressLimitExceeded = false
+    var userIpAddressLimitExceeded = false
+
+    // Optimistically increment the counts while getting them to check for violations.
+    if (trackUser) {
+      val userCount = incrementConnectionsCount(username)
+      if (userCount > userLimit) userLimitExceeded = true
+    }
+    if (trackIpAddress) {
+      val ipAddressCount = incrementConnectionsCount(clientIpAddress)
+      if (ipAddressCount > ipAddressLimit) ipAddressLimitExceeded = true
+    }
+    if (trackUserIpAddress) {
+      val userIpAddressCount = incrementConnectionsCount(userAndAddress)
+      if (userIpAddressCount > userIpAddressLimit) userIpAddressLimitExceeded = true
+    }
+
+    // If any limit has been exceeded, we won't be going ahead with the connection,
+    // so decrement all counts that have been incremented.
+    if (userLimitExceeded || ipAddressLimitExceeded || userIpAddressLimitExceeded) {
+      if (trackUser) decrementConnectionsCount(username)
 
 Review comment:
   I think this is clean and easy to reason about. If I understand you correctly, you are suggesting not calling incrementConnectionsCount on a key if a limit has already been exceeded, so that we don't have to call decrementConnectionsCount on that same key. That leads to more complex logic that is harder to reason about.
   
   E.g.,
   ```scala
       // Optimistically increment the counts while getting them to check for violations.
       // If any limit has been exceeded, we won't be going ahead with the connection,
       // so decrement all counts that have been incremented.
       if (trackUser) {
         val userCount = incrementConnectionsCount(username)
         if (userCount > userLimit) {
           userLimitExceeded = true
           decrementConnectionsCount(username)
         }
       }
       if (trackIpAddress && !userLimitExceeded) {
         val ipAddressCount = incrementConnectionsCount(clientIpAddress)
         if (ipAddressCount > ipAddressLimit) {
           ipAddressLimitExceeded = true
           if (trackUser) decrementConnectionsCount(username)
           decrementConnectionsCount(clientIpAddress)
         }
       }
       if (trackUserIpAddress && !userLimitExceeded && !ipAddressLimitExceeded) {
         val userIpAddressCount = incrementConnectionsCount(userAndAddress)
         if (userIpAddressCount > userIpAddressLimit) {
           userIpAddressLimitExceeded = true
           if (trackUser) decrementConnectionsCount(username)
           if (trackIpAddress) decrementConnectionsCount(clientIpAddress)
           decrementConnectionsCount(userAndAddress)
         }
       }
   ```
   This logic is, imo, more complex and more difficult to reason about (and to prove correct!).
   Therefore I prefer to stick with the existing version.
   

----------------------------------------------------------------

[GitHub] [incubator-livy] wypoon commented on issue #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on issue #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-597922611
 
 
   I adopted Imran's suggestion for rewriting `incrementConnections`.
   I also added a test case for multiple connection limits being violated; the error message is for the first limit that we check.
   I am passing on additional nice-to-have tests. There is always room for improvement for the tests. This can be an area for further contributions, possibly from new contributors!

----------------------------------------------------------------

[GitHub] [incubator-livy] codecov-io edited a comment on issue #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-593585529
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=h1) Report
   > Merging [#284](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-livy/commit/3a2685643670c1cd1530feb7a54516b01784e781?src=pr&el=desc) will **increase** coverage by `0.03%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-livy/pull/284/graphs/tree.svg?width=650&token=0MkVbiUFwE&height=150&src=pr)](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #284      +/-   ##
   ============================================
   + Coverage      68.2%   68.24%   +0.03%     
   + Complexity      965      964       -1     
   ============================================
     Files           104      104              
     Lines          5951     5952       +1     
     Branches        900      900              
   ============================================
   + Hits           4059     4062       +3     
   + Misses         1314     1311       -3     
   - Partials        578      579       +1
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...cala/org/apache/livy/scalaapi/ScalaJobHandle.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2NhbGEtYXBpL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9zY2FsYWFwaS9TY2FsYUpvYkhhbmRsZS5zY2FsYQ==) | `52.94% <0%> (-2.95%)` | `7% <0%> (ø)` | |
   | [...c/main/scala/org/apache/livy/repl/ReplDriver.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-cmVwbC9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2xpdnkvcmVwbC9SZXBsRHJpdmVyLnNjYWxh) | `30.76% <0%> (-2.57%)` | `7% <0%> (ø)` | |
   | [...va/org/apache/livy/client/http/LivyConnection.java](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-Y2xpZW50LWh0dHAvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2xpdnkvY2xpZW50L2h0dHAvTGl2eUNvbm5lY3Rpb24uamF2YQ==) | `82.27% <0%> (+0.22%)` | `15% <0%> (ø)` | :arrow_down: |
   | [...main/scala/org/apache/livy/server/LivyServer.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9zZXJ2ZXIvTGl2eVNlcnZlci5zY2FsYQ==) | `33.48% <0%> (+0.44%)` | `11% <0%> (ø)` | :arrow_down: |
   | [...ain/scala/org/apache/livy/utils/SparkYarnApp.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS91dGlscy9TcGFya1lhcm5BcHAuc2NhbGE=) | `73.75% <0%> (+1.87%)` | `40% <0%> (ø)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=footer). Last update [3a26856...38f26a7](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

----------------------------------------------------------------

[GitHub] [incubator-livy] codecov-io edited a comment on issue #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-593585529
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=h1) Report
   > Merging [#284](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-livy/commit/3a2685643670c1cd1530feb7a54516b01784e781?src=pr&el=desc) will **increase** coverage by `0.05%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-livy/pull/284/graphs/tree.svg?width=650&token=0MkVbiUFwE&height=150&src=pr)](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #284      +/-   ##
   ============================================
   + Coverage      68.2%   68.26%   +0.05%     
   + Complexity      965      964       -1     
   ============================================
     Files           104      104              
     Lines          5951     5952       +1     
     Branches        900      900              
   ============================================
   + Hits           4059     4063       +4     
   + Misses         1314     1309       -5     
   - Partials        578      580       +2
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...c/main/scala/org/apache/livy/repl/ReplDriver.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-cmVwbC9zcmMvbWFpbi9zY2FsYS9vcmcvYXBhY2hlL2xpdnkvcmVwbC9SZXBsRHJpdmVyLnNjYWxh) | `30.76% <0%> (-2.57%)` | `7% <0%> (ø)` | |
   | [...ain/java/org/apache/livy/rsc/driver/RSCDriver.java](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-cnNjL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9saXZ5L3JzYy9kcml2ZXIvUlNDRHJpdmVyLmphdmE=) | `80% <0%> (-0.84%)` | `45% <0%> (-1%)` | |
   | [...va/org/apache/livy/client/http/LivyConnection.java](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-Y2xpZW50LWh0dHAvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2xpdnkvY2xpZW50L2h0dHAvTGl2eUNvbm5lY3Rpb24uamF2YQ==) | `82.27% <0%> (+0.22%)` | `15% <0%> (ø)` | :arrow_down: |
   | [...main/scala/org/apache/livy/server/LivyServer.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS9zZXJ2ZXIvTGl2eVNlcnZlci5zY2FsYQ==) | `33.48% <0%> (+0.44%)` | `11% <0%> (ø)` | :arrow_down: |
   | [...ain/scala/org/apache/livy/utils/SparkYarnApp.scala](https://codecov.io/gh/apache/incubator-livy/pull/284/diff?src=pr&el=tree#diff-c2VydmVyL3NyYy9tYWluL3NjYWxhL29yZy9hcGFjaGUvbGl2eS91dGlscy9TcGFya1lhcm5BcHAuc2NhbGE=) | `75% <0%> (+3.12%)` | `40% <0%> (ø)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=footer). Last update [3a26856...3261a38](https://codecov.io/gh/apache/incubator-livy/pull/284?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

----------------------------------------------------------------

[GitHub] [incubator-livy] wypoon commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r389074846
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,17 +457,45 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
   private def anyViolations(username: String, ipAddress: String): Option[String] = {
 
 Review comment:
   Yes, I suppose I could do that.

----------------------------------------------------------------

[GitHub] [incubator-livy] wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390023915
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -412,37 +413,24 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
-  @throws[HiveSQLException]
-  private def incrementConnections(
-      username: String,
-      ipAddress: String,
-      forwardedAddresses: util.List[String]): Unit = {
-    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
-    val violation = anyViolations(username, clientIpAddress)
-    // increment the counters only when there are no violations
-    if (violation.isEmpty) {
-      if (trackConnectionsPerUser(username)) incrementConnectionsCount(username)
-      if (trackConnectionsPerIpAddress(clientIpAddress)) incrementConnectionsCount(clientIpAddress)
-      if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) {
-        incrementConnectionsCount(username + ":" + clientIpAddress)
-      }
-    } else {
-      error(violation.get)
-      throw new HiveSQLException(violation.get)
-    }
+  private def incrementConnectionsCount(key: String): Long = {
+    // Increment the count, returning the new value.
+    val count = connectionsCount.putIfAbsent(key, new AtomicLong(1L))
+    if (count != null) count.incrementAndGet()
 
 Review comment:
   I'm fine to put the braces in for if-else. I didn't because the code this replaces has exactly this kind of if-else (no braces, each clause being a single line), so I thought that was the project style!
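   
   For reference, the increment-and-return pattern with braces can be sketched in isolation. This is a minimal sketch, assuming a standalone `ConnectionCounts` object (a hypothetical name, not part of Livy) that wraps a `ConcurrentHashMap[String, AtomicLong]` like `connectionsCount`; the `decrement` shown is illustrative and may differ from the PR's `decrementConnectionsCount`.
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.atomic.AtomicLong
   
   // Hypothetical standalone holder; in the PR the map lives in LivyThriftSessionManager.
   object ConnectionCounts {
     private val counts = new ConcurrentHashMap[String, AtomicLong]()
   
     // Increment the count for `key` and return the new value.
     def increment(key: String): Long = {
       // putIfAbsent returns null if our fresh AtomicLong(1) was installed,
       // otherwise the counter that was already present.
       val existing = counts.putIfAbsent(key, new AtomicLong(1L))
       if (existing != null) {
         existing.incrementAndGet()
       } else {
         1L
       }
     }
   
     // Decrement the count for `key` and return the new value (0 if the key is unknown).
     def decrement(key: String): Long = {
       val existing = counts.get(key)
       if (existing != null) {
         existing.decrementAndGet()
       } else {
         0L
       }
     }
   }
   ```
   
   Returning the new value from `increment` is what lets the caller compare it against the limit without a separate read of the map.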


[GitHub] [incubator-livy] wypoon commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r388596005
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,17 +457,45 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
   private def anyViolations(username: String, ipAddress: String): Option[String] = {
 
 Review comment:
   I can rename it `checkForViolations`. I'd rather not have a long name like `checkForViolationsAndIncrementCountsAsNeeded` or even just `checkForViolationsAndIncrementCounts`. I document the side effect in the comment immediately after, which I think is sufficient.


[GitHub] [incubator-livy] wypoon commented on issue #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on issue #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-595469915
 
 
   > I think the helper `withinLimits()` is no longer used.
   > 
   Thanks for pointing that out. I'll remove it.
   
   > without testing for concurrency, could you at least add a basic unit test that the limits are enforced, say on `anyViolations()`? (given that the logic was entirely backwards before, a unit test seems like it would help here.)
   
   Yes, it would be helpful to have a basic unit test for that.
   


[GitHub] [incubator-livy] mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r389981352
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -412,37 +413,24 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
-  @throws[HiveSQLException]
-  private def incrementConnections(
-      username: String,
-      ipAddress: String,
-      forwardedAddresses: util.List[String]): Unit = {
-    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
-    val violation = anyViolations(username, clientIpAddress)
-    // increment the counters only when there are no violations
-    if (violation.isEmpty) {
-      if (trackConnectionsPerUser(username)) incrementConnectionsCount(username)
-      if (trackConnectionsPerIpAddress(clientIpAddress)) incrementConnectionsCount(clientIpAddress)
-      if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) {
-        incrementConnectionsCount(username + ":" + clientIpAddress)
-      }
-    } else {
-      error(violation.get)
-      throw new HiveSQLException(violation.get)
-    }
+  private def incrementConnectionsCount(key: String): Long = {
+    // Increment the count, returning the new value.
+    val count = connectionsCount.putIfAbsent(key, new AtomicLong(1L))
+    if (count != null) count.incrementAndGet()
 
 Review comment:
   nit: we usually put the `{`


[GitHub] [incubator-livy] wypoon commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r386577528
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -21,6 +21,7 @@ import java.lang.reflect.UndeclaredThrowableException
 import java.security.PrivilegedExceptionAction
 import java.util
 import java.util.{Date, Map => JMap, UUID}
+import java.util.function.BiFunction
 
 Review comment:
   Thanks. Somehow I'd missed that.


[GitHub] [incubator-livy] bersprockets commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
bersprockets commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r388577208
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,17 +457,45 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
   private def anyViolations(username: String, ipAddress: String): Option[String] = {
 
 Review comment:
   This now has a funny name, because it's responsible for incrementing (and possibly decrementing) counts as part of testing if there are any violations. So it has that side effect.


[GitHub] [incubator-livy] andrasbeni commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
andrasbeni commented on a change in pull request #284: [LIVY-752] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r386271798
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -21,6 +21,7 @@ import java.lang.reflect.UndeclaredThrowableException
 import java.security.PrivilegedExceptionAction
 import java.util
 import java.util.{Date, Map => JMap, UUID}
+import java.util.function.BiFunction
 
 Review comment:
   You will need to move this line after `java.util.concurrent...` imports to maintain alphabetic ordering and satisfy scalastyle.
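
As an illustration of the ordering being asked for, the import block might end up looking like the sketch below. The two `java.util.concurrent` imports are assumptions, since the hunk does not show which classes the file imports from that package; only the relative position of `java.util.function.BiFunction` comes from the comment above.

```scala
import java.util
import java.util.{Date, Map => JMap, UUID}
// Assumed imports: the hunk does not show the actual java.util.concurrent lines.
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
// "function" sorts after "concurrent", so the new import goes last in this group.
import java.util.function.BiFunction
```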


[GitHub] [incubator-livy] wypoon commented on issue #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on issue #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-596771656
 
 
   @mgaido91 @jerryshao can you please review?


[GitHub] [incubator-livy] wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390550363
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,21 +443,58 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
-  private def anyViolations(username: String, ipAddress: String): Option[String] = {
-    val userAndAddress = username + ":" + ipAddress
-    if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) {
-      Some(s"Connection limit per user reached (user: $username limit: $userLimit)")
-    } else if (trackConnectionsPerIpAddress(ipAddress) &&
-        !withinLimits(ipAddress, ipAddressLimit)) {
-      Some(s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: " +
-        s"$ipAddressLimit)")
-    } else if (trackConnectionsPerUserIpAddress(username, ipAddress) &&
-        !withinLimits(userAndAddress, userIpAddressLimit)) {
-      Some(s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress " +
-        s"limit: $userIpAddressLimit)")
-    } else {
-      None
+  // Visible for testing
+  @throws[HiveSQLException]
+  private[thriftserver] def incrementConnections(
+      username: String,
+      ipAddress: String,
+      forwardedAddresses: util.List[String]): Unit = {
+    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
+    val userAndAddress = username + ":" + clientIpAddress
+    val trackUser = trackConnectionsPerUser(username)
+    val trackIpAddress = trackConnectionsPerIpAddress(clientIpAddress)
+    val trackUserIpAddress = trackConnectionsPerUserIpAddress(username, clientIpAddress)
+    var userLimitExceeded = false
+    var ipAddressLimitExceeded = false
+    var userIpAddressLimitExceeded = false
+
+    // Optimistically increment the counts while getting them to check for violations.
+    if (trackUser) {
+      val userCount = incrementConnectionsCount(username)
+      if (userCount > userLimit) userLimitExceeded = true
+    }
+    if (trackIpAddress) {
+      val ipAddressCount = incrementConnectionsCount(clientIpAddress)
+      if (ipAddressCount > ipAddressLimit) ipAddressLimitExceeded = true
+    }
+    if (trackUserIpAddress) {
+      val userIpAddressCount = incrementConnectionsCount(userAndAddress)
+      if (userIpAddressCount > userIpAddressLimit) userIpAddressLimitExceeded = true
+    }
+
+    // If any limit has been exceeded, we won't be going ahead with the connection,
+    // so decrement all counts that have been incremented.
+    if (userLimitExceeded || ipAddressLimitExceeded || userIpAddressLimitExceeded) {
+      if (trackUser) decrementConnectionsCount(username)
 
 Review comment:
   @squito
   Thanks for the suggestion, which lets us skip the violation portion (and removes the need for the vars). That still leaves the core that I'd outlined, and the logic of that core still seems harder to reason about (that is, to prove correct) than the previous approach. You have repeated this core, so I assume you agree that it is correct:
   ```
       if (trackUser) {
         // 1
       }
       if (trackIpAddress) {
         // 2
       }
       if (trackUserIpAddress) {
         // 3
       }
   ```
   If we got to 2, we know that we may have incremented the count for user, so if there's a violation now, we may need to decrement the count for user as well as for clientIpAddress.
   If we got to 3, we know that we may have incremented the count for user and for clientIpAddress, so if there's a violation now, we may need to decrement the count for those as well as for userAndAddress.
   
   That's the reasoning that is more complex.
   But if you agree that the logic is correct, I'm ok to do it this way.
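
To make the reasoning about steps 1 to 3 concrete, here is a minimal sketch of the roll-back-immediately variant. It is not the PR's code: `EarlyRollbackSketch`, `tryAcquire`, `current`, and the generic violation message are hypothetical names chosen for illustration, and the three `if (track...)` blocks are generalized into an ordered list of `(key, limit)` pairs that contains only the keys being tracked.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the session manager's counters.
object EarlyRollbackSketch {
  private val counts = new ConcurrentHashMap[String, AtomicLong]()

  // Increment the count for key and return the new value.
  private def increment(key: String): Long = {
    val existing = counts.putIfAbsent(key, new AtomicLong(1L))
    if (existing != null) existing.incrementAndGet() else 1L
  }

  // Decrement the count for key if it is present (entry removal is omitted here).
  private def decrement(key: String): Unit = {
    val c = counts.get(key)
    if (c != null) c.decrementAndGet()
  }

  def current(key: String): Long = {
    val c = counts.get(key)
    if (c == null) 0L else c.get()
  }

  // Increments each tracked key in order. On the first violation, every key
  // incremented so far (including the violating one) is decremented again and
  // a message is returned. None means all increments were kept.
  def tryAcquire(checks: Seq[(String, Long)]): Option[String] = {
    var incremented = List.empty[String]
    val it = checks.iterator
    while (it.hasNext) {
      val (key, limit) = it.next()
      val newCount = increment(key)
      incremented = key :: incremented
      if (newCount > limit) {
        incremented.foreach(decrement)
        return Some(s"Connection limit reached (key: $key limit: $limit)")
      }
    }
    None
  }
}
```

The invariant is that `incremented` always holds exactly the keys whose counts this call has raised, so the rollback at step 2 or step 3 undoes precisely what steps 1 to 3 have done so far and nothing more.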
   


[GitHub] [incubator-livy] mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r389984890
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,21 +443,58 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
-  private def anyViolations(username: String, ipAddress: String): Option[String] = {
-    val userAndAddress = username + ":" + ipAddress
-    if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) {
-      Some(s"Connection limit per user reached (user: $username limit: $userLimit)")
-    } else if (trackConnectionsPerIpAddress(ipAddress) &&
-        !withinLimits(ipAddress, ipAddressLimit)) {
-      Some(s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: " +
-        s"$ipAddressLimit)")
-    } else if (trackConnectionsPerUserIpAddress(username, ipAddress) &&
-        !withinLimits(userAndAddress, userIpAddressLimit)) {
-      Some(s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress " +
-        s"limit: $userIpAddressLimit)")
-    } else {
-      None
+  // Visible for testing
+  @throws[HiveSQLException]
+  private[thriftserver] def incrementConnections(
+      username: String,
+      ipAddress: String,
+      forwardedAddresses: util.List[String]): Unit = {
+    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
+    val userAndAddress = username + ":" + clientIpAddress
+    val trackUser = trackConnectionsPerUser(username)
+    val trackIpAddress = trackConnectionsPerIpAddress(clientIpAddress)
+    val trackUserIpAddress = trackConnectionsPerUserIpAddress(username, clientIpAddress)
+    var userLimitExceeded = false
+    var ipAddressLimitExceeded = false
+    var userIpAddressLimitExceeded = false
+
+    // Optimistically increment the counts while getting them to check for violations.
+    if (trackUser) {
+      val userCount = incrementConnectionsCount(username)
+      if (userCount > userLimit) userLimitExceeded = true
+    }
+    if (trackIpAddress) {
+      val ipAddressCount = incrementConnectionsCount(clientIpAddress)
+      if (ipAddressCount > ipAddressLimit) ipAddressLimitExceeded = true
+    }
+    if (trackUserIpAddress) {
+      val userIpAddressCount = incrementConnectionsCount(userAndAddress)
+      if (userIpAddressCount > userIpAddressLimit) userIpAddressLimitExceeded = true
+    }
+
+    // If any limit has been exceeded, we won't be going ahead with the connection,
+    // so decrement all counts that have been incremented.
+    if (userLimitExceeded || ipAddressLimitExceeded || userIpAddressLimitExceeded) {
+      if (trackUser) decrementConnectionsCount(username)
 
 Review comment:
   Could we do this immediately after a violation is detected, to avoid further operations that would have to be reverted anyway?


[GitHub] [incubator-livy] mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390577142
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,21 +443,58 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
-  private def anyViolations(username: String, ipAddress: String): Option[String] = {
-    val userAndAddress = username + ":" + ipAddress
-    if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) {
-      Some(s"Connection limit per user reached (user: $username limit: $userLimit)")
-    } else if (trackConnectionsPerIpAddress(ipAddress) &&
-        !withinLimits(ipAddress, ipAddressLimit)) {
-      Some(s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: " +
-        s"$ipAddressLimit)")
-    } else if (trackConnectionsPerUserIpAddress(username, ipAddress) &&
-        !withinLimits(userAndAddress, userIpAddressLimit)) {
-      Some(s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress " +
-        s"limit: $userIpAddressLimit)")
-    } else {
-      None
+  // Visible for testing
+  @throws[HiveSQLException]
+  private[thriftserver] def incrementConnections(
+      username: String,
+      ipAddress: String,
+      forwardedAddresses: util.List[String]): Unit = {
+    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
+    val userAndAddress = username + ":" + clientIpAddress
+    val trackUser = trackConnectionsPerUser(username)
+    val trackIpAddress = trackConnectionsPerIpAddress(clientIpAddress)
+    val trackUserIpAddress = trackConnectionsPerUserIpAddress(username, clientIpAddress)
+    var userLimitExceeded = false
+    var ipAddressLimitExceeded = false
+    var userIpAddressLimitExceeded = false
+
+    // Optimistically increment the counts while getting them to check for violations.
+    if (trackUser) {
+      val userCount = incrementConnectionsCount(username)
+      if (userCount > userLimit) userLimitExceeded = true
+    }
+    if (trackIpAddress) {
+      val ipAddressCount = incrementConnectionsCount(clientIpAddress)
+      if (ipAddressCount > ipAddressLimit) ipAddressLimitExceeded = true
+    }
+    if (trackUserIpAddress) {
+      val userIpAddressCount = incrementConnectionsCount(userAndAddress)
+      if (userIpAddressCount > userIpAddressLimit) userIpAddressLimitExceeded = true
+    }
+
+    // If any limit has been exceeded, we won't be going ahead with the connection,
+    // so decrement all counts that have been incremented.
+    if (userLimitExceeded || ipAddressLimitExceeded || userIpAddressLimitExceeded) {
+      if (trackUser) decrementConnectionsCount(username)
 
 Review comment:
   Yes, thanks for the clarification @squito, that was what I meant. Honestly, I don't see a big difference in complexity or readability, and the early-exit version avoids some sync operations, so it is more efficient; hence I prefer that option. This part is not critical anyway IMHO, so it is not a big deal, but if you're not strongly against it, I'd go this way. Thanks.


[GitHub] [incubator-livy] wypoon commented on issue #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
wypoon commented on issue #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#issuecomment-598838352
 
 
   @mgaido91 can you merge this now? Thanks.


[GitHub] [incubator-livy] squito commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
squito commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390555930
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,21 +443,58 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
-  private def anyViolations(username: String, ipAddress: String): Option[String] = {
-    val userAndAddress = username + ":" + ipAddress
-    if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) {
-      Some(s"Connection limit per user reached (user: $username limit: $userLimit)")
-    } else if (trackConnectionsPerIpAddress(ipAddress) &&
-        !withinLimits(ipAddress, ipAddressLimit)) {
-      Some(s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: " +
-        s"$ipAddressLimit)")
-    } else if (trackConnectionsPerUserIpAddress(username, ipAddress) &&
-        !withinLimits(userAndAddress, userIpAddressLimit)) {
-      Some(s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress " +
-        s"limit: $userIpAddressLimit)")
-    } else {
-      None
+  // Visible for testing
+  @throws[HiveSQLException]
+  private[thriftserver] def incrementConnections(
+      username: String,
+      ipAddress: String,
+      forwardedAddresses: util.List[String]): Unit = {
+    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
+    val userAndAddress = username + ":" + clientIpAddress
+    val trackUser = trackConnectionsPerUser(username)
+    val trackIpAddress = trackConnectionsPerIpAddress(clientIpAddress)
+    val trackUserIpAddress = trackConnectionsPerUserIpAddress(username, clientIpAddress)
+    var userLimitExceeded = false
+    var ipAddressLimitExceeded = false
+    var userIpAddressLimitExceeded = false
+
+    // Optimistically increment the counts while getting them to check for violations.
+    if (trackUser) {
+      val userCount = incrementConnectionsCount(username)
+      if (userCount > userLimit) userLimitExceeded = true
+    }
+    if (trackIpAddress) {
+      val ipAddressCount = incrementConnectionsCount(clientIpAddress)
+      if (ipAddressCount > ipAddressLimit) ipAddressLimitExceeded = true
+    }
+    if (trackUserIpAddress) {
+      val userIpAddressCount = incrementConnectionsCount(userAndAddress)
+      if (userIpAddressCount > userIpAddressLimit) userIpAddressLimitExceeded = true
+    }
+
+    // If any limit has been exceeded, we won't be going ahead with the connection,
+    // so decrement all counts that have been incremented.
+    if (userLimitExceeded || ipAddressLimitExceeded || userIpAddressLimitExceeded) {
+      if (trackUser) decrementConnectionsCount(username)
 
 Review comment:
   Honestly, I can see it both ways; I was mostly just trying to clarify Marco's comment a bit to keep things moving. I'm fine either way. It's @mgaido91's call.


[GitHub] [incubator-livy] mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r389981413
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -412,37 +413,24 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
     }
   }
 
-  // Taken from Hive
-  @throws[HiveSQLException]
-  private def incrementConnections(
-      username: String,
-      ipAddress: String,
-      forwardedAddresses: util.List[String]): Unit = {
-    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
-    val violation = anyViolations(username, clientIpAddress)
-    // increment the counters only when there are no violations
-    if (violation.isEmpty) {
-      if (trackConnectionsPerUser(username)) incrementConnectionsCount(username)
-      if (trackConnectionsPerIpAddress(clientIpAddress)) incrementConnectionsCount(clientIpAddress)
-      if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) {
-        incrementConnectionsCount(username + ":" + clientIpAddress)
-      }
-    } else {
-      error(violation.get)
-      throw new HiveSQLException(violation.get)
-    }
+  private def incrementConnectionsCount(key: String): Long = {
+    // Increment the count, returning the new value.
+    val count = connectionsCount.putIfAbsent(key, new AtomicLong(1L))
+    if (count != null) count.incrementAndGet()
+    else 1L
   }
 
-  // Taken from Hive
-  private def incrementConnectionsCount(key: String): Unit = {
-    if (!connectionsCount.containsKey(key)) connectionsCount.get(key).incrementAndGet
-    else connectionsCount.put(key, new AtomicLong)
-  }
-
-  // Taken from Hive
   private def decrementConnectionsCount(key: String): Unit = {
-    if (!connectionsCount.containsKey(key)) connectionsCount.get(key).decrementAndGet
-    else connectionsCount.put(key, new AtomicLong)
+    // Decrement the count. Remove the entry if the count reaches zero.
+    val decrementCount = new BiFunction[String, AtomicLong, AtomicLong] {
+      override def apply(unused: String, count: AtomicLong): AtomicLong = {
+        val countValue = count.decrementAndGet()
+        debug(s"Connection count for $key is $countValue")
+        if (countValue == 0L) null
 
 Review comment:
   ditto
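
For reference, the braced form requested here, applied to the counter pattern from the hunk above, could look like the following sketch. The hunk is cut off before it shows how `decrementCount` is used, so passing it to `ConcurrentHashMap.computeIfPresent` is an assumption; `ConnectionCounterSketch` is a hypothetical wrapper and the `debug` logging is omitted.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import java.util.function.BiFunction

// Hypothetical wrapper so the two helpers can be exercised on their own.
object ConnectionCounterSketch {
  val connectionsCount = new ConcurrentHashMap[String, AtomicLong]()

  // Increment the count, returning the new value. putIfAbsent returns null
  // when the key was absent, in which case the new AtomicLong(1) is the count.
  def increment(key: String): Long = {
    val count = connectionsCount.putIfAbsent(key, new AtomicLong(1L))
    if (count != null) {
      count.incrementAndGet()
    } else {
      1L
    }
  }

  // Decrement the count. Assumption: the function is handed to computeIfPresent,
  // where returning null from the remapping function removes the entry.
  def decrement(key: String): Unit = {
    val decrementCount = new BiFunction[String, AtomicLong, AtomicLong] {
      override def apply(unused: String, count: AtomicLong): AtomicLong = {
        if (count.decrementAndGet() == 0L) {
          null
        } else {
          count
        }
      }
    }
    connectionsCount.computeIfPresent(key, decrementCount)
    ()
  }
}
```

One thing this sketch does not address: an increment that has already obtained an existing `AtomicLong` from `putIfAbsent` could interleave with a decrement that removes that entry. Doing the increment inside `compute` as well would be one way to keep both directions atomic per key, though the thread does not discuss it.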


[GitHub] [incubator-livy] mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
mgaido91 commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r389989363
 
 

 ##########
 File path: thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
 ##########
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import org.apache.hive.service.cli.HiveSQLException
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito.mock
+
+import org.apache.livy.LivyConf
+import org.apache.livy.server.AccessManager
+import org.apache.livy.server.recovery.{SessionStore, StateStore}
+import org.apache.livy.sessions.InteractiveSessionManager
+
+object ConnectionLimitType extends Enumeration {
+  type ConnectionLimitType = Value
+  val User, IpAddress, UserIpAddress = Value
+}
+
+class TestLivyThriftSessionManager {
+
+  import ConnectionLimitType._
+
+  private def createThriftSessionManager(
+      limitType: ConnectionLimitType): LivyThriftSessionManager = {
+    val conf = new LivyConf()
+    conf.set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+    val limit = 3
+    val entry = limitType match {
+      case User => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_USER
+      case IpAddress => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_IPADDRESS
+      case UserIpAddress => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_USER_IPADDRESS
+    }
+    conf.set(entry, limit)
+    val server = new LivyThriftServer(
+      conf,
+      mock(classOf[InteractiveSessionManager]),
+      mock(classOf[SessionStore]),
+      mock(classOf[AccessManager])
+    )
+    new LivyThriftSessionManager(server, conf)
+  }
+
+  private def testLimit(
+      thriftSessionMgr: LivyThriftSessionManager,
+      user: String,
+      ipAddress: String,
+      forwardedAddresses: java.util.List[String],
+      msg: String): Unit = {
+    val failureMsg = "Should have thrown HiveSQLException"
+    try {
+      thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+      fail(failureMsg)
+    } catch {
+      case e: HiveSQLException =>
+        assertEquals(msg, e.getMessage)
+      case _: Throwable =>
+        fail(failureMsg)
+    }
+  }
+
+  @Test
+  def testLimitConnectionsByUser(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(User)
+    val user = "alice"
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections(user, "10.20.30.40", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.41", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.42", forwardedAddresses)
+    val msg = s"Connection limit per user reached (user: $user limit: 3)"
+    testLimit(thriftSessionMgr, user, "10.20.30.43", forwardedAddresses, msg)
+  }
+
+  @Test
+  def testLimitConnectionsByIpAddress(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(IpAddress)
+    val ipAddress = "10.20.30.40"
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections("alice", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("bob", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("charlie", ipAddress, forwardedAddresses)
+    val msg = s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: 3)"
+    testLimit(thriftSessionMgr, "dan", ipAddress, forwardedAddresses, msg)
+  }
+
+  @Test
+  def testLimitConnectionsByUserAndIpAddress(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(UserIpAddress)
+    val user = "alice"
+    val ipAddress = "10.20.30.40"
+    val userAndAddress = user + ":" + ipAddress
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+
+    // more than 3 connections from the same IP Address is ok if users are different
+    thriftSessionMgr.incrementConnections("bob", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("charlie", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("dan", ipAddress, forwardedAddresses)
+
+    // more than 3 connections from the same user is ok if IP addresses are different
+    thriftSessionMgr.incrementConnections(user, "10.20.30.41", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.42", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.43", forwardedAddresses)
+
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+    val msg =
+      s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress limit: 3)"
+    testLimit(thriftSessionMgr, user, ipAddress, forwardedAddresses, msg)
+  }
+
 
 Review comment:
   nit: it would be good to also have a test for the case where multiple limits are violated, in order to pin down the error message thrown in that condition
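
   A rough sketch of what such a check could exercise, assuming a simplified stand-in for the counting logic rather than the real `LivyThriftSessionManager`. The names `ConnectionLimiter` and `LimitExceeded` and the combined message format are hypothetical; the message Livy actually produces when several limits are violated is not shown in this thread. The stand-in follows the approach in the PR description: increment all counts first, check the limits, and roll back on any violation.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical exception type standing in for HiveSQLException.
class LimitExceeded(msg: String) extends Exception(msg)

// Hypothetical stand-in for the session manager's counting logic; not Livy's code.
class ConnectionLimiter(userLimit: Long, ipLimit: Long) {
  private val counts = new ConcurrentHashMap[String, AtomicLong]()

  // Atomically fetch the counter for a key, creating it if absent.
  private def counter(key: String): AtomicLong = {
    val fresh = new AtomicLong(0L)
    val existing = counts.putIfAbsent(key, fresh)
    if (existing == null) fresh else existing
  }

  def count(key: String): Long = Option(counts.get(key)).map(_.get).getOrElse(0L)

  // Increment every relevant counter first, then check the limits;
  // on any violation, undo all increments and report every violated limit.
  def increment(user: String, ip: String): Unit = {
    val userCount = counter(s"user:$user").incrementAndGet()
    val ipCount = counter(s"ip:$ip").incrementAndGet()
    val violations = Seq(
      if (userCount > userLimit) Some(s"user: $user limit: $userLimit") else None,
      if (ipCount > ipLimit) Some(s"ipaddress: $ip limit: $ipLimit") else None
    ).flatten
    if (violations.nonEmpty) {
      decrement(user, ip)
      throw new LimitExceeded(
        "Connection limit reached (" + violations.mkString("; ") + ")")
    }
  }

  def decrement(user: String, ip: String): Unit = {
    counter(s"user:$user").decrementAndGet()
    counter(s"ip:$ip").decrementAndGet()
  }
}

object MultipleViolationsSketch {
  def main(args: Array[String]): Unit = {
    val limiter = new ConnectionLimiter(userLimit = 2, ipLimit = 2)
    limiter.increment("alice", "10.20.30.40")
    limiter.increment("alice", "10.20.30.40")
    // The third connection exceeds both the per-user and the per-IP limit.
    val message =
      try { limiter.increment("alice", "10.20.30.40"); "no exception" }
      catch { case e: LimitExceeded => e.getMessage }
    println(message)
    // The rejected attempt must not leave the counts inflated.
    println(limiter.count("user:alice"))
  }
}
```

   In the real test this would presumably mean setting more than one of the `THRIFT_LIMIT_CONNECTIONS_PER_*` entries on the same `LivyConf` and asserting on whatever message the manager emits.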


[GitHub] [incubator-livy] squito commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.

Posted by GitBox <gi...@apache.org>.
squito commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390052468
 
 

 ##########
 File path: thriftserver/server/src/test/scala/org/apache/livy/thriftserver/TestLivyThriftSessionManager.scala
 ##########
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.thriftserver
+
+import org.apache.hive.service.cli.HiveSQLException
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito.mock
+
+import org.apache.livy.LivyConf
+import org.apache.livy.server.AccessManager
+import org.apache.livy.server.recovery.{SessionStore, StateStore}
+import org.apache.livy.sessions.InteractiveSessionManager
+
+object ConnectionLimitType extends Enumeration {
+  type ConnectionLimitType = Value
+  val User, IpAddress, UserIpAddress = Value
+}
+
+class TestLivyThriftSessionManager {
+
+  import ConnectionLimitType._
+
+  private def createThriftSessionManager(
+      limitType: ConnectionLimitType): LivyThriftSessionManager = {
+    val conf = new LivyConf()
+    conf.set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+    val limit = 3
+    val entry = limitType match {
+      case User => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_USER
+      case IpAddress => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_IPADDRESS
+      case UserIpAddress => LivyConf.THRIFT_LIMIT_CONNECTIONS_PER_USER_IPADDRESS
+    }
+    conf.set(entry, limit)
+    val server = new LivyThriftServer(
+      conf,
+      mock(classOf[InteractiveSessionManager]),
+      mock(classOf[SessionStore]),
+      mock(classOf[AccessManager])
+    )
+    new LivyThriftSessionManager(server, conf)
+  }
+
+  private def testLimit(
+      thriftSessionMgr: LivyThriftSessionManager,
+      user: String,
+      ipAddress: String,
+      forwardedAddresses: java.util.List[String],
+      msg: String): Unit = {
+    val failureMsg = "Should have thrown HiveSQLException"
+    try {
+      thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+      fail(failureMsg)
+    } catch {
+      case e: HiveSQLException =>
+        assertEquals(msg, e.getMessage)
+      case _: Throwable =>
+        fail(failureMsg)
+    }
+  }
+
+  @Test
+  def testLimitConnectionsByUser(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(User)
+    val user = "alice"
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections(user, "10.20.30.40", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.41", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.42", forwardedAddresses)
+    val msg = s"Connection limit per user reached (user: $user limit: 3)"
+    testLimit(thriftSessionMgr, user, "10.20.30.43", forwardedAddresses, msg)
+  }
+
+  @Test
+  def testLimitConnectionsByIpAddress(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(IpAddress)
+    val ipAddress = "10.20.30.40"
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections("alice", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("bob", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("charlie", ipAddress, forwardedAddresses)
+    val msg = s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: 3)"
+    testLimit(thriftSessionMgr, "dan", ipAddress, forwardedAddresses, msg)
+  }
+
+  @Test
+  def testLimitConnectionsByUserAndIpAddress(): Unit = {
+    val thriftSessionMgr = createThriftSessionManager(UserIpAddress)
+    val user = "alice"
+    val ipAddress = "10.20.30.40"
+    val userAndAddress = user + ":" + ipAddress
+    val forwardedAddresses = new java.util.ArrayList[String]()
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+
+    // more than 3 connections from the same IP Address is ok if users are different
+    thriftSessionMgr.incrementConnections("bob", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("charlie", ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections("dan", ipAddress, forwardedAddresses)
+
+    // more than 3 connections from the same user is ok if IP addresses are different
+    thriftSessionMgr.incrementConnections(user, "10.20.30.41", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.42", forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, "10.20.30.43", forwardedAddresses)
+
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+    thriftSessionMgr.incrementConnections(user, ipAddress, forwardedAddresses)
+    val msg =
+      s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress limit: 3)"
+    testLimit(thriftSessionMgr, user, ipAddress, forwardedAddresses, msg)
+  }
+
 
 Review comment:
   it might also be nice to add a test for closing connections and then opening new ones, to ensure that this does *not* trigger violations.
   
   (I agree with your sentiment that you are strictly improving coverage here. This could be done later, but sometimes it's easier to just do it while it's fresh in your mind, if it's not too hard.)
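
   A minimal sketch of the close-then-reopen scenario, assuming a simplified per-user limiter in place of the real `LivyThriftSessionManager`. `PerUserLimiter`, `open`, and `close` are hypothetical names used only for illustration; the manager's actual decrement API is not shown in this thread.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical per-user limiter used only to illustrate the test shape; not Livy's code.
class PerUserLimiter(limit: Long) {
  private val counts = new ConcurrentHashMap[String, AtomicLong]()

  // Atomically fetch the counter for a user, creating it if absent.
  private def counter(user: String): AtomicLong = {
    val fresh = new AtomicLong(0L)
    val existing = counts.putIfAbsent(user, fresh)
    if (existing == null) fresh else existing
  }

  // Optimistically increment, then check; undo the increment if over the limit.
  // Returns true if the connection was admitted.
  def open(user: String): Boolean = {
    val c = counter(user)
    if (c.incrementAndGet() > limit) {
      c.decrementAndGet()
      false
    } else {
      true
    }
  }

  // Release one connection slot for the user.
  def close(user: String): Unit = counter(user).decrementAndGet()
}

object CloseAndReopenSketch {
  def main(args: Array[String]): Unit = {
    val limiter = new PerUserLimiter(limit = 3)
    // Fill up to the limit.
    assert((1 to 3).map(_ => limiter.open("alice")).forall(identity))
    // A fourth connection is rejected and must not inflate the count.
    assert(!limiter.open("alice"))
    // Closing one connection frees a slot, so reopening must succeed.
    limiter.close("alice")
    assert(limiter.open("alice"))
    // The limit is still enforced afterwards.
    assert(!limiter.open("alice"))
    println("close-and-reopen checks passed")
  }
}
```

   The second rejected `open` also guards against a rollback bug: if the failed attempt had left the counter incremented, the reopen after `close` would be refused.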
