You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by GitBox <gi...@apache.org> on 2022/04/19 16:58:48 UTC

[GitHub] [tinkerpop] divijvaidya commented on a diff in pull request #1619: Gremlin Go MS5 - Feature Complete

divijvaidya commented on code in PR #1619:
URL: https://github.com/apache/tinkerpop/pull/1619#discussion_r853255058


##########
gremlin-go/driver/connectionPool.go:
##########
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package gremlingo
+
+import (
+	"sync"
+)
+
+type connectionPool interface {
+	write(*request) (ResultSet, error)
+	close()
+}
+
+const defaultNewConnectionThreshold = 4
+
+// loadBalancingPool has two configurations: maximumConcurrentConnections/cap(connections) and newConnectionThreshold.
+// maximumConcurrentConnections denotes the maximum amount of active connections at any given time.
+// newConnectionThreshold specifies the minimum amount of concurrent active traversals on the least used connection
+// which will trigger creation of a new connection if maximumConcurrentConnections has not bee reached.
+// loadBalancingPool will use the least-used connection, and as a part of the process, getLeastUsedConnection(), will
+// remove any unusable connections from the pool and ensure that the returned connection is usable. If there are
+// multiple active connections with no active traversals on them, one will be used and the others will be closed and
+// removed from the pool.
+type loadBalancingPool struct {
+	url          string
+	logHandler   *logHandler
+	connSettings *connectionSettings
+
+	newConnectionThreshold int
+	connections            []*connection
+	loadBalanceLock        sync.Mutex
+}
+
+func (pool *loadBalancingPool) close() {

Review Comment:
   What happens when we call close() while a `createConnection()` is in progress? I think we would close the registered connections and would never close the connection that will be added to the pool after `createConnection()` is complete.



##########
gremlin-go/driver/connectionPool.go:
##########
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package gremlingo
+
+import (
+	"sync"
+)
+
+type connectionPool interface {
+	write(*request) (ResultSet, error)
+	close()
+}
+
+const defaultNewConnectionThreshold = 4
+
+// loadBalancingPool has two configurations: maximumConcurrentConnections/cap(connections) and newConnectionThreshold.
+// maximumConcurrentConnections denotes the maximum amount of active connections at any given time.
+// newConnectionThreshold specifies the minimum amount of concurrent active traversals on the least used connection
+// which will trigger creation of a new connection if maximumConcurrentConnections has not bee reached.
+// loadBalancingPool will use the least-used connection, and as a part of the process, getLeastUsedConnection(), will
+// remove any unusable connections from the pool and ensure that the returned connection is usable. If there are
+// multiple active connections with no active traversals on them, one will be used and the others will be closed and
+// removed from the pool.
+type loadBalancingPool struct {
+	url          string
+	logHandler   *logHandler
+	connSettings *connectionSettings
+
+	newConnectionThreshold int
+	connections            []*connection
+	loadBalanceLock        sync.Mutex
+}
+
+func (pool *loadBalancingPool) close() {
+	for _, connection := range pool.connections {
+		err := connection.close()
+		if err != nil {
+			pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+		}
+	}
+}
+
+func (pool *loadBalancingPool) write(request *request) (ResultSet, error) {
+	connection, err := pool.getLeastUsedConnection()
+	if err != nil {
+		return nil, err
+	}
+	return connection.write(request)
+}
+
+func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) {
+	pool.loadBalanceLock.Lock()
+	defer pool.loadBalanceLock.Unlock()
+	if len(pool.connections) == 0 {
+		return pool.newConnection()
+	} else {
+		var leastUsed *connection = nil
+		validIndex := 0
+		for _, connection := range pool.connections {
+			// Purge dead connections from pool
+			if connection.state == established {
+				// Close and purge connections from pool if there is more than one being unused
+				if leastUsed != nil && (leastUsed.activeResults() == 0 && connection.activeResults() == 0) {
+					// Close the connection asynchronously since it is a high-latency method
+					go func() {
+						pool.logHandler.log(Debug, closeUnusedPoolConnection)
+						err := connection.close()
+						if err != nil {
+							pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+						}
+					}()
+
+					continue
+				}
+
+				// Mark connection as valid to keep
+				pool.connections[validIndex] = connection
+				validIndex++
+
+				// Set the least used connection
+				if leastUsed == nil || connection.activeResults() < leastUsed.activeResults() {
+					leastUsed = connection
+				}
+			} else {
+				pool.logHandler.log(Debug, purgingDeadConnection)
+			}
+		}
+
+		// Deallocate truncated dead connections to prevent memory leak
+		for invalidIndex := validIndex; invalidIndex < len(pool.connections); invalidIndex++ {
+			pool.connections[invalidIndex] = nil
+		}
+		pool.connections = pool.connections[:validIndex]
+
+		// Create new connection if no valid connections were found in the pool or the least used connection exceeded
+		// the concurrent usage threshold while the pool still has capacity for a new connection
+		if leastUsed == nil ||
+			(leastUsed.activeResults() >= pool.newConnectionThreshold && len(pool.connections) < cap(pool.connections)) {
+			return pool.newConnection()
+		} else {
+			return leastUsed, nil
+		}
+	}
+}
+
+func (pool *loadBalancingPool) newConnection() (*connection, error) {
+	connection, err := createConnection(pool.url, pool.logHandler, pool.connSettings)
+	if err != nil {
+		return nil, err
+	}
+	pool.connections = append(pool.connections, connection)
+	return connection, nil
+}
+
+func newLoadBalancingPool(url string, logHandler *logHandler, connSettings *connectionSettings, newConnectionThreshold int,
+	maximumConcurrentConnections int) (connectionPool, error) {
+	pool := make([]*connection, 0, maximumConcurrentConnections)
+	initialConnection, err := createConnection(url, logHandler, connSettings)

Review Comment:
   The number of initial connections per pool should be configurable. We have faced situations with Java client where users complained about the fact that "lazy initialization" leads to higher latency for first few requests to the server.



##########
gremlin-go/driver/connectionPool.go:
##########
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package gremlingo
+
+import (
+	"sync"
+)
+
+type connectionPool interface {
+	write(*request) (ResultSet, error)
+	close()
+}
+
+const defaultNewConnectionThreshold = 4
+
+// loadBalancingPool has two configurations: maximumConcurrentConnections/cap(connections) and newConnectionThreshold.
+// maximumConcurrentConnections denotes the maximum amount of active connections at any given time.
+// newConnectionThreshold specifies the minimum amount of concurrent active traversals on the least used connection
+// which will trigger creation of a new connection if maximumConcurrentConnections has not bee reached.
+// loadBalancingPool will use the least-used connection, and as a part of the process, getLeastUsedConnection(), will
+// remove any unusable connections from the pool and ensure that the returned connection is usable. If there are
+// multiple active connections with no active traversals on them, one will be used and the others will be closed and
+// removed from the pool.
+type loadBalancingPool struct {
+	url          string
+	logHandler   *logHandler
+	connSettings *connectionSettings
+
+	newConnectionThreshold int
+	connections            []*connection
+	loadBalanceLock        sync.Mutex
+}
+
+func (pool *loadBalancingPool) close() {
+	for _, connection := range pool.connections {
+		err := connection.close()
+		if err != nil {
+			pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+		}
+	}
+}
+
+func (pool *loadBalancingPool) write(request *request) (ResultSet, error) {
+	connection, err := pool.getLeastUsedConnection()
+	if err != nil {
+		return nil, err
+	}
+	return connection.write(request)
+}
+
+func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) {
+	pool.loadBalanceLock.Lock()
+	defer pool.loadBalanceLock.Unlock()
+	if len(pool.connections) == 0 {
+		return pool.newConnection()
+	} else {
+		var leastUsed *connection = nil
+		validIndex := 0
+		for _, connection := range pool.connections {
+			// Purge dead connections from pool
+			if connection.state == established {
+				// Close and purge connections from pool if there is more than one being unused
+				if leastUsed != nil && (leastUsed.activeResults() == 0 && connection.activeResults() == 0) {
+					// Close the connection asynchronously since it is a high-latency method
+					go func() {
+						pool.logHandler.log(Debug, closeUnusedPoolConnection)
+						err := connection.close()
+						if err != nil {
+							pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+						}
+					}()
+
+					continue
+				}
+
+				// Mark connection as valid to keep
+				pool.connections[validIndex] = connection
+				validIndex++
+
+				// Set the least used connection
+				if leastUsed == nil || connection.activeResults() < leastUsed.activeResults() {
+					leastUsed = connection
+				}
+			} else {
+				pool.logHandler.log(Debug, purgingDeadConnection)

Review Comment:
   should there be a function call here to purge connections?



##########
gremlin-go/driver/connectionPool.go:
##########
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package gremlingo
+
+import (
+	"sync"
+)
+
+type connectionPool interface {
+	write(*request) (ResultSet, error)
+	close()
+}
+
+const defaultNewConnectionThreshold = 4
+
+// loadBalancingPool has two configurations: maximumConcurrentConnections/cap(connections) and newConnectionThreshold.
+// maximumConcurrentConnections denotes the maximum amount of active connections at any given time.
+// newConnectionThreshold specifies the minimum amount of concurrent active traversals on the least used connection
+// which will trigger creation of a new connection if maximumConcurrentConnections has not bee reached.
+// loadBalancingPool will use the least-used connection, and as a part of the process, getLeastUsedConnection(), will
+// remove any unusable connections from the pool and ensure that the returned connection is usable. If there are
+// multiple active connections with no active traversals on them, one will be used and the others will be closed and
+// removed from the pool.
+type loadBalancingPool struct {
+	url          string
+	logHandler   *logHandler
+	connSettings *connectionSettings
+
+	newConnectionThreshold int
+	connections            []*connection
+	loadBalanceLock        sync.Mutex
+}
+
+func (pool *loadBalancingPool) close() {
+	for _, connection := range pool.connections {
+		err := connection.close()
+		if err != nil {
+			pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+		}
+	}
+}
+
+func (pool *loadBalancingPool) write(request *request) (ResultSet, error) {
+	connection, err := pool.getLeastUsedConnection()
+	if err != nil {
+		return nil, err
+	}
+	return connection.write(request)
+}
+
+func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) {
+	pool.loadBalanceLock.Lock()
+	defer pool.loadBalanceLock.Unlock()
+	if len(pool.connections) == 0 {
+		return pool.newConnection()
+	} else {
+		var leastUsed *connection = nil
+		validIndex := 0
+		for _, connection := range pool.connections {
+			// Purge dead connections from pool
+			if connection.state == established {
+				// Close and purge connections from pool if there is more than one being unused

Review Comment:
   I am not very sure about this logic. Connection setup is quite expensive due to SSL handshake, authn and authz happening with every setup. It is very likely that a connection may be intermittently un-used and this logic will end up creating and destroying connections all the time!
   
   Alternatively, we can do the following:
   1. Close "idle" connections based on a percentage configured by the user.
   2. Do not close the idle connections at all. Keeping connections alive is not expensive on the server.
   
   Please add pros/cons about this in the design and send an email to the community mailing list for a design review/discussion.



##########
gremlin-go/driver/connectionPool.go:
##########
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package gremlingo
+
+import (
+	"sync"
+)
+
+type connectionPool interface {
+	write(*request) (ResultSet, error)
+	close()
+}
+
+const defaultNewConnectionThreshold = 4
+
+// loadBalancingPool has two configurations: maximumConcurrentConnections/cap(connections) and newConnectionThreshold.
+// maximumConcurrentConnections denotes the maximum amount of active connections at any given time.
+// newConnectionThreshold specifies the minimum amount of concurrent active traversals on the least used connection
+// which will trigger creation of a new connection if maximumConcurrentConnections has not bee reached.
+// loadBalancingPool will use the least-used connection, and as a part of the process, getLeastUsedConnection(), will
+// remove any unusable connections from the pool and ensure that the returned connection is usable. If there are
+// multiple active connections with no active traversals on them, one will be used and the others will be closed and
+// removed from the pool.
+type loadBalancingPool struct {
+	url          string
+	logHandler   *logHandler
+	connSettings *connectionSettings
+
+	newConnectionThreshold int
+	connections            []*connection
+	loadBalanceLock        sync.Mutex
+}
+
+func (pool *loadBalancingPool) close() {
+	for _, connection := range pool.connections {
+		err := connection.close()
+		if err != nil {
+			pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+		}
+	}
+}
+
+func (pool *loadBalancingPool) write(request *request) (ResultSet, error) {
+	connection, err := pool.getLeastUsedConnection()
+	if err != nil {
+		return nil, err
+	}
+	return connection.write(request)
+}
+
+func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) {
+	pool.loadBalanceLock.Lock()
+	defer pool.loadBalanceLock.Unlock()
+	if len(pool.connections) == 0 {
+		return pool.newConnection()
+	} else {
+		var leastUsed *connection = nil
+		validIndex := 0
+		for _, connection := range pool.connections {
+			// Purge dead connections from pool
+			if connection.state == established {
+				// Close and purge connections from pool if there is more than one being unused
+				if leastUsed != nil && (leastUsed.activeResults() == 0 && connection.activeResults() == 0) {
+					// Close the connection asynchronously since it is a high-latency method
+					go func() {
+						pool.logHandler.log(Debug, closeUnusedPoolConnection)
+						err := connection.close()
+						if err != nil {
+							pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+						}
+					}()
+
+					continue
+				}
+
+				// Mark connection as valid to keep
+				pool.connections[validIndex] = connection
+				validIndex++
+
+				// Set the least used connection
+				if leastUsed == nil || connection.activeResults() < leastUsed.activeResults() {
+					leastUsed = connection
+				}
+			} else {
+				pool.logHandler.log(Debug, purgingDeadConnection)
+			}
+		}
+
+		// Deallocate truncated dead connections to prevent memory leak
+		for invalidIndex := validIndex; invalidIndex < len(pool.connections); invalidIndex++ {
+			pool.connections[invalidIndex] = nil
+		}
+		pool.connections = pool.connections[:validIndex]
+
+		// Create new connection if no valid connections were found in the pool or the least used connection exceeded
+		// the concurrent usage threshold while the pool still has capacity for a new connection
+		if leastUsed == nil ||
+			(leastUsed.activeResults() >= pool.newConnectionThreshold && len(pool.connections) < cap(pool.connections)) {
+			return pool.newConnection()
+		} else {
+			return leastUsed, nil
+		}
+	}
+}
+
+func (pool *loadBalancingPool) newConnection() (*connection, error) {
+	connection, err := createConnection(pool.url, pool.logHandler, pool.connSettings)
+	if err != nil {
+		return nil, err
+	}
+	pool.connections = append(pool.connections, connection)
+	return connection, nil
+}
+
+func newLoadBalancingPool(url string, logHandler *logHandler, connSettings *connectionSettings, newConnectionThreshold int,
+	maximumConcurrentConnections int) (connectionPool, error) {
+	pool := make([]*connection, 0, maximumConcurrentConnections)
+	initialConnection, err := createConnection(url, logHandler, connSettings)
+	if err != nil {
+		return nil, err
+	}
+	pool = append(pool, initialConnection)

Review Comment:
   Code duplication. Refactor `newConnection()` to work for this use case as well instead?



##########
gremlin-go/driver/connection.go:
##########
@@ -52,7 +60,7 @@ func (connection *connection) errorCallback() {
 
 func (connection *connection) close() error {
 	if connection.state != established {
-		return errors.New("cannot close connection that has already been closed or has not been connected")
+		return newError(err0101ConnectionCloseError)
 	}

Review Comment:
   We need to introduce a 'closing' state here. For the same connection, this method could be called simultaneously from two threads (e.g. one from go routine inside pool.write and another from pool.close). Only one should be allowed to 'close'.
   
   Also please add method doc which mentions whether a function is idempotent or not and also whether it is thread safe or not.



##########
gremlin-go/driver/connectionPool.go:
##########
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package gremlingo
+
+import (
+	"sync"
+)
+
+type connectionPool interface {
+	write(*request) (ResultSet, error)
+	close()
+}
+
+const defaultNewConnectionThreshold = 4
+
+// loadBalancingPool has two configurations: maximumConcurrentConnections/cap(connections) and newConnectionThreshold.
+// maximumConcurrentConnections denotes the maximum amount of active connections at any given time.
+// newConnectionThreshold specifies the minimum amount of concurrent active traversals on the least used connection
+// which will trigger creation of a new connection if maximumConcurrentConnections has not bee reached.
+// loadBalancingPool will use the least-used connection, and as a part of the process, getLeastUsedConnection(), will
+// remove any unusable connections from the pool and ensure that the returned connection is usable. If there are
+// multiple active connections with no active traversals on them, one will be used and the others will be closed and
+// removed from the pool.
+type loadBalancingPool struct {
+	url          string
+	logHandler   *logHandler
+	connSettings *connectionSettings
+
+	newConnectionThreshold int
+	connections            []*connection
+	loadBalanceLock        sync.Mutex
+}
+
+func (pool *loadBalancingPool) close() {
+	for _, connection := range pool.connections {
+		err := connection.close()
+		if err != nil {
+			pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+		}
+	}
+}
+
+func (pool *loadBalancingPool) write(request *request) (ResultSet, error) {
+	connection, err := pool.getLeastUsedConnection()
+	if err != nil {
+		return nil, err
+	}
+	return connection.write(request)
+}
+
+func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) {
+	pool.loadBalanceLock.Lock()
+	defer pool.loadBalanceLock.Unlock()
+	if len(pool.connections) == 0 {
+		return pool.newConnection()
+	} else {
+		var leastUsed *connection = nil
+		validIndex := 0
+		for _, connection := range pool.connections {
+			// Purge dead connections from pool
+			if connection.state == established {
+				// Close and purge connections from pool if there is more than one being unused
+				if leastUsed != nil && (leastUsed.activeResults() == 0 && connection.activeResults() == 0) {
+					// Close the connection asynchronously since it is a high-latency method
+					go func() {
+						pool.logHandler.log(Debug, closeUnusedPoolConnection)
+						err := connection.close()
+						if err != nil {
+							pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+						}
+					}()
+
+					continue
+				}
+
+				// Mark connection as valid to keep
+				pool.connections[validIndex] = connection
+				validIndex++
+
+				// Set the least used connection
+				if leastUsed == nil || connection.activeResults() < leastUsed.activeResults() {
+					leastUsed = connection
+				}
+			} else {
+				pool.logHandler.log(Debug, purgingDeadConnection)
+			}
+		}
+
+		// Deallocate truncated dead connections to prevent memory leak

Review Comment:
   perhaps an attempt to gracefully close them first?



##########
gremlin-go/driver/connectionPool.go:
##########
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package gremlingo
+
+import (
+	"sync"
+)
+
+type connectionPool interface {
+	write(*request) (ResultSet, error)
+	close()
+}
+
+const defaultNewConnectionThreshold = 4
+
+// loadBalancingPool has two configurations: maximumConcurrentConnections/cap(connections) and newConnectionThreshold.
+// maximumConcurrentConnections denotes the maximum amount of active connections at any given time.
+// newConnectionThreshold specifies the minimum amount of concurrent active traversals on the least used connection
+// which will trigger creation of a new connection if maximumConcurrentConnections has not bee reached.
+// loadBalancingPool will use the least-used connection, and as a part of the process, getLeastUsedConnection(), will
+// remove any unusable connections from the pool and ensure that the returned connection is usable. If there are
+// multiple active connections with no active traversals on them, one will be used and the others will be closed and
+// removed from the pool.
+type loadBalancingPool struct {
+	url          string
+	logHandler   *logHandler
+	connSettings *connectionSettings
+
+	newConnectionThreshold int
+	connections            []*connection
+	loadBalanceLock        sync.Mutex
+}
+
+func (pool *loadBalancingPool) close() {
+	for _, connection := range pool.connections {
+		err := connection.close()
+		if err != nil {
+			pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+		}
+	}
+}
+
+func (pool *loadBalancingPool) write(request *request) (ResultSet, error) {
+	connection, err := pool.getLeastUsedConnection()
+	if err != nil {
+		return nil, err
+	}
+	return connection.write(request)
+}
+
+func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) {
+	pool.loadBalanceLock.Lock()
+	defer pool.loadBalanceLock.Unlock()
+	if len(pool.connections) == 0 {
+		return pool.newConnection()
+	} else {
+		var leastUsed *connection = nil
+		validIndex := 0
+		for _, connection := range pool.connections {
+			// Purge dead connections from pool
+			if connection.state == established {

Review Comment:
   this might end up purging "initialized" state connections too.



##########
gremlin-go/driver/connectionPool.go:
##########
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package gremlingo
+
+import (
+	"sync"
+)
+
+type connectionPool interface {
+	write(*request) (ResultSet, error)
+	close()
+}
+
+const defaultNewConnectionThreshold = 4
+
+// loadBalancingPool has two configurations: maximumConcurrentConnections/cap(connections) and newConnectionThreshold.
+// maximumConcurrentConnections denotes the maximum amount of active connections at any given time.
+// newConnectionThreshold specifies the minimum amount of concurrent active traversals on the least used connection
+// which will trigger creation of a new connection if maximumConcurrentConnections has not bee reached.
+// loadBalancingPool will use the least-used connection, and as a part of the process, getLeastUsedConnection(), will
+// remove any unusable connections from the pool and ensure that the returned connection is usable. If there are
+// multiple active connections with no active traversals on them, one will be used and the others will be closed and
+// removed from the pool.
+type loadBalancingPool struct {
+	url          string
+	logHandler   *logHandler
+	connSettings *connectionSettings
+
+	newConnectionThreshold int
+	connections            []*connection
+	loadBalanceLock        sync.Mutex
+}
+
+func (pool *loadBalancingPool) close() {
+	for _, connection := range pool.connections {
+		err := connection.close()
+		if err != nil {
+			pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+		}
+	}
+}
+
+func (pool *loadBalancingPool) write(request *request) (ResultSet, error) {

Review Comment:
   what happens when write() is called when another thread is executing the close() method?



##########
gremlin-go/driver/connectionPool.go:
##########
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package gremlingo
+
+import (
+	"sync"
+)
+
+type connectionPool interface {
+	write(*request) (ResultSet, error)
+	close()
+}
+
+const defaultNewConnectionThreshold = 4
+
+// loadBalancingPool has two configurations: maximumConcurrentConnections/cap(connections) and newConnectionThreshold.
+// maximumConcurrentConnections denotes the maximum amount of active connections at any given time.
+// newConnectionThreshold specifies the minimum amount of concurrent active traversals on the least used connection
+// which will trigger creation of a new connection if maximumConcurrentConnections has not bee reached.
+// loadBalancingPool will use the least-used connection, and as a part of the process, getLeastUsedConnection(), will
+// remove any unusable connections from the pool and ensure that the returned connection is usable. If there are
+// multiple active connections with no active traversals on them, one will be used and the others will be closed and
+// removed from the pool.
+type loadBalancingPool struct {
+	url          string
+	logHandler   *logHandler
+	connSettings *connectionSettings
+
+	newConnectionThreshold int
+	connections            []*connection
+	loadBalanceLock        sync.Mutex
+}
+
+func (pool *loadBalancingPool) close() {
+	for _, connection := range pool.connections {
+		err := connection.close()
+		if err != nil {
+			pool.logHandler.logf(Warning, errorClosingConnection, err.Error())
+		}
+	}
+}
+
+func (pool *loadBalancingPool) write(request *request) (ResultSet, error) {
+	connection, err := pool.getLeastUsedConnection()
+	if err != nil {
+		return nil, err
+	}
+	return connection.write(request)
+}
+
+func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) {
+	pool.loadBalanceLock.Lock()
+	defer pool.loadBalanceLock.Unlock()
+	if len(pool.connections) == 0 {
+		return pool.newConnection()
+	} else {

Review Comment:
   This else block is not required since there is no code to execute outside the else block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@tinkerpop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org