You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/01/15 23:42:00 UTC

[1/6] accumulo git commit: ACCUMULO-2997 Improve comment about race condition

Repository: accumulo
Updated Branches:
  refs/heads/master c032ec4b2 -> aae9e977f


ACCUMULO-2997 Improve comment about race condition

Improves the comment in the thread pool size adjustment code to clarify what the
race condition is and why it's not really a problem.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2016ab93
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2016ab93
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2016ab93

Branch: refs/heads/master
Commit: 2016ab93cc8960bf9aea0d196cb2297abf6c74ad
Parents: 87eec6f
Author: Christopher Tubbs <ct...@apache.org>
Authored: Thu Jan 15 16:07:38 2015 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Thu Jan 15 16:07:38 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/accumulo/server/util/TServerUtils.java     | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2016ab93/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index bece746..ba93227 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -221,6 +221,8 @@ public class TServerUtils {
     SimpleTimer.getInstance().schedule(new Runnable() {
       @Override
       public void run() {
+        // there is a minor race condition between sampling the current state of the thread pool and adjusting it
+        // however, this isn't really an issue, since it adjusts periodically anyway
         if (pool.getCorePoolSize() <= pool.getActiveCount()) {
           int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
           log.info("Increasing server thread pool size on " + serverName + " to " + larger);
@@ -230,10 +232,6 @@ public class TServerUtils {
           if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
             int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
             if (smaller != pool.getCorePoolSize()) {
-              // ACCUMULO-2997 there is a race condition here... the active count could be higher by the time
-              // we decrease the core pool size... so the active count could end up higher than
-              // the core pool size, in which case everything will be queued... the increase case
-              // should handle this and prevent deadlock
               log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
               pool.setCorePoolSize(smaller);
             }


[6/6] accumulo git commit: Revert "ACCUMULO-3197 Remove deprecated Instance.getConfiguration()"

Posted by ct...@apache.org.
Revert "ACCUMULO-3197 Remove deprecated Instance.getConfiguration()"

This reverts commit 8fe826dd119289165cb19e9cdf922bb4ce933324.

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
	core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
	core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java
	core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
	core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
	core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
	core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
	core/src/main/java/org/apache/accumulo/core/client/impl/ServerConfigurationUtil.java
	core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
	core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
	core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
	core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java
	core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
	core/src/test/java/org/apache/accumulo/core/client/impl/ClientConfigurationHelperTest.java
	core/src/test/java/org/apache/accumulo/core/client/impl/ClientContextTest.java
	core/src/test/java/org/apache/accumulo/core/client/impl/ServerConfigurationUtilTest.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
	server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
	shell/src/main/java/org/apache/accumulo/shell/Shell.java
	test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/aae9e977
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/aae9e977
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/aae9e977

Branch: refs/heads/master
Commit: aae9e977fe5f1e078851d4c9cac46b4418519c8a
Parents: 112f5e7
Author: Christopher Tubbs <ct...@apache.org>
Authored: Thu Jan 15 17:33:03 2015 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Thu Jan 15 17:41:35 2015 -0500

----------------------------------------------------------------------
 .../apache/accumulo/core/client/Instance.java   | 23 ++++++++++++++++++++
 .../accumulo/core/client/ZooKeeperInstance.java | 14 ++++++++++++
 .../accumulo/core/client/mock/MockInstance.java | 16 ++++++++++++++
 .../core/client/impl/TabletLocatorImplTest.java | 13 +++++++++++
 .../accumulo/server/client/HdfsZooInstance.java | 15 +++++++++++++
 .../server/security/SystemCredentialsIT.java    | 23 ++++++++++++++++++++
 6 files changed, 104 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/aae9e977/core/src/main/java/org/apache/accumulo/core/client/Instance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Instance.java b/core/src/main/java/org/apache/accumulo/core/client/Instance.java
index ff6375b..8a70d4c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Instance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Instance.java
@@ -19,8 +19,10 @@ package org.apache.accumulo.core.client;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.accumulo.core.client.admin.InstanceOperations;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 
 /**
  * This class represents the information a client needs to know to connect to an instance of accumulo.
@@ -121,6 +123,27 @@ public interface Instance {
   Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException;
 
   /**
+   * Returns the AccumuloConfiguration to use when interacting with this instance.
+   *
+   * @return the AccumuloConfiguration that specifies properties related to interacting with this instance
+   * @deprecated since 1.6.0. This method makes very little sense in the context of the client API and never should have been exposed.
+   * @see InstanceOperations#getSystemConfiguration() for client-side reading of the server-side configuration.
+   */
+  @Deprecated
+  AccumuloConfiguration getConfiguration();
+
+  /**
+   * Set the AccumuloConfiguration to use when interacting with this instance.
+   *
+   * @param conf
+   *          accumulo configuration
+   * @deprecated since 1.6.0. This method makes very little sense in the context of the client API and never should have been exposed.
+   * @see InstanceOperations#setProperty(String, String)
+   */
+  @Deprecated
+  void setConfiguration(AccumuloConfiguration conf);
+
+  /**
    * Returns a connection to this instance of accumulo.
    *
    * @param principal

http://git-wip-us.apache.org/repos/asf/accumulo/blob/aae9e977/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index 7c8f2e2..2baa856 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.client.impl.ConnectorImpl;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.util.ByteBufferUtil;
@@ -71,6 +72,7 @@ public class ZooKeeperInstance implements Instance {
 
   private final int zooKeepersSessionTimeOut;
 
+  private AccumuloConfiguration conf;
   private ClientConfiguration clientConf;
 
   /**
@@ -247,6 +249,18 @@ public class ZooKeeperInstance implements Instance {
     return getConnector(principal, new PasswordToken(pass));
   }
 
+  @Override
+  @Deprecated
+  public AccumuloConfiguration getConfiguration() {
+    return conf = conf == null ? DefaultConfiguration.getInstance() : ClientContext.convertClientConfig(clientConf);
+  }
+
+  @Override
+  @Deprecated
+  public void setConfiguration(AccumuloConfiguration conf) {
+    this.conf = conf;
+  }
+
   /**
    * Given a zooCache and instanceId, look up the instance name.
    */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/aae9e977/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
index 67435d2..01dc91b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
@@ -30,6 +30,8 @@ import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.CachedConfiguration;
@@ -134,6 +136,20 @@ public class MockInstance implements Instance {
     return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
   }
 
+  AccumuloConfiguration conf = null;
+
+  @Deprecated
+  @Override
+  public AccumuloConfiguration getConfiguration() {
+    return conf == null ? DefaultConfiguration.getInstance() : conf;
+  }
+
+  @Override
+  @Deprecated
+  public void setConfiguration(AccumuloConfiguration conf) {
+    this.conf = conf;
+  }
+
   @Override
   public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
     Connector conn = new MockConnector(new Credentials(principal, token), acu, this);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/aae9e977/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
index 2181451..939a64c 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
 import org.apache.accumulo.core.client.impl.TabletLocatorImpl.TabletLocationObtainer;
 import org.apache.accumulo.core.client.impl.TabletLocatorImpl.TabletServerLockChecker;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
@@ -451,6 +452,18 @@ public class TabletLocatorImplTest {
       throw new UnsupportedOperationException();
     }
 
+    @Deprecated
+    @Override
+    public AccumuloConfiguration getConfiguration() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    @Deprecated
+    public void setConfiguration(AccumuloConfiguration conf) {
+      throw new UnsupportedOperationException();
+    }
+
     @Override
     @Deprecated
     public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/aae9e977/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java b/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
index 3175fff..f64f941 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
 import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -176,6 +177,20 @@ public class HdfsZooInstance implements Instance {
     return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
   }
 
+  private AccumuloConfiguration conf = null;
+
+  @Deprecated
+  @Override
+  public AccumuloConfiguration getConfiguration() {
+    return conf = conf == null ? new ServerConfigurationFactory(this).getConfiguration() : conf;
+  }
+
+  @Override
+  @Deprecated
+  public void setConfiguration(AccumuloConfiguration conf) {
+    this.conf = conf;
+  }
+
   public static void main(String[] args) {
     Instance instance = HdfsZooInstance.getInstance();
     System.out.println("Instance Name: " + instance.getInstanceName());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/aae9e977/test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java b/test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java
index 3889110..216ac3e 100644
--- a/test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java
+++ b/test/src/test/java/org/apache/accumulo/server/security/SystemCredentialsIT.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.security.SecurityErrorCode;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.RootTable;
@@ -63,6 +64,12 @@ public class SystemCredentialsIT extends ConfigurableMacIT {
     if (args[0].equals("bad")) {
       Instance inst = new Instance() {
 
+        @Deprecated
+        @Override
+        public void setConfiguration(AccumuloConfiguration conf) {
+          throw new UnsupportedOperationException();
+        }
+
         @Override
         public int getZooKeepersSessionTimeOut() {
           throw new UnsupportedOperationException();
@@ -116,6 +123,12 @@ public class SystemCredentialsIT extends ConfigurableMacIT {
           throw new UnsupportedOperationException();
         }
 
+        @Deprecated
+        @Override
+        public AccumuloConfiguration getConfiguration() {
+          throw new UnsupportedOperationException();
+        }
+
       };
       creds = SystemCredentials.get(inst);
     } else if (args[0].equals("good")) {
@@ -176,6 +189,16 @@ public class SystemCredentialsIT extends ConfigurableMacIT {
           throw new UnsupportedOperationException();
         }
 
+        @Override
+        public AccumuloConfiguration getConfiguration() {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void setConfiguration(AccumuloConfiguration conf) {
+          throw new UnsupportedOperationException();
+        }
+
       };
       creds = new SystemCredentials(inst, "!SYSTEM", new PasswordToken("fake"));
     }


[4/6] accumulo git commit: ACCUMULO-3479 Remove some warnings

Posted by ct...@apache.org.
ACCUMULO-3479 Remove some warnings

Remove extra warnings left over from dropping Hadoop 1 support


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f6574fab
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f6574fab
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f6574fab

Branch: refs/heads/master
Commit: f6574fab9da4d1d37da91c530353921b7d66d779
Parents: 7719bc5
Author: Christopher Tubbs <ct...@apache.org>
Authored: Thu Jan 15 17:02:35 2015 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Thu Jan 15 17:41:27 2015 -0500

----------------------------------------------------------------------
 pom.xml                                                        | 4 ++--
 .../java/org/apache/accumulo/master/tableOps/DeleteTable.java  | 2 --
 .../test/java/org/apache/accumulo/test/AccumuloDFSBase.java    | 1 -
 test/pom.xml                                                   | 6 ++++--
 4 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6574fab/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8e0f141..be8adb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -942,8 +942,8 @@
                 <module name="OuterTypeFilename" />
                 <module name="LineLength">
                   <!-- needs extra, because Eclipse formatter ignores the ending left brace -->
-                  <property name="max" value="200"/>
-                  <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
+                  <property name="max" value="200" />
+                  <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://" />
                 </module>
                 <module name="AvoidStarImport" />
                 <module name="UnusedImports">

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6574fab/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
index 4cff0c7..873867b 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
@@ -268,8 +268,6 @@ class CleanUp extends MasterRepo {
       final String childName = child.getPath().getName();
       final Path childInSrc = new Path(src, childName), childInDest = new Path(dest, childName);
 
-      boolean isFile = fs.isFile(childInSrc), isDir = child.isDir();
-
       if (child.isFile()) {
         if (fs.exists(childInDest)) {
           log.warn("File already exists in archive, ignoring. " + childInDest);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6574fab/start/src/test/java/org/apache/accumulo/test/AccumuloDFSBase.java
----------------------------------------------------------------------
diff --git a/start/src/test/java/org/apache/accumulo/test/AccumuloDFSBase.java b/start/src/test/java/org/apache/accumulo/test/AccumuloDFSBase.java
index cbc8beb..3e94291 100644
--- a/start/src/test/java/org/apache/accumulo/test/AccumuloDFSBase.java
+++ b/start/src/test/java/org/apache/accumulo/test/AccumuloDFSBase.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-@SuppressWarnings("deprecation")
 public class AccumuloDFSBase {
 
   protected static Configuration conf = null;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6574fab/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index b58df3c..a0247c6 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -159,8 +159,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minikdc</artifactId>
       <!-- Specifically depend on this version of minikdc to avoid having
-           to increase out normal hadoop dependency --> 
-      <version>2.3.0</version>
+           to increase out normal hadoop dependency -->
+      <?SORTPOM IGNORE?>
+      <version>2.3.0</version><!--$NO-MVN-MAN-VER$-->
+      <?SORTPOM RESUME?>
       <scope>test</scope>
       <exclusions>
         <!-- Pulls in an older bouncycastle version -->


[5/6] accumulo git commit: ACCUMULO-3416 Clean up TabletOperations

Posted by ct...@apache.org.
ACCUMULO-3416 Clean up TabletOperations

Remove unused method and consolidate remaining method into Tablet


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/112f5e76
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/112f5e76
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/112f5e76

Branch: refs/heads/master
Commit: 112f5e76aca55b2b883d3433c53da988a697f1ff
Parents: f6574fa
Author: Christopher Tubbs <ct...@apache.org>
Authored: Thu Jan 15 17:06:22 2015 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Thu Jan 15 17:41:32 2015 -0500

----------------------------------------------------------------------
 .../accumulo/server/util/TabletOperations.java  | 93 --------------------
 .../apache/accumulo/tserver/tablet/Tablet.java  | 42 ++++++++-
 2 files changed, 40 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/112f5e76/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java b/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
deleted file mode 100644
index b1e2d35..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.util;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.tablets.UniqueNameAllocator;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Optional;
-
-public class TabletOperations {
-
-  private static final Logger log = Logger.getLogger(TabletOperations.class);
-
-  public static String createTabletDirectory(VolumeManager fs, String tableId, Text endRow) {
-    String lowDirectory;
-
-    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
-    String volume = fs.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR;
-
-    while (true) {
-      try {
-        if (endRow == null) {
-          lowDirectory = Constants.DEFAULT_TABLET_LOCATION;
-          Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
-          if (fs.exists(lowDirectoryPath) || fs.mkdirs(lowDirectoryPath)) {
-            FileSystem pathFs = fs.getVolumeByPath(lowDirectoryPath).getFileSystem();
-            return lowDirectoryPath.makeQualified(pathFs.getUri(), pathFs.getWorkingDirectory()).toString();
-          }
-          log.warn("Failed to create " + lowDirectoryPath + " for unknown reason");
-        } else {
-          lowDirectory = "/" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
-          Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
-          if (fs.exists(lowDirectoryPath))
-            throw new IllegalStateException("Dir exist when it should not " + lowDirectoryPath);
-          if (fs.mkdirs(lowDirectoryPath)) {
-            FileSystem lowDirectoryFs = fs.getVolumeByPath(lowDirectoryPath).getFileSystem();
-            return lowDirectoryPath.makeQualified(lowDirectoryFs.getUri(), lowDirectoryFs.getWorkingDirectory()).toString();
-          }
-        }
-      } catch (IOException e) {
-        log.warn(e);
-      }
-
-      log.warn("Failed to create dir for tablet in table " + tableId + " in volume " + volume + " + will retry ...");
-      UtilWaitThread.sleep(3000);
-
-    }
-  }
-
-  public static String createTabletDirectory(String tableDir, Text endRow) {
-    while (true) {
-      try {
-        VolumeManager fs = VolumeManagerImpl.get();
-        return createTabletDirectory(fs, tableDir, endRow);
-      } catch (IOException e) {
-        log.warn("problem creating tablet directory", e);
-      } catch (IllegalArgumentException exception) {
-        /* thrown in some edge cases of DNS failure by Hadoop instead of UnknownHostException */
-        if (exception.getCause() instanceof UnknownHostException) {
-          log.warn("problem creating tablet directory", exception);
-        } else {
-          throw exception;
-        }
-      }
-      UtilWaitThread.sleep(3000);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/112f5e76/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index af8334b..a73356d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -90,6 +90,7 @@ import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -108,7 +109,6 @@ import org.apache.accumulo.server.util.FileUtil;
 import org.apache.accumulo.server.util.MasterMetadataUtil;
 import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
-import org.apache.accumulo.server.util.TabletOperations;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 import org.apache.accumulo.tserver.InMemoryMap;
@@ -145,6 +145,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 
 /**
  *
@@ -2236,7 +2237,7 @@ public class Tablet implements TabletCommitter {
       KeyExtent low = new KeyExtent(extent.getTableId(), midRow, extent.getPrevEndRow());
       KeyExtent high = new KeyExtent(extent.getTableId(), extent.getEndRow(), midRow);
 
-      String lowDirectory = TabletOperations.createTabletDirectory(getTabletServer().getFileSystem(), extent.getTableId().toString(), midRow);
+      String lowDirectory = createTabletDirectory(getTabletServer().getFileSystem(), extent.getTableId().toString(), midRow);
 
       // write new tablet information to MetadataTable
       SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
@@ -2722,4 +2723,41 @@ public class Tablet implements TabletCommitter {
   public AtomicLong getScannedCounter() {
     return scannedCount;
   }
+
+  private static String createTabletDirectory(VolumeManager fs, String tableId, Text endRow) {
+    String lowDirectory;
+
+    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+    String volume = fs.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR;
+
+    while (true) {
+      try {
+        if (endRow == null) {
+          lowDirectory = Constants.DEFAULT_TABLET_LOCATION;
+          Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
+          if (fs.exists(lowDirectoryPath) || fs.mkdirs(lowDirectoryPath)) {
+            FileSystem pathFs = fs.getVolumeByPath(lowDirectoryPath).getFileSystem();
+            return lowDirectoryPath.makeQualified(pathFs.getUri(), pathFs.getWorkingDirectory()).toString();
+          }
+          log.warn("Failed to create " + lowDirectoryPath + " for unknown reason");
+        } else {
+          lowDirectory = "/" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
+          Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
+          if (fs.exists(lowDirectoryPath))
+            throw new IllegalStateException("Dir exist when it should not " + lowDirectoryPath);
+          if (fs.mkdirs(lowDirectoryPath)) {
+            FileSystem lowDirectoryFs = fs.getVolumeByPath(lowDirectoryPath).getFileSystem();
+            return lowDirectoryPath.makeQualified(lowDirectoryFs.getUri(), lowDirectoryFs.getWorkingDirectory()).toString();
+          }
+        }
+      } catch (IOException e) {
+        log.warn(e);
+      }
+
+      log.warn("Failed to create dir for tablet in table " + tableId + " in volume " + volume + " + will retry ...");
+      UtilWaitThread.sleep(3000);
+
+    }
+  }
+
 }


[3/6] accumulo git commit: Merge branch '1.6'

Posted by ct...@apache.org.
Merge branch '1.6'

Conflicts:
	server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7719bc5f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7719bc5f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7719bc5f

Branch: refs/heads/master
Commit: 7719bc5fd0d1d98adae293649214a1414f8bc0b5
Parents: c032ec4 e49a97a
Author: Christopher Tubbs <ct...@apache.org>
Authored: Thu Jan 15 17:41:10 2015 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Thu Jan 15 17:41:10 2015 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7719bc5f/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 985df9c,0000000..ece46a2
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@@ -1,509 -1,0 +1,507 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.rpc;
 +
 +import java.io.IOException;
 +import java.lang.reflect.Field;
 +import java.net.BindException;
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.net.ServerSocket;
 +import java.net.UnknownHostException;
 +import java.util.Arrays;
 +import java.util.HashSet;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ThreadPoolExecutor;
 +
 +import javax.net.ssl.SSLServerSocket;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.rpc.SaslConnectionParams;
 +import org.apache.accumulo.core.rpc.SslConnectionParams;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.LoggingRunnable;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.AccumuloServerContext;
 +import org.apache.accumulo.server.thrift.UGIAssumingProcessor;
 +import org.apache.accumulo.server.util.Halt;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.hadoop.security.SaslRpcServer;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.thrift.TProcessor;
 +import org.apache.thrift.TProcessorFactory;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.server.TThreadPoolServer;
 +import org.apache.thrift.transport.TSSLTransportFactory;
 +import org.apache.thrift.transport.TSaslServerTransport;
 +import org.apache.thrift.transport.TServerSocket;
 +import org.apache.thrift.transport.TServerTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.thrift.transport.TTransportFactory;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.net.HostAndPort;
 +
 +/**
 + * Factory methods for creating Thrift server objects
 + */
 +public class TServerUtils {
 +  private static final Logger log = LoggerFactory.getLogger(TServerUtils.class);
 +
 +  /**
 +   * Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the client address of any incoming RPC.
 +   */
 +  public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
 +
 +  /**
 +   * Start a server, at the given port, or higher, if that port is not available.
 +   *
 +   * @param service
 +   *          RPC configuration
 +   * @param portHintProperty
 +   *          the port to attempt to open, can be zero, meaning "any available port"
 +   * @param processor
 +   *          the service to be started
 +   * @param serverName
 +   *          the name of the class that is providing the service
 +   * @param threadName
 +   *          name this service's thread for better debugging
 +   * @param portSearchProperty
 +   *          A boolean Property to control if port-search should be used, or null to disable
 +   * @param minThreadProperty
 +   *          A Property to control the minimum number of threads in the pool
 +   * @param timeBetweenThreadChecksProperty
 +   *          A Property to control the amount of time between checks to resize the thread pool
 +   * @param maxMessageSizeProperty
 +   *          A Property to control the maximum Thrift message size accepted
 +   * @return the server object created, and the port actually used
 +   * @throws UnknownHostException
 +   *           when we don't know our own address
 +   */
 +  public static ServerAddress startServer(AccumuloServerContext service, String hostname, Property portHintProperty, TProcessor processor, String serverName,
 +      String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
 +      throws UnknownHostException {
 +    final AccumuloConfiguration config = service.getConfiguration();
 +
 +    final int portHint = config.getPort(portHintProperty);
 +
 +    int minThreads = 2;
 +    if (minThreadProperty != null)
 +      minThreads = config.getCount(minThreadProperty);
 +
 +    long timeBetweenThreadChecks = 1000;
 +    if (timeBetweenThreadChecksProperty != null)
 +      timeBetweenThreadChecks = config.getTimeInMillis(timeBetweenThreadChecksProperty);
 +
 +    long maxMessageSize = 10 * 1000 * 1000;
 +    if (maxMessageSizeProperty != null)
 +      maxMessageSize = config.getMemoryInBytes(maxMessageSizeProperty);
 +
 +    boolean portSearch = false;
 +    if (portSearchProperty != null)
 +      portSearch = config.getBoolean(portSearchProperty);
 +
 +    final int simpleTimerThreadpoolSize = config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
 +    final ThriftServerType serverType = service.getThriftServerType();
 +
 +    if (ThriftServerType.SASL == serverType) {
 +      processor = updateSaslProcessor(serverType, processor);
 +    }
 +
 +    // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
 +    TimedProcessor timedProcessor = new TimedProcessor(config, processor, serverName, threadName);
 +
 +    Random random = new Random();
 +    for (int j = 0; j < 100; j++) {
 +
 +      // Are we going to slide around, looking for an open port?
 +      int portsToSearch = 1;
 +      if (portSearch)
 +        portsToSearch = 1000;
 +
 +      for (int i = 0; i < portsToSearch; i++) {
 +        int port = portHint + i;
 +        if (portHint != 0 && i > 0)
 +          port = 1024 + random.nextInt(65535 - 1024);
 +        if (port > 65535)
 +          port = 1024 + port % (65535 - 1024);
 +        try {
 +          HostAndPort addr = HostAndPort.fromParts(hostname, port);
 +          return TServerUtils.startTServer(addr, serverType, timedProcessor, serverName, threadName, minThreads,
 +              simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize,
 +              service.getServerSslParams(), service.getServerSaslParams(), service.getClientTimeoutInMillis());
 +        } catch (TTransportException ex) {
 +          log.error("Unable to start TServer", ex);
 +          if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
 +            // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
 +            // TTransportException, and with a TSocket created by TSSLTransportFactory, it
 +            // comes through as caused by a BindException.
 +            log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName);
 +            UtilWaitThread.sleep(250);
 +          } else {
 +            // thrift is passing up a nested exception that isn't a BindException,
 +            // so no reason to believe retrying on a different port would help.
 +            log.error("Unable to start TServer", ex);
 +            break;
 +          }
 +        }
 +      }
 +    }
 +    throw new UnknownHostException("Unable to find a listen port");
 +  }
 +
 +  /**
 +   * Create a NonBlockingServer with a custom thread pool that can dynamically resize itself.
 +   */
 +  public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName,
 +      final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
 +
 +    final TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
 +    final CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
 +
 +    options.protocolFactory(ThriftUtil.protocolFactory());
 +    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
 +    options.maxReadBufferBytes = maxMessageSize;
 +    options.stopTimeoutVal(5);
 +
 +    // Create our own very special thread pool.
 +    final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
 +    // periodically adjust the number of threads we need by checking how busy our threads are
 +    SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
 +      @Override
 +      public void run() {
++        // there is a minor race condition between sampling the current state of the thread pool and adjusting it
++        // however, this isn't really an issue, since it adjusts periodically anyway
 +        if (pool.getCorePoolSize() <= pool.getActiveCount()) {
 +          int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
 +          log.info("Increasing server thread pool size on {} to {}", serverName, larger);
 +          pool.setMaximumPoolSize(larger);
 +          pool.setCorePoolSize(larger);
 +        } else {
 +          if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
 +            int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
 +            if (smaller != pool.getCorePoolSize()) {
-               // ACCUMULO-2997 there is a race condition here... the active count could be higher by the time
-               // we decrease the core pool size... so the active count could end up higher than
-               // the core pool size, in which case everything will be queued... the increase case
-               // should handle this and prevent deadlock
 +              log.info("Decreasing server thread pool size on {} to {}", serverName, smaller);
 +              pool.setCorePoolSize(smaller);
 +            }
 +          }
 +        }
 +      }
 +    }, timeBetweenThreadChecks, timeBetweenThreadChecks);
 +
 +    options.executorService(pool);
 +    options.processorFactory(new TProcessorFactory(processor));
 +
 +    if (address.getPort() == 0) {
 +      address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
 +    }
 +
 +    return new ServerAddress(new CustomNonBlockingServer(options), address);
 +  }
 +
 +  /**
 +   * Creates a TThreadPoolServer for normal unsecure operation. Useful for comparing performance against SSL or SASL transports.
 +   *
 +   * @param address
 +   *          Address to bind to
 +   * @param processor
 +   *          TProcessor for the server
 +   * @param maxMessageSize
 +   *          Maximum size of a Thrift message allowed
 +   * @return A configured TThreadPoolServer and its bound address information
 +   */
 +  public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, long maxMessageSize) throws TTransportException {
 +
 +    TServerSocket transport = new TServerSocket(address.getPort());
 +    TThreadPoolServer server = createThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize));
 +
 +    if (address.getPort() == 0) {
 +      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
 +    }
 +
 +    return new ServerAddress(server, address);
 +
 +  }
 +
 +  /**
 +   * Create a TThreadPoolServer with the given transport and processor with the default transport factory.
 +   *
 +   * @param transport
 +   *          TServerTransport for the server
 +   * @param processor
 +   *          TProcessor for the server
 +   * @return A configured TThreadPoolServer
 +   */
 +  public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
 +    return createThreadPoolServer(transport, processor, ThriftUtil.transportFactory());
 +  }
 +
 +  /**
 +   * Create a TServer with the provided server transport, processor and transport factory.
 +   *
 +   * @param transport
 +   *          TServerTransport for the server
 +   * @param processor
 +   *          TProcessor for the server
 +   * @param transportFactory
 +   *          TTransportFactory for the server
 +   */
 +  public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory) {
 +    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
 +    options.protocolFactory(ThriftUtil.protocolFactory());
 +    options.transportFactory(transportFactory);
 +    options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
 +    return new TThreadPoolServer(options);
 +  }
 +
 +  /**
 +   * Create the Thrift server socket for RPC running over SSL.
 +   *
 +   * @param port
 +   *          Port of the server socket to bind to
 +   * @param timeout
 +   *          Socket timeout
 +   * @param address
 +   *          Address to bind the socket to
 +   * @param params
 +   *          SSL parameters
 +   * @return A configured TServerSocket configured to use SSL
 +   */
 +  public static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException {
 +    TServerSocket tServerSock;
 +    if (params.useJsse()) {
 +      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
 +    } else {
 +      tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
 +    }
 +
 +    final ServerSocket serverSock = tServerSock.getServerSocket();
 +    if (serverSock instanceof SSLServerSocket) {
 +      SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
 +      String[] protocols = params.getServerProtocols();
 +
 +      // Be nice for the user and automatically remove protocols that might not exist in their JVM. Keeps us from forcing config alterations too
 +      // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
 +      Set<String> socketEnabledProtocols = new HashSet<String>(Arrays.asList(sslServerSock.getEnabledProtocols()));
 +      // Keep only the enabled protocols that were specified by the configuration
 +      socketEnabledProtocols.retainAll(Arrays.asList(protocols));
 +      if (socketEnabledProtocols.isEmpty()) {
 +        // Bad configuration...
 +        throw new RuntimeException("No available protocols available for secure socket. Availaable protocols: "
 +            + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: " + Arrays.toString(protocols));
 +      }
 +
 +      // Set the protocol(s) on the server socket
 +      sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
 +    }
 +
 +    return tServerSock;
 +  }
 +
 +  /**
 +   * Create a Thrift SSL server.
 +   *
 +   * @param address
 +   *          host and port to bind to
 +   * @param processor
 +   *          TProcessor for the server
 +   * @param socketTimeout
 +   *          Socket timeout
 +   * @param sslParams
 +   *          SSL parameters
 +   * @return A ServerAddress with the bound-socket information and the Thrift server
 +   */
 +  public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
 +      throws TTransportException {
 +    TServerSocket transport;
 +    try {
 +      transport = getSslServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
 +    } catch (UnknownHostException e) {
 +      throw new TTransportException(e);
 +    }
 +    if (address.getPort() == 0) {
 +      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
 +    }
 +    return new ServerAddress(createThreadPoolServer(transport, processor), address);
 +  }
 +
 +  public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SaslConnectionParams params,
 +      final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize)
 +      throws TTransportException {
 +    // We'd really prefer to use THsHaServer (or similar) to avoid the 1 RPC == 1 Thread model of the TThreadPoolServer,
 +    // but sadly that isn't possible: TSaslTransport needs to issue a handshake when it open()s, and that handshake will fail
 +    // when the server does an accept() to (presumably) wake up the eventing system.
 +    log.info("Creating SASL thread pool thrift server on port=" + address.getPort());
 +    TServerSocket transport = new TServerSocket(address.getPort());
 +
 +    final String hostname;
 +    try {
 +      hostname = InetAddress.getByName(address.getHostText()).getCanonicalHostName();
 +    } catch (UnknownHostException e) {
 +      throw new TTransportException(e);
 +    }
 +
 +    final UserGroupInformation serverUser;
 +    try {
 +      serverUser = UserGroupInformation.getLoginUser();
 +    } catch (IOException e) {
 +      throw new TTransportException(e);
 +    }
 +
 +    log.trace("Logged in as {}, creating TSsaslServerTransport factory as {}/{}", serverUser, params.getKerberosServerPrimary(), hostname);
 +
 +    // Make the SASL transport factory with the instance and primary from the kerberos server principal, SASL properties
 +    // and the SASL callback handler from Hadoop to ensure authorization ID is the authentication ID. Despite the 'protocol' argument seeming to be useless, it
 +    // *must* be the primary of the server.
 +    TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
 +    saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
 +        new SaslRpcServer.SaslGssCallbackHandler());
 +
 +    // Updates the clientAddress threadlocal so we know the client's address
 +    final ClientInfoProcessorFactory clientInfoFactory = new ClientInfoProcessorFactory(clientAddress, processor);
 +
 +    // Make sure the TTransportFactory is performing a UGI.doAs
 +    TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
 +
 +    if (address.getPort() == 0) {
 +      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
 +    }
 +
 +    return new ServerAddress(new TThreadPoolServer(new TThreadPoolServer.Args(transport).transportFactory(ugiTransportFactory)
 +        .processorFactory(clientInfoFactory)
 +        .protocolFactory(ThriftUtil.protocolFactory())), address);
 +  }
 +
 +  public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor,
 +      String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
 +      SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
 +
 +    if (ThriftServerType.SASL == serverType) {
 +      processor = updateSaslProcessor(serverType, processor);
 +    }
 +
 +    return startTServer(address, serverType, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads,
 +        timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, serverSocketTimeout);
 +  }
 +
 +  /**
 +   * Start the Thrift server selected by <code>serverType</code>: SSL, SASL, blocking thread pool, or (by default) the custom non-blocking server.
 +   *
 +   * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is bound to.
 +   */
 +  public static ServerAddress startTServer(HostAndPort address,ThriftServerType serverType, TimedProcessor processor, String serverName, String threadName, int numThreads,
 +      int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize,  SslConnectionParams sslParams,
 +      SaslConnectionParams saslParams, long serverSocketTimeout) throws TTransportException {
 +
 +    // This is presently not supported. It's hypothetically possible, I believe, to work, but it would require changes in how the transports
 +    // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's quality of protection addresses privacy issues.
 +    Preconditions.checkArgument(!(sslParams != null && saslParams != null), "Cannot start a Thrift server using both SSL and SASL");
 +
 +    ServerAddress serverAddress;
 +    switch (serverType) {
 +      case SSL:
 +        log.debug("Instantiating SSL Thrift server");
 +        serverAddress = createSslThreadPoolServer(address, processor, serverSocketTimeout, sslParams);
 +        break;
 +      case SASL:
 +        log.debug("Instantiating SASL Thrift server");
 +        serverAddress = createSaslThreadPoolServer(address, processor, serverSocketTimeout, saslParams, serverName, threadName, numThreads, numSTThreads,
 +            timeBetweenThreadChecks, maxMessageSize);
 +        break;
 +      case THREADPOOL:
 +        log.debug("Instantiating unsecure TThreadPool Thrift server");
 +        serverAddress = createBlockingServer(address, processor, maxMessageSize);
 +        break;
 +      case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the default
 +      default:
 +        log.debug("Instantiating default, unsecure custom half-async Thrift server");
 +        serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
 +    }
 +
 +    final TServer finalServer = serverAddress.server;
 +    Runnable serveTask = new Runnable() {
 +      @Override
 +      public void run() {
 +        try {
 +          finalServer.serve();
 +        } catch (Error e) {
 +          Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
 +        }
 +      }
 +    };
 +
 +    serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
 +    Thread thread = new Daemon(serveTask, threadName);
 +    thread.start();
 +
 +    // check for the special "bind to everything address"
 +    if (serverAddress.address.getHostText().equals("0.0.0.0")) {
 +      // can't get the address from the bind, so we'll do our best to invent our hostname
 +      try {
 +        serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
 +      } catch (UnknownHostException e) {
 +        throw new TTransportException(e);
 +      }
 +    }
 +    return serverAddress;
 +  }
 +
 +  /**
 +   * Stop a Thrift TServer. Existing connections will keep our thread running; use reflection to forcibly shut down the threadpool.
 +   *
 +   * @param s
 +   *          The TServer to stop
 +   */
 +  public static void stopTServer(TServer s) {
 +    if (s == null)
 +      return;
 +    s.stop();
 +    try {
 +      Field f = s.getClass().getDeclaredField("executorService_");
 +      f.setAccessible(true);
 +      ExecutorService es = (ExecutorService) f.get(s);
 +      es.shutdownNow();
 +    } catch (Exception e) {
 +      log.error("Unable to call shutdownNow", e);
 +    }
 +  }
 +
 +  /**
 +   * Wrap the provided processor in the {@link UGIAssumingProcessor} so Kerberos authentication works. Requires the <code>serverType</code> to be
 +   * {@link ThriftServerType#SASL} and throws an exception when it is not.
 +   *
 +   * @return A {@link UGIAssumingProcessor} which wraps the provided processor
 +   */
 +  private static TProcessor updateSaslProcessor(ThriftServerType serverType, TProcessor processor) {
 +    Preconditions.checkArgument(ThriftServerType.SASL == serverType);
 +
 +    // Wrap the provided processor in our special processor which proxies the provided UGI on the logged-in UGI
 +    // Important that we have Timed -> UGIAssuming -> [provided] to make sure that the metrics are still reported
 +    // as the logged-in user.
 +    log.info("Wrapping {} in UGIAssumingProcessor", processor.getClass());
 +
 +    return new UGIAssumingProcessor(processor);
 +  }
 +}


[2/6] accumulo git commit: Merge branch '1.5' into 1.6

Posted by ct...@apache.org.
Merge branch '1.5' into 1.6


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e49a97ad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e49a97ad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e49a97ad

Branch: refs/heads/master
Commit: e49a97ad7eb0cfb115101d64575984715df1520e
Parents: 65a8c1a 2016ab9
Author: Christopher Tubbs <ct...@apache.org>
Authored: Thu Jan 15 16:09:26 2015 -0500
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Thu Jan 15 16:09:26 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/accumulo/server/util/TServerUtils.java     | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e49a97ad/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index 11adfd2,0000000..d932b2a
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@@ -1,337 -1,0 +1,335 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.util;
 +
 +import java.io.IOException;
 +import java.lang.reflect.Field;
 +import java.net.BindException;
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.net.ServerSocket;
 +import java.net.UnknownHostException;
 +import java.nio.channels.ServerSocketChannel;
 +import java.util.Random;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ThreadPoolExecutor;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.Daemon;
 +import org.apache.accumulo.core.util.LoggingRunnable;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.SslConnectionParams;
 +import org.apache.accumulo.core.util.TBufferedSocket;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.metrics.ThriftMetrics;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TProcessor;
 +import org.apache.thrift.TProcessorFactory;
 +import org.apache.thrift.protocol.TProtocol;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.server.TThreadPoolServer;
 +import org.apache.thrift.transport.TServerTransport;
 +import org.apache.thrift.transport.TSocket;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +
 +import com.google.common.net.HostAndPort;
 +
 +public class TServerUtils {
 +  private static final Logger log = Logger.getLogger(TServerUtils.class);
 +
 +  public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>();
 +
 +  public static class ServerAddress {
 +    public final TServer server;
 +    public final HostAndPort address;
 +
 +    public ServerAddress(TServer server, HostAndPort address) {
 +      this.server = server;
 +      this.address = address;
 +    }
 +  }
 +
 +  /**
 +   * Start a server at the configured port, or at another port if that one is unavailable and port search is enabled.
 +   *
 +   * @param portHintProperty
 +   *          the property holding the port to attempt to open; the port can be zero, meaning "any available port"
 +   * @param processor
 +   *          the service to be started
 +   * @param serverName
 +   *          the name of the class that is providing the service
 +   * @param threadName
 +   *          name this service's thread for better debugging
 +   * @return the server object created, and the port actually used
 +   * @throws UnknownHostException
 +   *           when we don't know our own address
 +   */
 +  public static ServerAddress startServer(AccumuloConfiguration conf, String address, Property portHintProperty, TProcessor processor, String serverName,
 +      String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
 +      throws UnknownHostException {
 +    int portHint = conf.getPort(portHintProperty);
 +    int minThreads = 2;
 +    if (minThreadProperty != null)
 +      minThreads = conf.getCount(minThreadProperty);
 +    long timeBetweenThreadChecks = 1000;
 +    if (timeBetweenThreadChecksProperty != null)
 +      timeBetweenThreadChecks = conf.getTimeInMillis(timeBetweenThreadChecksProperty);
 +    long maxMessageSize = 10 * 1000 * 1000;
 +    if (maxMessageSizeProperty != null)
 +      maxMessageSize = conf.getMemoryInBytes(maxMessageSizeProperty);
 +    boolean portSearch = false;
 +    if (portSearchProperty != null)
 +      portSearch = conf.getBoolean(portSearchProperty);
 +    // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once
 +    TServerUtils.TimedProcessor timedProcessor = new TServerUtils.TimedProcessor(processor, serverName, threadName);
 +    Random random = new Random();
 +    for (int j = 0; j < 100; j++) {
 +
 +      // Are we going to slide around, looking for an open port?
 +      int portsToSearch = 1;
 +      if (portSearch)
 +        portsToSearch = 1000;
 +
 +      for (int i = 0; i < portsToSearch; i++) {
 +        int port = portHint + i;
 +        if (portHint != 0 && i > 0)
 +          port = 1024 + random.nextInt(65535 - 1024);
 +        if (port > 65535)
 +          port = 1024 + port % (65535 - 1024);
 +        try {
 +          HostAndPort addr = HostAndPort.fromParts(address, port);
 +          return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize,
 +              SslConnectionParams.forServer(conf), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
 +        } catch (TTransportException ex) {
 +          log.error("Unable to start TServer", ex);
 +          if (ex.getCause() == null || ex.getCause().getClass() == BindException.class) {
 +            // Note: with a TNonblockingServerSocket a "port taken" exception is a cause-less
 +            // TTransportException, and with a TSocket created by TSSLTransportFactory, it
 +            // comes through as caused by a BindException.
 +            log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")");
 +            UtilWaitThread.sleep(250);
 +          } else {
 +            // thrift is passing up a nested exception that isn't a BindException,
 +            // so no reason to believe retrying on a different port would help.
 +            log.error("Unable to start TServer", ex);
 +            break;
 +          }
 +        }
 +      }
 +    }
 +    throw new UnknownHostException("Unable to find a listen port");
 +  }
 +
 +  public static class TimedProcessor implements TProcessor {
 +
 +    final TProcessor other;
 +    ThriftMetrics metrics = null;
 +    long idleStart = 0;
 +
 +    TimedProcessor(TProcessor next, String serverName, String threadName) {
 +      this.other = next;
 +      // Register the metrics MBean
 +      try {
 +        metrics = new ThriftMetrics(serverName, threadName);
 +        metrics.register();
 +      } catch (Exception e) {
 +        log.error("Exception registering MBean with MBean Server", e);
 +      }
 +      idleStart = System.currentTimeMillis();
 +    }
 +
 +    @Override
 +    public boolean process(TProtocol in, TProtocol out) throws TException {
 +      long now = 0;
 +      if (metrics.isEnabled()) {
 +        now = System.currentTimeMillis();
 +        metrics.add(ThriftMetrics.idle, (now - idleStart));
 +      }
 +      try {
 +        return other.process(in, out);
 +      } finally {
 +        if (metrics.isEnabled()) {
 +          idleStart = System.currentTimeMillis();
 +          metrics.add(ThriftMetrics.execute, idleStart - now);
 +        }
 +      }
 +    }
 +  }
 +
 +  public static class ClientInfoProcessorFactory extends TProcessorFactory {
 +
 +    public ClientInfoProcessorFactory(TProcessor processor) {
 +      super(processor);
 +    }
 +
 +    @Override
 +    public TProcessor getProcessor(TTransport trans) {
 +      if (trans instanceof TBufferedSocket) {
 +        TBufferedSocket tsock = (TBufferedSocket) trans;
 +        clientAddress.set(tsock.getClientString());
 +      } else if (trans instanceof TSocket) {
 +        TSocket tsock = (TSocket) trans;
 +        clientAddress.set(tsock.getSocket().getInetAddress().getHostAddress() + ":" + tsock.getSocket().getPort());
 +      } else {
 +        log.warn("Unable to extract clientAddress from transport of type " + trans.getClass());
 +      }
 +      return super.getProcessor(trans);
 +    }
 +  }
 +
 +  public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName,
 +      final int numThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
 +    TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
 +    CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
 +    options.protocolFactory(ThriftUtil.protocolFactory());
 +    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
 +    options.maxReadBufferBytes = maxMessageSize;
 +    options.stopTimeoutVal(5);
 +    /*
 +     * Create our own very special thread pool.
 +     */
 +    final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
 +    // periodically adjust the number of threads we need by checking how busy our threads are
 +    SimpleTimer.getInstance().schedule(new Runnable() {
 +      @Override
 +      public void run() {
++        // There is a minor race condition between sampling the current state of the thread pool and adjusting it;
++        // however, this isn't really an issue, since the pool size is re-evaluated periodically anyway.
 +        if (pool.getCorePoolSize() <= pool.getActiveCount()) {
 +          int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
 +          log.info("Increasing server thread pool size on " + serverName + " to " + larger);
 +          pool.setMaximumPoolSize(larger);
 +          pool.setCorePoolSize(larger);
 +        } else {
 +          if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
 +            int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
 +            if (smaller != pool.getCorePoolSize()) {
-               // ACCUMULO-2997 there is a race condition here... the active count could be higher by the time
-               // we decrease the core pool size... so the active count could end up higher than
-               // the core pool size, in which case everything will be queued... the increase case
-               // should handle this and prevent deadlock
 +              log.info("Decreasing server thread pool size on " + serverName + " to " + smaller);
 +              pool.setCorePoolSize(smaller);
 +            }
 +          }
 +        }
 +      }
 +    }, timeBetweenThreadChecks, timeBetweenThreadChecks);
 +    options.executorService(pool);
 +    options.processorFactory(new TProcessorFactory(processor));
 +    if (address.getPort() == 0) {
 +      address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
 +    }
 +    return new ServerAddress(new CustomNonBlockingServer(options), address);
 +  }
 +
 +  public static ServerAddress createThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads)
 +      throws TTransportException {
 +
 +    // if port is zero, then we must bind to get the port number
 +    ServerSocket sock;
 +    try {
 +      sock = ServerSocketChannel.open().socket();
 +      sock.setReuseAddress(true);
 +      sock.bind(new InetSocketAddress(address.getHostText(), address.getPort()));
 +      address = HostAndPort.fromParts(address.getHostText(), sock.getLocalPort());
 +    } catch (IOException ex) {
 +      throw new TTransportException(ex);
 +    }
 +    TServerTransport transport = new TBufferedServerSocket(sock, 32 * 1024);
 +    return new ServerAddress(createThreadPoolServer(transport, processor), address);
 +  }
 +
 +  public static TServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
 +    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
 +    options.protocolFactory(ThriftUtil.protocolFactory());
 +    options.transportFactory(ThriftUtil.transportFactory());
 +    options.processorFactory(new ClientInfoProcessorFactory(processor));
 +    return new TThreadPoolServer(options);
 +  }
 +
 +  public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
 +      throws TTransportException {
 +    org.apache.thrift.transport.TServerSocket transport;
 +    try {
 +      transport = ThriftUtil.getServerSocket(address.getPort(), (int) socketTimeout, InetAddress.getByName(address.getHostText()), sslParams);
 +    } catch (UnknownHostException e) {
 +      throw new TTransportException(e);
 +    }
 +    if (address.getPort() == 0) {
 +      address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
 +    }
 +    return new ServerAddress(createThreadPoolServer(transport, processor), address);
 +  }
 +
 +  public static ServerAddress startTServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads,
 +      long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
 +    return startTServer(address, new TimedProcessor(processor, serverName, threadName), serverName, threadName, numThreads, timeBetweenThreadChecks,
 +        maxMessageSize, sslParams, sslSocketTimeout);
 +  }
 +
 +  public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads,
 +      long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException {
 +
 +    ServerAddress serverAddress;
 +    if (sslParams != null) {
 +      serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
 +    } else {
 +      serverAddress = createNonBlockingServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
 +    }
 +    final TServer finalServer = serverAddress.server;
 +    Runnable serveTask = new Runnable() {
 +      @Override
 +      public void run() {
 +        try {
 +          finalServer.serve();
 +        } catch (Error e) {
 +          Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.");
 +        }
 +      }
 +    };
 +    serveTask = new LoggingRunnable(TServerUtils.log, serveTask);
 +    Thread thread = new Daemon(serveTask, threadName);
 +    thread.start();
 +    // check for the special "bind to everything address"
 +    if (serverAddress.address.getHostText().equals("0.0.0.0")) {
 +      // can't get the address from the bind, so we'll do our best to invent our hostname
 +      try {
 +        serverAddress = new ServerAddress(finalServer, HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
 +      } catch (UnknownHostException e) {
 +        throw new TTransportException(e);
 +      }
 +    }
 +    return serverAddress;
 +  }
 +
 +  // Existing connections will keep our thread running: reach in with reflection and force the server's executor service to shut down.
 +  public static void stopTServer(TServer s) {
 +    if (s == null)
 +      return;
 +    s.stop();
 +    try {
 +      Field f = s.getClass().getDeclaredField("executorService_");
 +      f.setAccessible(true);
 +      ExecutorService es = (ExecutorService) f.get(s);
 +      es.shutdownNow();
 +    } catch (Exception e) {
 +      TServerUtils.log.error("Unable to call shutdownNow", e);
 +    }
 +  }
 +}