You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/12/09 06:15:40 UTC
[3/3] accumulo git commit: ACCUMULO-3394 Change thrift package to rpc
and make an rpc package in core too
ACCUMULO-3394 Change thrift package to rpc and make an rpc package in core too
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/de1d3ee3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/de1d3ee3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/de1d3ee3
Branch: refs/heads/master
Commit: de1d3ee381d308c9e4d9d9f41532bde4cc1e6706
Parents: b792790
Author: Josh Elser <el...@apache.org>
Authored: Tue Dec 9 00:05:19 2014 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Dec 9 00:05:19 2014 -0500
----------------------------------------------------------------------
.../core/client/impl/ClientContext.java | 2 +-
.../core/client/impl/ConditionalWriterImpl.java | 2 +-
.../client/impl/InstanceOperationsImpl.java | 2 +-
.../accumulo/core/client/impl/MasterClient.java | 2 +-
.../core/client/impl/ReplicationClient.java | 2 +-
.../accumulo/core/client/impl/ServerClient.java | 2 +-
.../core/client/impl/TableOperationsImpl.java | 4 +-
.../impl/TabletServerBatchReaderIterator.java | 2 +-
.../client/impl/TabletServerBatchWriter.java | 2 +-
.../core/client/impl/ThriftScanner.java | 2 +-
.../core/client/impl/ThriftTransportKey.java | 2 +-
.../core/client/impl/ThriftTransportPool.java | 2 +-
.../accumulo/core/client/impl/Writer.java | 2 +-
.../accumulo/core/rpc/SslConnectionParams.java | 275 ++++++++++++
.../accumulo/core/rpc/TBufferedSocket.java | 39 ++
.../accumulo/core/rpc/TTimeoutTransport.java | 60 +++
.../apache/accumulo/core/rpc/ThriftUtil.java | 434 +++++++++++++++++++
.../accumulo/core/util/SslConnectionParams.java | 275 ------------
.../accumulo/core/util/TBufferedSocket.java | 39 --
.../accumulo/core/util/TTimeoutTransport.java | 60 ---
.../apache/accumulo/core/util/ThriftUtil.java | 433 ------------------
.../java/org/apache/accumulo/proxy/Proxy.java | 2 +-
.../accumulo/server/AccumuloServerContext.java | 2 +-
.../accumulo/server/client/BulkImporter.java | 2 +-
.../accumulo/server/master/LiveTServerSet.java | 2 +-
.../server/master/balancer/TabletBalancer.java | 2 +-
.../server/rpc/ClientInfoProcessorFactory.java | 53 +++
.../server/rpc/CustomNonBlockingServer.java | 268 ++++++++++++
.../apache/accumulo/server/rpc/RpcWrapper.java | 62 +++
.../accumulo/server/rpc/ServerAddress.java | 42 ++
.../server/rpc/TBufferedServerSocket.java | 71 +++
.../server/rpc/TNonblockingServerSocket.java | 157 +++++++
.../accumulo/server/rpc/TServerUtils.java | 255 +++++++++++
.../accumulo/server/rpc/TimedProcessor.java | 69 +++
.../security/AuditedSecurityOperation.java | 2 +-
.../thrift/ClientInfoProcessorFactory.java | 53 ---
.../server/thrift/CustomNonBlockingServer.java | 268 ------------
.../accumulo/server/thrift/RpcWrapper.java | 62 ---
.../accumulo/server/thrift/ServerAddress.java | 42 --
.../server/thrift/TBufferedServerSocket.java | 71 ---
.../server/thrift/TNonblockingServerSocket.java | 157 -------
.../accumulo/server/thrift/TServerUtils.java | 255 -----------
.../accumulo/server/thrift/TimedProcessor.java | 69 ---
.../server/util/VerifyTabletAssignments.java | 2 +-
.../accumulo/server/util/TServerUtilsTest.java | 2 +-
.../gc/GarbageCollectWriteAheadLogs.java | 2 +-
.../accumulo/gc/SimpleGarbageCollector.java | 4 +-
.../CloseWriteAheadLogReferences.java | 2 +-
.../java/org/apache/accumulo/master/Master.java | 6 +-
.../org/apache/accumulo/monitor/Monitor.java | 2 +-
.../accumulo/monitor/ZooKeeperStatus.java | 2 +-
.../monitor/servlets/TServersServlet.java | 2 +-
.../apache/accumulo/tserver/TabletServer.java | 8 +-
.../accumulo/tserver/session/Session.java | 2 +-
.../apache/accumulo/test/WrongTabletTest.java | 2 +-
.../accumulo/test/functional/ZombieTServer.java | 4 +-
.../test/performance/thrift/NullTserver.java | 2 +-
.../org/apache/accumulo/test/TotalQueuedIT.java | 2 +-
...bageCollectorCommunicatesWithTServersIT.java | 2 +-
59 files changed, 1828 insertions(+), 1827 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
index 8fd12f2..e75bec6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java
@@ -33,9 +33,9 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.CredentialProviderFactoryShim;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.SslConnectionParams;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index c831915..ee56fff 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -60,6 +60,7 @@ import org.apache.accumulo.core.data.thrift.TConditionalSession;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.data.thrift.TMutation;
import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.security.VisibilityEvaluator;
@@ -72,7 +73,6 @@ import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.BadArgumentException;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.LoggingRunnable;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java
index 4e74069..a62496b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/InstanceOperationsImpl.java
@@ -36,11 +36,11 @@ import org.apache.accumulo.core.client.impl.thrift.ClientService;
import org.apache.accumulo.core.client.impl.thrift.ConfigurationType;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
index 092eec6..74b8ea9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
@@ -28,7 +28,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.master.thrift.MasterClientService;
-import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.log4j.Logger;
import org.apache.thrift.TServiceClient;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index f2007e9..edfba50 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -28,7 +28,7 @@ import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
-import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooReader;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
index ebccdab..1e44727 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ServerClient.java
@@ -28,10 +28,10 @@ import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.thrift.ClientService;
import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
index 1def091..4843a9c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
@@ -49,7 +49,6 @@ import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.accumulo.core.client.admin.CompactionConfig;
-
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -92,6 +91,7 @@ import org.apache.accumulo.core.metadata.MetadataServicer;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
@@ -104,7 +104,6 @@ import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.volume.VolumeConfiguration;
import org.apache.hadoop.fs.FileStatus;
@@ -116,6 +115,7 @@ import org.apache.log4j.Logger;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
+
import com.google.common.base.Joiner;
public class TableOperationsImpl extends TableOperationsHelper {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
index cd3dfd0..fb5c20b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@ -54,6 +54,7 @@ import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.data.thrift.TKeyValue;
import org.apache.accumulo.core.data.thrift.TRange;
import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
@@ -61,7 +62,6 @@ import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.trace.wrappers.TraceRunnable;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.OpTimer;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index a6112da..30a707e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -54,6 +54,7 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.thrift.TMutation;
import org.apache.accumulo.core.data.thrift.UpdateErrors;
import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
@@ -63,7 +64,6 @@ import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.SimpleThreadPool;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TApplicationException;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index 4b7d9ae..90e4421 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@ -48,6 +48,7 @@ import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.data.thrift.ScanResult;
import org.apache.accumulo.core.data.thrift.TKeyValue;
import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
@@ -58,7 +59,6 @@ import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.OpTimer;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
index a3a4f25..176e947 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
@@ -18,7 +18,7 @@ package org.apache.accumulo.core.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
-import org.apache.accumulo.core.util.SslConnectionParams;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
class ThriftTransportKey {
private final String location;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 3380e13..bdb04d9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -31,9 +31,9 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.log4j.Logger;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java
index df7074d..d7761e9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Writer.java
@@ -26,12 +26,12 @@ import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.TDurability;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Tracer;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/rpc/SslConnectionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/SslConnectionParams.java b/core/src/main/java/org/apache/accumulo/core/rpc/SslConnectionParams.java
new file mode 100644
index 0000000..718bf85
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/SslConnectionParams.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.URL;
+import java.util.Arrays;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;
+
+public class SslConnectionParams {
+ private static final Logger log = Logger.getLogger(SslConnectionParams.class);
+
+ private boolean useJsse = false;
+ private boolean clientAuth = false;
+
+ private boolean keyStoreSet;
+ private String keyStorePath;
+ private String keyStorePass;
+ private String keyStoreType;
+
+ private boolean trustStoreSet;
+ private String trustStorePath;
+ private String trustStorePass;
+ private String trustStoreType;
+
+ private String[] cipherSuites;
+ private String[] serverProtocols;
+ private String clientProtocol;
+
+ // Use the static construction methods
+ private SslConnectionParams() {}
+
+ public static SslConnectionParams forConfig(AccumuloConfiguration conf, boolean server) {
+ if (!conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED))
+ return null;
+
+ SslConnectionParams result = new SslConnectionParams();
+ boolean requireClientAuth = conf.getBoolean(Property.INSTANCE_RPC_SSL_CLIENT_AUTH);
+ if (server) {
+ result.setClientAuth(requireClientAuth);
+ }
+ if (conf.getBoolean(Property.RPC_USE_JSSE)) {
+ result.setUseJsse(true);
+ return result;
+ }
+
+ try {
+ if (!server || requireClientAuth) {
+ result.setTrustStoreFromConf(conf);
+ }
+ if (server || requireClientAuth) {
+ result.setKeyStoreFromConf(conf);
+ }
+ } catch (FileNotFoundException e) {
+ throw new IllegalArgumentException("Could not load configured keystore file", e);
+ }
+
+ String ciphers = conf.get(Property.RPC_SSL_CIPHER_SUITES);
+ if (null != ciphers && !ciphers.isEmpty()) {
+ result.cipherSuites = StringUtils.split(ciphers, ',');
+ }
+
+ String enabledProtocols = conf.get(Property.RPC_SSL_ENABLED_PROTOCOLS);
+ result.serverProtocols = StringUtils.split(enabledProtocols, ',');
+
+ result.clientProtocol = conf.get(Property.RPC_SSL_CLIENT_PROTOCOL);
+
+ return result;
+ }
+
+ private static String passwordFromConf(AccumuloConfiguration conf, String defaultPassword, Property passwordOverrideProperty) {
+ String keystorePassword = conf.get(passwordOverrideProperty);
+ if (!keystorePassword.isEmpty()) {
+ if (log.isTraceEnabled())
+ log.trace("Using explicit SSL private key password from " + passwordOverrideProperty.getKey());
+ } else {
+ keystorePassword = defaultPassword;
+ }
+ return keystorePassword;
+ }
+
+ private static String storePathFromConf(AccumuloConfiguration conf, Property pathProperty) throws FileNotFoundException {
+ return findKeystore(conf.getPath(pathProperty));
+ }
+
+ public void setKeyStoreFromConf(AccumuloConfiguration conf) throws FileNotFoundException {
+ keyStoreSet = true;
+ keyStorePath = storePathFromConf(conf, Property.RPC_SSL_KEYSTORE_PATH);
+ keyStorePass = passwordFromConf(conf, conf.get(Property.INSTANCE_SECRET), Property.RPC_SSL_KEYSTORE_PASSWORD);
+ keyStoreType = conf.get(Property.RPC_SSL_KEYSTORE_TYPE);
+ }
+
+ public void setTrustStoreFromConf(AccumuloConfiguration conf) throws FileNotFoundException {
+ trustStoreSet = true;
+ trustStorePath = storePathFromConf(conf, Property.RPC_SSL_TRUSTSTORE_PATH);
+ trustStorePass = passwordFromConf(conf, "", Property.RPC_SSL_TRUSTSTORE_PASSWORD);
+ trustStoreType = conf.get(Property.RPC_SSL_TRUSTSTORE_TYPE);
+ }
+
+ public static SslConnectionParams forServer(AccumuloConfiguration configuration) {
+ return forConfig(configuration, true);
+ }
+
+ public static SslConnectionParams forClient(AccumuloConfiguration configuration) {
+ return forConfig(configuration, false);
+ }
+
+ private static String findKeystore(String keystorePath) throws FileNotFoundException {
+ try {
+ // first just try the file
+ File file = new File(keystorePath);
+ if (file.exists())
+ return file.getAbsolutePath();
+ if (!file.isAbsolute()) {
+ // try classpath
+ URL url = SslConnectionParams.class.getClassLoader().getResource(keystorePath);
+ if (url != null) {
+ file = new File(url.toURI());
+ if (file.exists())
+ return file.getAbsolutePath();
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Exception finding keystore", e);
+ }
+ throw new FileNotFoundException("Failed to load SSL keystore from " + keystorePath);
+ }
+
+ public void setUseJsse(boolean useJsse) {
+ this.useJsse = useJsse;
+ }
+
+ public boolean useJsse() {
+ return useJsse;
+ }
+
+ public void setClientAuth(boolean clientAuth) {
+ this.clientAuth = clientAuth;
+ }
+
+ public boolean isClientAuth() {
+ return clientAuth;
+ }
+
+ public String[] getServerProtocols() {
+ return serverProtocols;
+ }
+
+ public String getClientProtocol() {
+ return clientProtocol;
+ }
+
+ public boolean isKeyStoreSet() {
+ return keyStoreSet;
+ }
+
+ public String getKeyStorePath() {
+ return keyStorePath;
+ }
+
+ /**
+ * @return the keyStorePass
+ */
+ public String getKeyStorePass() {
+ return keyStorePass;
+ }
+
+ public String getKeyStoreType() {
+ return keyStoreType;
+ }
+
+ public boolean isTrustStoreSet() {
+ return trustStoreSet;
+ }
+
+ public String getTrustStorePath() {
+ return trustStorePath;
+ }
+
+ public String getTrustStorePass() {
+ return trustStorePass;
+ }
+
+ /**
+ * @return the trustStoreType
+ */
+ public String getTrustStoreType() {
+ return trustStoreType;
+ }
+
+ public TSSLTransportParameters getTTransportParams() {
+ if (useJsse)
+ throw new IllegalStateException("Cannot get TTransportParams for JSEE configuration.");
+
+ // Null cipherSuites is implicitly handled
+ TSSLTransportParameters params = new TSSLTransportParameters(clientProtocol, cipherSuites);
+
+ params.requireClientAuth(clientAuth);
+ if (keyStoreSet) {
+ params.setKeyStore(keyStorePath, keyStorePass, null, keyStoreType);
+ }
+ if (trustStoreSet) {
+ params.setTrustStore(trustStorePath, trustStorePass, null, trustStoreType);
+ }
+ return params;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 0;
+ hash = 31 * hash + (clientAuth ? 0 : 1);
+ hash = 31 * hash + (useJsse ? 0 : 1);
+ if (useJsse)
+ return hash;
+ hash = 31 * hash + (keyStoreSet ? 0 : 1);
+ hash = 31 * hash + (trustStoreSet ? 0 : 1);
+ if (keyStoreSet) {
+ hash = 31 * hash + keyStorePath.hashCode();
+ }
+ if (trustStoreSet) {
+ hash = 31 * hash + trustStorePath.hashCode();
+ }
+ hash = 31 * hash + clientProtocol.hashCode();
+ hash = 31 * hash + Arrays.hashCode(serverProtocols);
+ return super.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof SslConnectionParams))
+ return false;
+
+ SslConnectionParams other = (SslConnectionParams) obj;
+ if (clientAuth != other.clientAuth)
+ return false;
+ if (useJsse)
+ return other.useJsse;
+ if (keyStoreSet) {
+ if (!other.keyStoreSet)
+ return false;
+ if (!keyStorePath.equals(other.keyStorePath) || !keyStorePass.equals(other.keyStorePass) || !keyStoreType.equals(other.keyStoreType))
+ return false;
+ }
+ if (trustStoreSet) {
+ if (!other.trustStoreSet)
+ return false;
+ if (!trustStorePath.equals(other.trustStorePath) || !trustStorePass.equals(other.trustStorePass) || !trustStoreType.equals(other.trustStoreType))
+ return false;
+ }
+ if (!Arrays.equals(serverProtocols, other.serverProtocols)) {
+ return false;
+ }
+ return clientProtocol.equals(other.clientProtocol);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java b/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java
new file mode 100644
index 0000000..87b7c13
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TBufferedSocket.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TSocket;
+
+public class TBufferedSocket extends TIOStreamTransport {
+
+ String client;
+
+ public TBufferedSocket(TSocket sock, int bufferSize) throws IOException {
+ super(new BufferedInputStream(sock.getSocket().getInputStream(), bufferSize), new BufferedOutputStream(sock.getSocket().getOutputStream(), bufferSize));
+ client = sock.getSocket().getInetAddress().getHostAddress() + ":" + sock.getSocket().getPort();
+ }
+
+ public String getClientString() {
+ return client;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
new file mode 100644
index 0000000..6eace77
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.spi.SelectorProvider;
+
+import org.apache.hadoop.net.NetUtils;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
+
+import com.google.common.net.HostAndPort;
+
+public class TTimeoutTransport {
+
+ private static InputStream getInputStream(Socket socket, long timeout) {
+ try {
+ Method m = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
+ return (InputStream)m.invoke(null, socket, timeout);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException {
+ return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
+ }
+
+ public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
+ Socket socket = SelectorProvider.provider().openSocketChannel().socket();
+ socket.setSoLinger(false, 0);
+ socket.setTcpNoDelay(true);
+ socket.connect(addr);
+ InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
+ OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
+ return new TIOStreamTransport(input, output);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
new file mode 100644
index 0000000..a3cb252
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.security.KeyStore;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.ClientExec;
+import org.apache.accumulo.core.client.impl.ClientExecReturn;
+import org.apache.accumulo.core.client.impl.ThriftTransportPool;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.trace.Span;
+import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.TServiceClientFactory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.net.HostAndPort;
+
+public class ThriftUtil {
+ private static final Logger log = Logger.getLogger(ThriftUtil.class);
+
+ public static class TraceProtocol extends TCompactProtocol {
+ private Span span = null;
+
+ @Override
+ public void writeMessageBegin(TMessage message) throws TException {
+ span = Trace.start("client:" + message.name);
+ super.writeMessageBegin(message);
+ }
+
+ @Override
+ public void writeMessageEnd() throws TException {
+ super.writeMessageEnd();
+ span.stop();
+ }
+
+ public TraceProtocol(TTransport transport) {
+ super(transport);
+ }
+ }
+
+ public static class TraceProtocolFactory extends TCompactProtocol.Factory {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public TProtocol getProtocol(TTransport trans) {
+ return new TraceProtocol(trans);
+ }
+ }
+
+ static private TProtocolFactory protocolFactory = new TraceProtocolFactory();
+ static private TTransportFactory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
+
+ static public <T extends TServiceClient> T createClient(TServiceClientFactory<T> factory, TTransport transport) {
+ return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport));
+ }
+
+ static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context)
+ throws TTransportException {
+ return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address, context));
+ }
+
+ static public <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T> factory, String address, ClientContext context)
+ throws TTransportException {
+ return getClient(factory, address, context, 0);
+ }
+
+ static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context)
+ throws TTransportException {
+ TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(), context);
+ return createClient(factory, transport);
+ }
+
+ static private <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context, long timeout)
+ throws TTransportException {
+ TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout, context);
+ return createClient(factory, transport);
+ }
+
+ static public void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
+ if (iface != null) {
+ ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport());
+ }
+ }
+
+ static public TabletClientService.Client getTServerClient(String address, ClientContext context) throws TTransportException {
+ return getClient(new TabletClientService.Client.Factory(), address, context);
+ }
+
+ static public TabletClientService.Client getTServerClient(String address, ClientContext context, long timeout) throws TTransportException {
+ return getClient(new TabletClientService.Client.Factory(), address, context, timeout);
+ }
+
+ public static void execute(String address, ClientContext context, ClientExec<TabletClientService.Client> exec) throws AccumuloException,
+ AccumuloSecurityException {
+ while (true) {
+ TabletClientService.Client client = null;
+ try {
+ exec.execute(client = getTServerClient(address, context));
+ break;
+ } catch (TTransportException tte) {
+ log.debug("getTServerClient request failed, retrying ... ", tte);
+ UtilWaitThread.sleep(100);
+ } catch (ThriftSecurityException e) {
+ throw new AccumuloSecurityException(e.user, e.code, e);
+ } catch (Exception e) {
+ throw new AccumuloException(e);
+ } finally {
+ if (client != null)
+ returnClient(client);
+ }
+ }
+ }
+
+ public static <T> T execute(String address, ClientContext context, ClientExecReturn<T,TabletClientService.Client> exec) throws AccumuloException,
+ AccumuloSecurityException {
+ while (true) {
+ TabletClientService.Client client = null;
+ try {
+ return exec.execute(client = getTServerClient(address, context));
+ } catch (TTransportException tte) {
+ log.debug("getTServerClient request failed, retrying ... ", tte);
+ UtilWaitThread.sleep(100);
+ } catch (ThriftSecurityException e) {
+ throw new AccumuloSecurityException(e.user, e.code, e);
+ } catch (Exception e) {
+ throw new AccumuloException(e);
+ } finally {
+ if (client != null)
+ returnClient(client);
+ }
+ }
+ }
+
+ /**
+ * create a transport that is not pooled
+ */
+ public static TTransport createTransport(HostAndPort address, ClientContext context) throws TException {
+ return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams());
+ }
+
+ public static TTransportFactory transportFactory() {
+ return transportFactory;
+ }
+
+ private final static Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
+
+ synchronized public static TTransportFactory transportFactory(int maxFrameSize) {
+ TTransportFactory factory = factoryCache.get(maxFrameSize);
+ if (factory == null) {
+ factory = new TFramedTransport.Factory(maxFrameSize);
+ factoryCache.put(maxFrameSize, factory);
+ }
+ return factory;
+ }
+
+ synchronized public static TTransportFactory transportFactory(long maxFrameSize) {
+ if (maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1)
+ throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE);
+ return transportFactory((int) maxFrameSize);
+ }
+
+ public static TProtocolFactory protocolFactory() {
+ return protocolFactory;
+ }
+
+ public static TServerSocket getServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException {
+ TServerSocket tServerSock;
+ if (params.useJsse()) {
+ tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
+ } else {
+ tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
+ }
+
+ ServerSocket serverSock = tServerSock.getServerSocket();
+ if (serverSock instanceof SSLServerSocket) {
+ SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
+ String[] protocols = params.getServerProtocols();
+
+ // Be nice for the user and automatically remove protocols that might not exist in their JVM. Keeps us from forcing config alterations too
+ // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
+ Set<String> socketEnabledProtocols = new HashSet<String>(Arrays.asList(sslServerSock.getEnabledProtocols()));
+ // Keep only the enabled protocols that were specified by the configuration
+ socketEnabledProtocols.retainAll(Arrays.asList(protocols));
+ if (socketEnabledProtocols.isEmpty()) {
+ // Bad configuration...
+ throw new RuntimeException("No available protocols available for secure socket. Availaable protocols: "
+ + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: " + Arrays.toString(protocols));
+ }
+
+ // Set the protocol(s) on the server socket
+ sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
+ }
+
+ return tServerSock;
+ }
+
+ public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams) throws TTransportException {
+ boolean success = false;
+ TTransport transport = null;
+ try {
+ if (sslParams != null) {
+ // TSSLTransportFactory handles timeout 0 -> forever natively
+ if (sslParams.useJsse()) {
+ transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout);
+ } else {
+ // JDK6's factory doesn't appear to pass the protocol onto the Socket properly so we have
+ // to do some magic to make sure that happens. Not an issue in JDK7
+
+ // Taken from thrift-0.9.1 to make the SSLContext
+ SSLContext sslContext = createSSLContext(sslParams);
+
+ // Create the factory from it
+ SSLSocketFactory sslSockFactory = sslContext.getSocketFactory();
+
+ // Wrap the real factory with our own that will set the protocol on the Socket before returning it
+ ProtocolOverridingSSLSocketFactory wrappingSslSockFactory = new ProtocolOverridingSSLSocketFactory(sslSockFactory,
+ new String[] {sslParams.getClientProtocol()});
+
+ // Create the TSocket from that
+ transport = createClient(wrappingSslSockFactory, address.getHostText(), address.getPort(), timeout);
+ }
+ // TSSLTransportFactory leaves transports open, so no need to open here
+ } else if (timeout == 0) {
+ transport = new TSocket(address.getHostText(), address.getPort());
+ transport.open();
+ } else {
+ try {
+ transport = TTimeoutTransport.create(address, timeout);
+ } catch (IOException ex) {
+ throw new TTransportException(ex);
+ }
+ transport.open();
+ }
+ transport = ThriftUtil.transportFactory().getTransport(transport);
+ success = true;
+ } finally {
+ if (!success && transport != null) {
+ transport.close();
+ }
+ }
+ return transport;
+ }
+
+ /**
+ * Lifted from TSSLTransportFactory in Thrift-0.9.1. The method to create a client socket with an SSLContextFactory object is not visibile to us. Have to use
+ * SslConnectionParams instead of TSSLTransportParameters because no getters exist on TSSLTransportParameters.
+ *
+ * @param params
+ * Parameters to use to create the SSLContext
+ */
+ private static SSLContext createSSLContext(SslConnectionParams params) throws TTransportException {
+ SSLContext ctx;
+ try {
+ ctx = SSLContext.getInstance(params.getClientProtocol());
+ TrustManagerFactory tmf = null;
+ KeyManagerFactory kmf = null;
+
+ if (params.isTrustStoreSet()) {
+ tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ KeyStore ts = KeyStore.getInstance(params.getTrustStoreType());
+ ts.load(new FileInputStream(params.getTrustStorePath()), params.getTrustStorePass().toCharArray());
+ tmf.init(ts);
+ }
+
+ if (params.isKeyStoreSet()) {
+ kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyStore ks = KeyStore.getInstance(params.getKeyStoreType());
+ ks.load(new FileInputStream(params.getKeyStorePath()), params.getKeyStorePass().toCharArray());
+ kmf.init(ks, params.getKeyStorePass().toCharArray());
+ }
+
+ if (params.isKeyStoreSet() && params.isTrustStoreSet()) {
+ ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+ } else if (params.isKeyStoreSet()) {
+ ctx.init(kmf.getKeyManagers(), null, null);
+ } else {
+ ctx.init(null, tmf.getTrustManagers(), null);
+ }
+
+ } catch (Exception e) {
+ throw new TTransportException("Error creating the transport", e);
+ }
+ return ctx;
+ }
+
+ /**
+ * Lifted from Thrift-0.9.1 because it was private. Create an SSLSocket with the given factory, host:port, and timeout.
+ *
+ * @param factory
+ * Factory to create the socket from
+ * @param host
+ * Destination host
+ * @param port
+ * Destination port
+ * @param timeout
+ * Socket timeout
+ */
+ private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
+ try {
+ SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
+ socket.setSoTimeout(timeout);
+ return new TSocket(socket);
+ } catch (Exception e) {
+ throw new TTransportException("Could not connect to " + host + " on port " + port, e);
+ }
+ }
+
+ /**
+ * JDK6's SSLSocketFactory doesn't seem to properly set the protocols on the Sockets that it creates which causes an SSLv2 client hello message during
+ * handshake, even when only TLSv1 is enabled. This only appears to be an issue on the client sockets, not the server sockets.
+ *
+ * This class wraps the SSLSocketFactory ensuring that the Socket is properly configured.
+ * http://www.coderanch.com/t/637177/Security/Disabling-handshake-message-Java
+ *
+ * This class can be removed when JDK6 support is officially unsupported by Accumulo
+ */
+ private static class ProtocolOverridingSSLSocketFactory extends SSLSocketFactory {
+
+ private final SSLSocketFactory delegate;
+ private final String[] enabledProtocols;
+
+ public ProtocolOverridingSSLSocketFactory(final SSLSocketFactory delegate, final String[] enabledProtocols) {
+ Preconditions.checkNotNull(enabledProtocols);
+ Preconditions.checkArgument(0 != enabledProtocols.length, "Expected at least one protocol");
+ this.delegate = delegate;
+ this.enabledProtocols = enabledProtocols;
+ }
+
+ @Override
+ public String[] getDefaultCipherSuites() {
+ return delegate.getDefaultCipherSuites();
+ }
+
+ @Override
+ public String[] getSupportedCipherSuites() {
+ return delegate.getSupportedCipherSuites();
+ }
+
+ @Override
+ public Socket createSocket(final Socket socket, final String host, final int port, final boolean autoClose) throws IOException {
+ final Socket underlyingSocket = delegate.createSocket(socket, host, port, autoClose);
+ return overrideProtocol(underlyingSocket);
+ }
+
+ @Override
+ public Socket createSocket(final String host, final int port) throws IOException, UnknownHostException {
+ final Socket underlyingSocket = delegate.createSocket(host, port);
+ return overrideProtocol(underlyingSocket);
+ }
+
+ @Override
+ public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort) throws IOException, UnknownHostException {
+ final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
+ return overrideProtocol(underlyingSocket);
+ }
+
+ @Override
+ public Socket createSocket(final InetAddress host, final int port) throws IOException {
+ final Socket underlyingSocket = delegate.createSocket(host, port);
+ return overrideProtocol(underlyingSocket);
+ }
+
+ @Override
+ public Socket createSocket(final InetAddress host, final int port, final InetAddress localAddress, final int localPort) throws IOException {
+ final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
+ return overrideProtocol(underlyingSocket);
+ }
+
+ /**
+ * Set the {@link javax.net.ssl.SSLSocket#getEnabledProtocols() enabled protocols} to {@link #enabledProtocols} if the <code>socket</code> is a
+ * {@link SSLSocket}
+ *
+ * @param socket
+ * The Socket
+ */
+ private Socket overrideProtocol(final Socket socket) {
+ if (socket instanceof SSLSocket) {
+ ((SSLSocket) socket).setEnabledProtocols(enabledProtocols);
+ }
+ return socket;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java b/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java
deleted file mode 100644
index cd433c6..0000000
--- a/core/src/main/java/org/apache/accumulo/core/util/SslConnectionParams.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.util;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.net.URL;
-import java.util.Arrays;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;
-
-public class SslConnectionParams {
- private static final Logger log = Logger.getLogger(SslConnectionParams.class);
-
- private boolean useJsse = false;
- private boolean clientAuth = false;
-
- private boolean keyStoreSet;
- private String keyStorePath;
- private String keyStorePass;
- private String keyStoreType;
-
- private boolean trustStoreSet;
- private String trustStorePath;
- private String trustStorePass;
- private String trustStoreType;
-
- private String[] cipherSuites;
- private String[] serverProtocols;
- private String clientProtocol;
-
- // Use the static construction methods
- private SslConnectionParams() {}
-
- public static SslConnectionParams forConfig(AccumuloConfiguration conf, boolean server) {
- if (!conf.getBoolean(Property.INSTANCE_RPC_SSL_ENABLED))
- return null;
-
- SslConnectionParams result = new SslConnectionParams();
- boolean requireClientAuth = conf.getBoolean(Property.INSTANCE_RPC_SSL_CLIENT_AUTH);
- if (server) {
- result.setClientAuth(requireClientAuth);
- }
- if (conf.getBoolean(Property.RPC_USE_JSSE)) {
- result.setUseJsse(true);
- return result;
- }
-
- try {
- if (!server || requireClientAuth) {
- result.setTrustStoreFromConf(conf);
- }
- if (server || requireClientAuth) {
- result.setKeyStoreFromConf(conf);
- }
- } catch (FileNotFoundException e) {
- throw new IllegalArgumentException("Could not load configured keystore file", e);
- }
-
- String ciphers = conf.get(Property.RPC_SSL_CIPHER_SUITES);
- if (null != ciphers && !ciphers.isEmpty()) {
- result.cipherSuites = StringUtils.split(ciphers, ',');
- }
-
- String enabledProtocols = conf.get(Property.RPC_SSL_ENABLED_PROTOCOLS);
- result.serverProtocols = StringUtils.split(enabledProtocols, ',');
-
- result.clientProtocol = conf.get(Property.RPC_SSL_CLIENT_PROTOCOL);
-
- return result;
- }
-
- private static String passwordFromConf(AccumuloConfiguration conf, String defaultPassword, Property passwordOverrideProperty) {
- String keystorePassword = conf.get(passwordOverrideProperty);
- if (!keystorePassword.isEmpty()) {
- if (log.isTraceEnabled())
- log.trace("Using explicit SSL private key password from " + passwordOverrideProperty.getKey());
- } else {
- keystorePassword = defaultPassword;
- }
- return keystorePassword;
- }
-
- private static String storePathFromConf(AccumuloConfiguration conf, Property pathProperty) throws FileNotFoundException {
- return findKeystore(conf.getPath(pathProperty));
- }
-
- public void setKeyStoreFromConf(AccumuloConfiguration conf) throws FileNotFoundException {
- keyStoreSet = true;
- keyStorePath = storePathFromConf(conf, Property.RPC_SSL_KEYSTORE_PATH);
- keyStorePass = passwordFromConf(conf, conf.get(Property.INSTANCE_SECRET), Property.RPC_SSL_KEYSTORE_PASSWORD);
- keyStoreType = conf.get(Property.RPC_SSL_KEYSTORE_TYPE);
- }
-
- public void setTrustStoreFromConf(AccumuloConfiguration conf) throws FileNotFoundException {
- trustStoreSet = true;
- trustStorePath = storePathFromConf(conf, Property.RPC_SSL_TRUSTSTORE_PATH);
- trustStorePass = passwordFromConf(conf, "", Property.RPC_SSL_TRUSTSTORE_PASSWORD);
- trustStoreType = conf.get(Property.RPC_SSL_TRUSTSTORE_TYPE);
- }
-
- public static SslConnectionParams forServer(AccumuloConfiguration configuration) {
- return forConfig(configuration, true);
- }
-
- public static SslConnectionParams forClient(AccumuloConfiguration configuration) {
- return forConfig(configuration, false);
- }
-
- private static String findKeystore(String keystorePath) throws FileNotFoundException {
- try {
- // first just try the file
- File file = new File(keystorePath);
- if (file.exists())
- return file.getAbsolutePath();
- if (!file.isAbsolute()) {
- // try classpath
- URL url = SslConnectionParams.class.getClassLoader().getResource(keystorePath);
- if (url != null) {
- file = new File(url.toURI());
- if (file.exists())
- return file.getAbsolutePath();
- }
- }
- } catch (Exception e) {
- log.warn("Exception finding keystore", e);
- }
- throw new FileNotFoundException("Failed to load SSL keystore from " + keystorePath);
- }
-
- public void setUseJsse(boolean useJsse) {
- this.useJsse = useJsse;
- }
-
- public boolean useJsse() {
- return useJsse;
- }
-
- public void setClientAuth(boolean clientAuth) {
- this.clientAuth = clientAuth;
- }
-
- public boolean isClientAuth() {
- return clientAuth;
- }
-
- public String[] getServerProtocols() {
- return serverProtocols;
- }
-
- public String getClientProtocol() {
- return clientProtocol;
- }
-
- public boolean isKeyStoreSet() {
- return keyStoreSet;
- }
-
- public String getKeyStorePath() {
- return keyStorePath;
- }
-
- /**
- * @return the keyStorePass
- */
- public String getKeyStorePass() {
- return keyStorePass;
- }
-
- public String getKeyStoreType() {
- return keyStoreType;
- }
-
- public boolean isTrustStoreSet() {
- return trustStoreSet;
- }
-
- public String getTrustStorePath() {
- return trustStorePath;
- }
-
- public String getTrustStorePass() {
- return trustStorePass;
- }
-
- /**
- * @return the trustStoreType
- */
- public String getTrustStoreType() {
- return trustStoreType;
- }
-
- public TSSLTransportParameters getTTransportParams() {
- if (useJsse)
- throw new IllegalStateException("Cannot get TTransportParams for JSEE configuration.");
-
- // Null cipherSuites is implicitly handled
- TSSLTransportParameters params = new TSSLTransportParameters(clientProtocol, cipherSuites);
-
- params.requireClientAuth(clientAuth);
- if (keyStoreSet) {
- params.setKeyStore(keyStorePath, keyStorePass, null, keyStoreType);
- }
- if (trustStoreSet) {
- params.setTrustStore(trustStorePath, trustStorePass, null, trustStoreType);
- }
- return params;
- }
-
- @Override
- public int hashCode() {
- int hash = 0;
- hash = 31 * hash + (clientAuth ? 0 : 1);
- hash = 31 * hash + (useJsse ? 0 : 1);
- if (useJsse)
- return hash;
- hash = 31 * hash + (keyStoreSet ? 0 : 1);
- hash = 31 * hash + (trustStoreSet ? 0 : 1);
- if (keyStoreSet) {
- hash = 31 * hash + keyStorePath.hashCode();
- }
- if (trustStoreSet) {
- hash = 31 * hash + trustStorePath.hashCode();
- }
- hash = 31 * hash + clientProtocol.hashCode();
- hash = 31 * hash + Arrays.hashCode(serverProtocols);
- return super.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof SslConnectionParams))
- return false;
-
- SslConnectionParams other = (SslConnectionParams) obj;
- if (clientAuth != other.clientAuth)
- return false;
- if (useJsse)
- return other.useJsse;
- if (keyStoreSet) {
- if (!other.keyStoreSet)
- return false;
- if (!keyStorePath.equals(other.keyStorePath) || !keyStorePass.equals(other.keyStorePass) || !keyStoreType.equals(other.keyStoreType))
- return false;
- }
- if (trustStoreSet) {
- if (!other.trustStoreSet)
- return false;
- if (!trustStorePath.equals(other.trustStorePath) || !trustStorePass.equals(other.trustStorePass) || !trustStoreType.equals(other.trustStoreType))
- return false;
- }
- if (!Arrays.equals(serverProtocols, other.serverProtocols)) {
- return false;
- }
- return clientProtocol.equals(other.clientProtocol);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java b/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java
deleted file mode 100644
index c2d9400..0000000
--- a/core/src/main/java/org/apache/accumulo/core/util/TBufferedSocket.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.util;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.transport.TSocket;
-
-public class TBufferedSocket extends TIOStreamTransport {
-
- String client;
-
- public TBufferedSocket(TSocket sock, int bufferSize) throws IOException {
- super(new BufferedInputStream(sock.getSocket().getInputStream(), bufferSize), new BufferedOutputStream(sock.getSocket().getOutputStream(), bufferSize));
- client = sock.getSocket().getInetAddress().getHostAddress() + ":" + sock.getSocket().getPort();
- }
-
- public String getClientString() {
- return client;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
deleted file mode 100644
index a66b268..0000000
--- a/core/src/main/java/org/apache/accumulo/core/util/TTimeoutTransport.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.util;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.nio.channels.spi.SelectorProvider;
-
-import org.apache.hadoop.net.NetUtils;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.transport.TTransport;
-
-import com.google.common.net.HostAndPort;
-
-public class TTimeoutTransport {
-
- private static InputStream getInputStream(Socket socket, long timeout) {
- try {
- Method m = NetUtils.class.getMethod("getInputStream", Socket.class, Long.TYPE);
- return (InputStream)m.invoke(null, socket, timeout);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public static TTransport create(HostAndPort addr, long timeoutMillis) throws IOException {
- return create(new InetSocketAddress(addr.getHostText(), addr.getPort()), timeoutMillis);
- }
-
- public static TTransport create(SocketAddress addr, long timeoutMillis) throws IOException {
- Socket socket = SelectorProvider.provider().openSocketChannel().socket();
- socket.setSoLinger(false, 0);
- socket.setTcpNoDelay(true);
- socket.connect(addr);
- InputStream input = new BufferedInputStream(getInputStream(socket, timeoutMillis), 1024 * 10);
- OutputStream output = new BufferedOutputStream(NetUtils.getOutputStream(socket, timeoutMillis), 1024 * 10);
- return new TIOStreamTransport(input, output);
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
deleted file mode 100644
index 619131a..0000000
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.util;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.security.KeyStore;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManagerFactory;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.impl.ClientContext;
-import org.apache.accumulo.core.client.impl.ClientExec;
-import org.apache.accumulo.core.client.impl.ClientExecReturn;
-import org.apache.accumulo.core.client.impl.ThriftTransportPool;
-import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.trace.Span;
-import org.apache.accumulo.core.trace.Trace;
-import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.thrift.TServiceClient;
-import org.apache.thrift.TServiceClientFactory;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TMessage;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSSLTransportFactory;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.net.HostAndPort;
-
-public class ThriftUtil {
- private static final Logger log = Logger.getLogger(ThriftUtil.class);
-
- public static class TraceProtocol extends TCompactProtocol {
- private Span span = null;
-
- @Override
- public void writeMessageBegin(TMessage message) throws TException {
- span = Trace.start("client:" + message.name);
- super.writeMessageBegin(message);
- }
-
- @Override
- public void writeMessageEnd() throws TException {
- super.writeMessageEnd();
- span.stop();
- }
-
- public TraceProtocol(TTransport transport) {
- super(transport);
- }
- }
-
- public static class TraceProtocolFactory extends TCompactProtocol.Factory {
- private static final long serialVersionUID = 1L;
-
- @Override
- public TProtocol getProtocol(TTransport trans) {
- return new TraceProtocol(trans);
- }
- }
-
- static private TProtocolFactory protocolFactory = new TraceProtocolFactory();
- static private TTransportFactory transportFactory = new TFramedTransport.Factory(Integer.MAX_VALUE);
-
- static public <T extends TServiceClient> T createClient(TServiceClientFactory<T> factory, TTransport transport) {
- return factory.getClient(protocolFactory.getProtocol(transport), protocolFactory.getProtocol(transport));
- }
-
- static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, HostAndPort address, ClientContext context)
- throws TTransportException {
- return createClient(factory, ThriftTransportPool.getInstance().getTransportWithDefaultTimeout(address, context));
- }
-
- static public <T extends TServiceClient> T getClientNoTimeout(TServiceClientFactory<T> factory, String address, ClientContext context)
- throws TTransportException {
- return getClient(factory, address, context, 0);
- }
-
- static public <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context)
- throws TTransportException {
- TTransport transport = ThriftTransportPool.getInstance().getTransport(address, context.getClientTimeoutInMillis(), context);
- return createClient(factory, transport);
- }
-
- static private <T extends TServiceClient> T getClient(TServiceClientFactory<T> factory, String address, ClientContext context, long timeout)
- throws TTransportException {
- TTransport transport = ThriftTransportPool.getInstance().getTransport(address, timeout, context);
- return createClient(factory, transport);
- }
-
- static public void returnClient(TServiceClient iface) { // Eew... the typing here is horrible
- if (iface != null) {
- ThriftTransportPool.getInstance().returnTransport(iface.getInputProtocol().getTransport());
- }
- }
-
- static public TabletClientService.Client getTServerClient(String address, ClientContext context) throws TTransportException {
- return getClient(new TabletClientService.Client.Factory(), address, context);
- }
-
- static public TabletClientService.Client getTServerClient(String address, ClientContext context, long timeout) throws TTransportException {
- return getClient(new TabletClientService.Client.Factory(), address, context, timeout);
- }
-
- public static void execute(String address, ClientContext context, ClientExec<TabletClientService.Client> exec) throws AccumuloException,
- AccumuloSecurityException {
- while (true) {
- TabletClientService.Client client = null;
- try {
- exec.execute(client = getTServerClient(address, context));
- break;
- } catch (TTransportException tte) {
- log.debug("getTServerClient request failed, retrying ... ", tte);
- UtilWaitThread.sleep(100);
- } catch (ThriftSecurityException e) {
- throw new AccumuloSecurityException(e.user, e.code, e);
- } catch (Exception e) {
- throw new AccumuloException(e);
- } finally {
- if (client != null)
- returnClient(client);
- }
- }
- }
-
- public static <T> T execute(String address, ClientContext context, ClientExecReturn<T,TabletClientService.Client> exec) throws AccumuloException,
- AccumuloSecurityException {
- while (true) {
- TabletClientService.Client client = null;
- try {
- return exec.execute(client = getTServerClient(address, context));
- } catch (TTransportException tte) {
- log.debug("getTServerClient request failed, retrying ... ", tte);
- UtilWaitThread.sleep(100);
- } catch (ThriftSecurityException e) {
- throw new AccumuloSecurityException(e.user, e.code, e);
- } catch (Exception e) {
- throw new AccumuloException(e);
- } finally {
- if (client != null)
- returnClient(client);
- }
- }
- }
-
- /**
- * create a transport that is not pooled
- */
- public static TTransport createTransport(HostAndPort address, ClientContext context) throws TException {
- return createClientTransport(address, (int) context.getClientTimeoutInMillis(), context.getClientSslParams());
- }
-
- public static TTransportFactory transportFactory() {
- return transportFactory;
- }
-
- private final static Map<Integer,TTransportFactory> factoryCache = new HashMap<Integer,TTransportFactory>();
-
- synchronized public static TTransportFactory transportFactory(int maxFrameSize) {
- TTransportFactory factory = factoryCache.get(maxFrameSize);
- if (factory == null) {
- factory = new TFramedTransport.Factory(maxFrameSize);
- factoryCache.put(maxFrameSize, factory);
- }
- return factory;
- }
-
- synchronized public static TTransportFactory transportFactory(long maxFrameSize) {
- if (maxFrameSize > Integer.MAX_VALUE || maxFrameSize < 1)
- throw new RuntimeException("Thrift transport frames are limited to " + Integer.MAX_VALUE);
- return transportFactory((int) maxFrameSize);
- }
-
- public static TProtocolFactory protocolFactory() {
- return protocolFactory;
- }
-
- public static TServerSocket getServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException {
- TServerSocket tServerSock;
- if (params.useJsse()) {
- tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
- } else {
- tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
- }
-
- ServerSocket serverSock = tServerSock.getServerSocket();
- if (serverSock instanceof SSLServerSocket) {
- SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
- String[] protocols = params.getServerProtocols();
-
- // Be nice for the user and automatically remove protocols that might not exist in their JVM. Keeps us from forcing config alterations too
- // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
- Set<String> socketEnabledProtocols = new HashSet<String>(Arrays.asList(sslServerSock.getEnabledProtocols()));
- // Keep only the enabled protocols that were specified by the configuration
- socketEnabledProtocols.retainAll(Arrays.asList(protocols));
- if (socketEnabledProtocols.isEmpty()) {
- // Bad configuration...
- throw new RuntimeException("No available protocols available for secure socket. Availaable protocols: "
- + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: " + Arrays.toString(protocols));
- }
-
- // Set the protocol(s) on the server socket
- sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
- }
-
- return tServerSock;
- }
-
- public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams) throws TTransportException {
- boolean success = false;
- TTransport transport = null;
- try {
- if (sslParams != null) {
- // TSSLTransportFactory handles timeout 0 -> forever natively
- if (sslParams.useJsse()) {
- transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout);
- } else {
- // JDK6's factory doesn't appear to pass the protocol onto the Socket properly so we have
- // to do some magic to make sure that happens. Not an issue in JDK7
-
- // Taken from thrift-0.9.1 to make the SSLContext
- SSLContext sslContext = createSSLContext(sslParams);
-
- // Create the factory from it
- SSLSocketFactory sslSockFactory = sslContext.getSocketFactory();
-
- // Wrap the real factory with our own that will set the protocol on the Socket before returning it
- ProtocolOverridingSSLSocketFactory wrappingSslSockFactory = new ProtocolOverridingSSLSocketFactory(sslSockFactory,
- new String[] {sslParams.getClientProtocol()});
-
- // Create the TSocket from that
- transport = createClient(wrappingSslSockFactory, address.getHostText(), address.getPort(), timeout);
- }
- // TSSLTransportFactory leaves transports open, so no need to open here
- } else if (timeout == 0) {
- transport = new TSocket(address.getHostText(), address.getPort());
- transport.open();
- } else {
- try {
- transport = TTimeoutTransport.create(address, timeout);
- } catch (IOException ex) {
- throw new TTransportException(ex);
- }
- transport.open();
- }
- transport = ThriftUtil.transportFactory().getTransport(transport);
- success = true;
- } finally {
- if (!success && transport != null) {
- transport.close();
- }
- }
- return transport;
- }
-
- /**
- * Lifted from TSSLTransportFactory in Thrift-0.9.1. The method to create a client socket with an SSLContextFactory object is not visibile to us. Have to use
- * SslConnectionParams instead of TSSLTransportParameters because no getters exist on TSSLTransportParameters.
- *
- * @param params
- * Parameters to use to create the SSLContext
- */
- private static SSLContext createSSLContext(SslConnectionParams params) throws TTransportException {
- SSLContext ctx;
- try {
- ctx = SSLContext.getInstance(params.getClientProtocol());
- TrustManagerFactory tmf = null;
- KeyManagerFactory kmf = null;
-
- if (params.isTrustStoreSet()) {
- tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
- KeyStore ts = KeyStore.getInstance(params.getTrustStoreType());
- ts.load(new FileInputStream(params.getTrustStorePath()), params.getTrustStorePass().toCharArray());
- tmf.init(ts);
- }
-
- if (params.isKeyStoreSet()) {
- kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- KeyStore ks = KeyStore.getInstance(params.getKeyStoreType());
- ks.load(new FileInputStream(params.getKeyStorePath()), params.getKeyStorePass().toCharArray());
- kmf.init(ks, params.getKeyStorePass().toCharArray());
- }
-
- if (params.isKeyStoreSet() && params.isTrustStoreSet()) {
- ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
- } else if (params.isKeyStoreSet()) {
- ctx.init(kmf.getKeyManagers(), null, null);
- } else {
- ctx.init(null, tmf.getTrustManagers(), null);
- }
-
- } catch (Exception e) {
- throw new TTransportException("Error creating the transport", e);
- }
- return ctx;
- }
-
- /**
- * Lifted from Thrift-0.9.1 because it was private. Create an SSLSocket with the given factory, host:port, and timeout.
- *
- * @param factory
- * Factory to create the socket from
- * @param host
- * Destination host
- * @param port
- * Destination port
- * @param timeout
- * Socket timeout
- */
- private static TSocket createClient(SSLSocketFactory factory, String host, int port, int timeout) throws TTransportException {
- try {
- SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
- socket.setSoTimeout(timeout);
- return new TSocket(socket);
- } catch (Exception e) {
- throw new TTransportException("Could not connect to " + host + " on port " + port, e);
- }
- }
-
- /**
- * JDK6's SSLSocketFactory doesn't seem to properly set the protocols on the Sockets that it creates which causes an SSLv2 client hello message during
- * handshake, even when only TLSv1 is enabled. This only appears to be an issue on the client sockets, not the server sockets.
- *
- * This class wraps the SSLSocketFactory ensuring that the Socket is properly configured.
- * http://www.coderanch.com/t/637177/Security/Disabling-handshake-message-Java
- *
- * This class can be removed when JDK6 support is officially unsupported by Accumulo
- */
- private static class ProtocolOverridingSSLSocketFactory extends SSLSocketFactory {
-
- private final SSLSocketFactory delegate;
- private final String[] enabledProtocols;
-
- public ProtocolOverridingSSLSocketFactory(final SSLSocketFactory delegate, final String[] enabledProtocols) {
- Preconditions.checkNotNull(enabledProtocols);
- Preconditions.checkArgument(0 != enabledProtocols.length, "Expected at least one protocol");
- this.delegate = delegate;
- this.enabledProtocols = enabledProtocols;
- }
-
- @Override
- public String[] getDefaultCipherSuites() {
- return delegate.getDefaultCipherSuites();
- }
-
- @Override
- public String[] getSupportedCipherSuites() {
- return delegate.getSupportedCipherSuites();
- }
-
- @Override
- public Socket createSocket(final Socket socket, final String host, final int port, final boolean autoClose) throws IOException {
- final Socket underlyingSocket = delegate.createSocket(socket, host, port, autoClose);
- return overrideProtocol(underlyingSocket);
- }
-
- @Override
- public Socket createSocket(final String host, final int port) throws IOException, UnknownHostException {
- final Socket underlyingSocket = delegate.createSocket(host, port);
- return overrideProtocol(underlyingSocket);
- }
-
- @Override
- public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort) throws IOException, UnknownHostException {
- final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
- return overrideProtocol(underlyingSocket);
- }
-
- @Override
- public Socket createSocket(final InetAddress host, final int port) throws IOException {
- final Socket underlyingSocket = delegate.createSocket(host, port);
- return overrideProtocol(underlyingSocket);
- }
-
- @Override
- public Socket createSocket(final InetAddress host, final int port, final InetAddress localAddress, final int localPort) throws IOException {
- final Socket underlyingSocket = delegate.createSocket(host, port, localAddress, localPort);
- return overrideProtocol(underlyingSocket);
- }
-
- /**
- * Set the {@link javax.net.ssl.SSLSocket#getEnabledProtocols() enabled protocols} to {@link #enabledProtocols} if the <code>socket</code> is a
- * {@link SSLSocket}
- *
- * @param socket
- * The Socket
- */
- private Socket overrideProtocol(final Socket socket) {
- if (socket instanceof SSLSocket) {
- ((SSLSocket) socket).setEnabledProtocols(enabledProtocols);
- }
- return socket;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/de1d3ee3/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 0a7b301..4b048eb 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -27,7 +27,7 @@ import org.apache.accumulo.core.cli.Help;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.proxy.thrift.AccumuloProxy;
-import org.apache.accumulo.server.thrift.RpcWrapper;
+import org.apache.accumulo.server.rpc.RpcWrapper;
import org.apache.log4j.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;