You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/02/13 04:28:15 UTC

[6/9] accumulo git commit: ACCUMULO-3574 Add an IT for testing transport caching.

ACCUMULO-3574 Add an IT for testing transport caching.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1213ee2b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1213ee2b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1213ee2b

Branch: refs/heads/master
Commit: 1213ee2beaa4ac970c7bb6ba84355bae049cb4c4
Parents: 6111043
Author: Josh Elser <el...@apache.org>
Authored: Thu Feb 12 19:18:36 2015 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Feb 12 19:18:36 2015 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportKey.java    |   8 +-
 .../core/client/impl/ThriftTransportPool.java   |   4 +-
 .../accumulo/test/TransportCachingIT.java       | 115 +++++++++++++++++++
 3 files changed, 124 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1213ee2b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
index de33941..8e3ee47 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
@@ -19,7 +19,10 @@ package org.apache.accumulo.core.client.impl;
 import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.accumulo.core.util.SslConnectionParams;
 
-class ThriftTransportKey {
+import com.google.common.annotations.VisibleForTesting;
+
+@VisibleForTesting
+public class ThriftTransportKey {
   private final String location;
   private final int port;
   private final long timeout;
@@ -27,7 +30,8 @@ class ThriftTransportKey {
 
   private int hash = -1;
 
-  ThriftTransportKey(String location, long timeout, SslConnectionParams sslParams) {
+  @VisibleForTesting
+  public ThriftTransportKey(String location, long timeout, SslConnectionParams sslParams) {
     ArgumentChecker.notNull(location);
     String[] locationAndPort = location.split(":", 2);
     if (locationAndPort.length == 2) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1213ee2b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 575a537..33997e0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -41,6 +41,7 @@ import org.apache.log4j.Logger;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.net.HostAndPort;
 
 public class ThriftTransportPool {
@@ -420,7 +421,8 @@ public class ThriftTransportPool {
     return createNewTransport(cacheKey);
   }
 
-  Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
+  @VisibleForTesting
+  public Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
 
     servers = new ArrayList<ThriftTransportKey>(servers);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1213ee2b/test/src/test/java/org/apache/accumulo/test/TransportCachingIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TransportCachingIT.java b/test/src/test/java/org/apache/accumulo/test/TransportCachingIT.java
new file mode 100644
index 0000000..ddbd3e8
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/TransportCachingIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ServerConfigurationUtil;
+import org.apache.accumulo.core.client.impl.ThriftTransportKey;
+import org.apache.accumulo.core.client.impl.ThriftTransportPool;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SslConnectionParams;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that {@link ThriftTransportPool} actually adheres to the cachedConnection argument
+ */
+public class TransportCachingIT extends AccumuloClusterIT {
+  private static final Logger log = LoggerFactory.getLogger(TransportCachingIT.class);
+
+  @Test
+  public void testCachedTransport() {
+    Connector conn = getConnector();
+    Instance instance = conn.getInstance();
+    long rpcTimeout = DefaultConfiguration.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue());
+
+    // create list of servers
+    ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
+
+    // add tservers
+    ZooCache zc = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+    for (String tserver : zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTSERVERS)) {
+      String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + tserver;
+      byte[] data = ZooUtil.getLockData(zc, path);
+      if (data != null && !new String(data, UTF_8).equals("master"))
+        servers.add(new ThriftTransportKey(new ServerServices(new String(data)).getAddressString(Service.TSERV_CLIENT), rpcTimeout, SslConnectionParams
+            .forClient(ServerConfigurationUtil.getConfiguration(instance))));
+    }
+
+    ThriftTransportPool pool = ThriftTransportPool.getInstance();
+    TTransport first = null;
+    while (null == first) {
+      try {
+        // Get a transport (cached or not)
+        first = pool.getAnyTransport(servers, true).getSecond();
+      } catch (TTransportException e) {
+        log.warn("Failed to obtain transport to " + servers);
+      }
+    }
+
+    assertNotNull(first);
+    // Return it to unreserve it
+    pool.returnTransport(first);
+
+    TTransport second = null;
+    while (null == second) {
+      try {
+        // Get a cached transport (should be the first)
+        second = pool.getAnyTransport(servers, true).getSecond();
+      } catch (TTransportException e) {
+        log.warn("Failed obtain 2nd transport to " + servers);
+      }
+    }
+
+    // We should get the same transport
+    assertTrue("Expected the first and second to be the same instance", first == second);
+    // Return the 2nd
+    pool.returnTransport(second);
+
+    TTransport third = null;
+    while (null == third) {
+      try {
+        // Get a non-cached transport
+        third = pool.getAnyTransport(servers, false).getSecond();
+      } catch (TTransportException e) {
+        log.warn("Failed obtain 2nd transport to " + servers);
+      }
+    }
+
+    assertFalse("Expected second and third transport to be different instances", second == third);
+    pool.returnTransport(third);
+  }
+}