You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/01/07 03:05:25 UTC

[5/5] git commit: Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Merge remote-tracking branch 'origin/1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9c092cad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9c092cad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9c092cad

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 9c092cadde75250fa84e049eeae33687636dbc99
Parents: ee3ccb8 400b991
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 6 21:11:02 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 21:11:02 2014 -0500

----------------------------------------------------------------------
 .../core/client/impl/ThriftTransportPool.java   | 100 +++++++++++---
 .../org/apache/accumulo/core/util/CleanUp.java  |  63 +++++++++
 .../accumulo/fate/zookeeper/ZooSession.java     |  23 +++-
 .../accumulo/test/functional/CleanUpIT.java     | 137 +++++++++++++++++++
 4 files changed, 304 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index a553cc1,f123289..a5fecd5
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@@ -357,35 -376,20 +375,35 @@@ public class ThriftTransportPool 
    
    private ThriftTransportPool() {}
    
 -  public TTransport getTransport(String location, int port) throws TTransportException {
 -    return getTransport(location, port, 0);
 -  }
 -  
 -  public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException {
 -    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +  public TTransport getTransportWithDefaultTimeout(HostAndPort addr, AccumuloConfiguration conf) throws TTransportException {
 +    return getTransport(String.format("%s:%d", addr.getHostText(), addr.getPort()), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), SslConnectionParams.forClient(conf));
    }
    
 -  public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException {
 -    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout);
 +  public TTransport getTransport(String location, long milliseconds, SslConnectionParams sslParams) throws TTransportException {
 +    return getTransport(new ThriftTransportKey(location, milliseconds, sslParams));
    }
    
 -  public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException {
 -    return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +  private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
 +    synchronized (this) {
 +      // atomically reserve location if it exist in cache
-       List<CachedConnection> ccl = cache.get(cacheKey);
++      List<CachedConnection> ccl = getCache().get(cacheKey);
 +      
 +      if (ccl == null) {
 +        ccl = new LinkedList<CachedConnection>();
-         cache.put(cacheKey, ccl);
++        getCache().put(cacheKey, ccl);
 +      }
 +      
 +      for (CachedConnection cachedConnection : ccl) {
 +        if (!cachedConnection.isReserved()) {
 +          cachedConnection.setReserved(true);
 +          if (log.isTraceEnabled())
 +            log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
 +          return cachedConnection.transport;
 +        }
 +      }
 +    }
 +    
 +    return createNewTransport(cacheKey);
    }
    
    Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
@@@ -484,9 -531,9 +507,9 @@@
      CachedTTransport ctsc = (CachedTTransport) tsc;
      
      ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
 -
 +    
      synchronized (this) {
-       List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
+       List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
        for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
          CachedConnection cachedConnection = iterator.next();
          if (cachedConnection.transport == tsc) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c092cad/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java
index 0000000,0000000..ad5b2fd
new file mode 100644
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CleanUpIT.java
@@@ -1,0 -1,0 +1,137 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test.functional;
++
++import java.util.Iterator;
++import java.util.Map.Entry;
++import java.util.Set;
++
++import org.apache.accumulo.core.client.BatchWriter;
++import org.apache.accumulo.core.client.BatchWriterConfig;
++import org.apache.accumulo.core.client.Scanner;
++import org.apache.accumulo.core.data.Key;
++import org.apache.accumulo.core.data.Mutation;
++import org.apache.accumulo.core.data.Value;
++import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.util.CleanUp;
++import org.junit.Test;
++
++/**
++ * 
++ */
++public class CleanUpIT extends SimpleMacIT {
++  @Test(timeout = 30 * 1000)
++  public void run() throws Exception {
++
++    String tableName = getTableNames(1)[0];
++    getConnector().tableOperations().create(tableName);
++
++    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
++
++    Mutation m1 = new Mutation("r1");
++    m1.put("cf1", "cq1", 1, "5");
++
++    bw.addMutation(m1);
++
++    bw.flush();
++
++    Scanner scanner = getConnector().createScanner(tableName, new Authorizations());
++
++    int count = 0;
++    for (Entry<Key,Value> entry : scanner) {
++      count++;
++      if (!entry.getValue().toString().equals("5")) {
++        throw new Exception("Unexpected value " + entry.getValue());
++      }
++    }
++
++    if (count != 1) {
++      throw new Exception("Unexpected count " + count);
++    }
++
++    if (countThreads() < 2) {
++      printThreadNames();
++      throw new Exception("Not seeing expected threads");
++    }
++
++    CleanUp.shutdownNow();
++
++    Mutation m2 = new Mutation("r2");
++    m2.put("cf1", "cq1", 1, "6");
++
++    try {
++      bw.addMutation(m1);
++      bw.flush();
++      throw new Exception("batch writer did not fail");
++    } catch (Exception e) {
++
++    }
++
++    try {
++      // expect this to fail also, want to clean up batch writer threads
++      bw.close();
++      throw new Exception("batch writer close not fail");
++    } catch (Exception e) {
++
++    }
++
++    try {
++      count = 0;
++      Iterator<Entry<Key,Value>> iter = scanner.iterator();
++      while (iter.hasNext()) {
++        iter.next();
++        count++;
++      }
++      throw new Exception("scanner did not fail");
++    } catch (Exception e) {
++
++    }
++
++    if (countThreads() > 0) {
++      printThreadNames();
++      throw new Exception("Threads did not go away");
++    }
++  }
++
++  private void printThreadNames() {
++    Set<Thread> threads = Thread.getAllStackTraces().keySet();
++    for (Thread thread : threads) {
++      System.out.println("thread name:" + thread.getName());
++      thread.getStackTrace();
++
++    }
++  }
++
++  /**
++   * count threads that should be cleaned up
++   * 
++   */
++  private int countThreads() {
++    int count = 0;
++    Set<Thread> threads = Thread.getAllStackTraces().keySet();
++    for (Thread thread : threads) {
++
++      if (thread.getName().toLowerCase().contains("sendthread") || thread.getName().toLowerCase().contains("eventthread"))
++        count++;
++
++      if (thread.getName().toLowerCase().contains("thrift") && thread.getName().toLowerCase().contains("pool"))
++        count++;
++    }
++
++    return count;
++  }
++}