You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2014/01/07 00:34:06 UTC
[05/10] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f17661cd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f17661cd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f17661cd
Branch: refs/heads/master
Commit: f17661cd13180b3dce20ed833c0511c558aa3164
Parents: 016f3bb f624d40
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 6 17:38:34 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 6 17:38:34 2014 -0500
----------------------------------------------------------------------
.../apache/accumulo/core/client/Instance.java | 8 +--
.../accumulo/core/client/ZooKeeperInstance.java | 51 ++------------------
.../core/client/impl/ThriftTransportPool.java | 16 ++----
.../accumulo/core/client/mock/MockInstance.java | 5 --
.../apache/accumulo/core/util/ThriftUtil.java | 4 --
.../core/client/impl/TabletLocatorImplTest.java | 5 --
.../accumulo/fate/zookeeper/ZooCache.java | 7 ---
.../accumulo/fate/zookeeper/ZooReader.java | 12 -----
.../accumulo/server/client/HdfsZooInstance.java | 6 ---
9 files changed, 9 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f17661cd/core/src/main/java/org/apache/accumulo/core/client/Instance.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f17661cd/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index 4d12103,ccfb328..1497153
--- a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@@ -20,12 -22,9 +20,11 @@@ import java.nio.ByteBuffer
import java.util.Collections;
import java.util.List;
import java.util.UUID;
- import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
import org.apache.accumulo.core.client.impl.ConnectorImpl;
+import org.apache.accumulo.core.client.impl.ServerConfigurationUtil;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@@ -71,11 -74,6 +70,9 @@@ public class ZooKeeperInstance implemen
private final int zooKeepersSessionTimeOut;
+ private AccumuloConfiguration accumuloConf;
+ private ClientConfiguration clientConf;
+
- private volatile boolean closed = false;
-
/**
*
* @param instanceName
@@@ -125,35 -125,14 +122,34 @@@
* A comma separated list of zoo keeper server locations. Each location can contain an optional port, of the format host:port.
* @param sessionTimeout
* zoo keeper session time out in milliseconds.
+ * @deprecated since 1.6.0; Use {@link #ZooKeeperInstance(Configuration)} instead.
*/
-
+ @Deprecated
public ZooKeeperInstance(UUID instanceId, String zooKeepers, int sessionTimeout) {
- ArgumentChecker.notNull(instanceId, zooKeepers);
- this.instanceId = instanceId.toString();
- this.zooKeepers = zooKeepers;
- this.zooKeepersSessionTimeOut = sessionTimeout;
- zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
+ this(ClientConfiguration.loadDefault().withInstance(instanceId).withZkHosts(zooKeepers).withZkTimeout(sessionTimeout));
+ }
+
+ /**
+ * @param config
+ * Client configuration for specifying connection options.
+ * See {@link ClientConfiguration} which extends Configuration with convenience methods specific to Accumulo.
+ * @since 1.6.0
+ */
+
+ public ZooKeeperInstance(Configuration config) {
+ ArgumentChecker.notNull(config);
+ if (config instanceof ClientConfiguration) {
+ this.clientConf = (ClientConfiguration)config;
+ } else {
+ this.clientConf = new ClientConfiguration(config);
+ }
+ this.instanceId = clientConf.get(ClientProperty.INSTANCE_ID);
+ this.instanceName = clientConf.get(ClientProperty.INSTANCE_NAME);
+ if ((instanceId == null) == (instanceName == null))
+ throw new IllegalArgumentException("Expected exactly one of instanceName and instanceId to be set");
+ this.zooKeepers = clientConf.get(ClientProperty.INSTANCE_ZK_HOST);
+ this.zooKeepersSessionTimeOut = (int) AccumuloConfiguration.getTimeInMillis(clientConf.get(ClientProperty.INSTANCE_ZK_TIMEOUT));
+ zooCache = ZooCache.getInstance(zooKeepers, zooKeepersSessionTimeOut);
- clientInstances.incrementAndGet();
}
@Override
@@@ -199,10 -174,8 +191,8 @@@
}
@Override
- public synchronized String getRootTabletLocation() {
- if (closed)
- throw new RuntimeException("ZooKeeperInstance has been closed.");
+ public String getRootTabletLocation() {
- String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION;
+ String zRootLocPath = ZooUtil.getRoot(this) + RootTable.ZROOT_TABLET_LOCATION;
OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zookeeper.");
byte[] loc = zooCache.get(zRootLocPath);
@@@ -249,25 -220,27 +237,19 @@@
@Override
public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
- if (closed)
- throw new RuntimeException("ZooKeeperInstance has been closed.");
- return getConnector(CredentialHelper.create(principal, token, getInstanceID()));
- }
-
- @SuppressWarnings("deprecation")
- private Connector getConnector(TCredentials credential) throws AccumuloException, AccumuloSecurityException {
- return new ConnectorImpl(this, credential);
+ return new ConnectorImpl(this, new Credentials(principal, token));
}
-
+
@Override
@Deprecated
public Connector getConnector(String principal, byte[] pass) throws AccumuloException, AccumuloSecurityException {
- if (closed) {
- throw new RuntimeException("ZooKeeperInstance has been closed.");
- } else {
- return getConnector(principal, new PasswordToken(pass));
- }
+ return getConnector(principal, new PasswordToken(pass));
}
- private AccumuloConfiguration conf = null;
-
@Override
+ @Deprecated
public AccumuloConfiguration getConfiguration() {
- if (conf == null)
- conf = AccumuloConfiguration.getDefaultConfiguration();
- return conf;
+ return ServerConfigurationUtil.convertClientConfig(accumuloConf == null ? DefaultConfiguration.getInstance() : accumuloConf, clientConf);
}
@Override
@@@ -291,27 -275,39 +273,4 @@@
}
return null;
}
-
- static private final AtomicInteger clientInstances = new AtomicInteger(0);
-
- @Override
- public synchronized void close() {
- if (!closed && clientInstances.decrementAndGet() == 0) {
- try {
- zooCache.close();
- ThriftUtil.close();
- } catch (RuntimeException e) {
- clientInstances.incrementAndGet();
- throw e;
- }
- }
- closed = true;
- }
-
- @Override
- public void finalize() {
- // This method intentionally left blank. Users need to explicitly close Instances if they want things cleaned up nicely.
- if (!closed)
- log.warn("ZooKeeperInstance being cleaned up without being closed. Please remember to call close() before dereferencing to clean up threads.");
- }
-
- /**
- * To be moved to server code. Only lives here to support certain client side utilities to minimize command-line options.
- */
- @Deprecated
- public static String getInstanceIDFromHdfs(Path instanceDirectory) {
- try {
- FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), AccumuloConfiguration.getSiteConfiguration());
- FileStatus[] files = null;
- try {
- files = fs.listStatus(instanceDirectory);
- } catch (FileNotFoundException ex) {
- // ignored
- }
- log.debug("Trying to read instance id from " + instanceDirectory);
- if (files == null || files.length == 0) {
- log.error("unable obtain instance id at " + instanceDirectory);
- throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory);
- } else if (files.length != 1) {
- log.error("multiple potential instances in " + instanceDirectory);
- throw new RuntimeException("Accumulo found multiple possible instance ids in " + instanceDirectory);
- } else {
- String result = files[0].getPath().getName();
- return result;
- }
- } catch (IOException e) {
- throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory, e);
- }
- }
-
- @Deprecated
- @Override
- public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
- return getConnector(auth.user, auth.password);
- }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f17661cd/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 765a4fc,ceeab21..a553cc1
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@@ -567,10 -589,9 +565,9 @@@ public class ThriftTransportPool
this.killTime = time;
log.debug("Set thrift transport pool idle time to " + time);
}
-
+
private static ThriftTransportPool instance = new ThriftTransportPool();
private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
- private static AtomicBoolean stopDaemon;
public static ThriftTransportPool getInstance() {
SecurityManager sm = System.getSecurityManager();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f17661cd/core/src/main/java/org/apache/accumulo/core/client/mock/MockInstance.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f17661cd/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
index fab02b2,9bffc81..da4e567
--- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java
@@@ -199,49 -220,4 +199,45 @@@ public class ThriftUtil
public static TProtocolFactory protocolFactory() {
return protocolFactory;
}
+
+ public static TServerSocket getServerSocket(int port, int timeout, InetAddress address, SslConnectionParams params) throws TTransportException {
+ if (params.useJsse()) {
+ return TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
+ } else {
+ return TSSLTransportFactory.getServerSocket(port, timeout, address, params.getTTransportParams());
+ }
+ }
+
- public static void close() {
- ThriftTransportPool.close();
- }
-
+ public static TTransport createClientTransport(HostAndPort address, int timeout, SslConnectionParams sslParams) throws TTransportException {
+ boolean success = false;
+ TTransport transport = null;
+ try {
+ if (sslParams != null) {
+ // TSSLTransportFactory handles timeout 0 -> forever natively
+ if (sslParams.useJsse()) {
+ transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout);
+ } else {
+ transport = TSSLTransportFactory.getClientSocket(address.getHostText(), address.getPort(), timeout, sslParams.getTTransportParams());
+ }
+ // TSSLTransportFactory leaves transports open, so no need to open here
+ } else if (timeout == 0) {
+ transport = new TSocket(address.getHostText(), address.getPort());
+ transport.open();
+ } else {
+ try {
+ transport = TTimeoutTransport.create(address, timeout);
+ } catch (IOException ex) {
+ throw new TTransportException(ex);
+ }
+ transport.open();
+ }
+ transport = ThriftUtil.transportFactory().getTransport(transport);
+ success = true;
+ } finally {
+ if (!success && transport != null) {
+ transport.close();
+ }
+ }
+ return transport;
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f17661cd/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
index 375fcf4,8c63b1f..a594d19
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletLocatorImplTest.java
@@@ -461,13 -459,8 +461,8 @@@ public class TabletLocatorImplTest exte
public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
throw new UnsupportedOperationException();
}
-
- @Override
- public void close() {
- // NOOP
- }
}
-
+
static class TServers {
private final Map<String,Map<KeyExtent,SortedMap<Key,Value>>> tservers = new HashMap<String,Map<KeyExtent,SortedMap<Key,Value>>>();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f17661cd/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
index d3c4bc5,0000000..f720272
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@@ -1,197 -1,0 +1,191 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.ConnectorImpl;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * An implementation of Instance that looks in HDFS and ZooKeeper to find the master and root tablet location.
+ *
+ */
+public class HdfsZooInstance implements Instance {
+
+ public static class AccumuloNotInitializedException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public AccumuloNotInitializedException(String string) {
+ super(string);
+ }
+ }
+
+ private HdfsZooInstance() {
+ AccumuloConfiguration acuConf = ServerConfiguration.getSiteConfiguration();
+ zooCache = new ZooCache(acuConf.get(Property.INSTANCE_ZK_HOST), (int) acuConf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
+ }
+
+ private static HdfsZooInstance cachedHdfsZooInstance = null;
+
+ public static synchronized Instance getInstance() {
+ if (cachedHdfsZooInstance == null)
+ cachedHdfsZooInstance = new HdfsZooInstance();
+ return cachedHdfsZooInstance;
+ }
+
+ private static ZooCache zooCache;
+ private static String instanceId = null;
+ private static final Logger log = Logger.getLogger(HdfsZooInstance.class);
+
+ @Override
+ public String getRootTabletLocation() {
+ String zRootLocPath = ZooUtil.getRoot(this) + RootTable.ZROOT_TABLET_LOCATION;
+
+ OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zoocache.");
+
+ byte[] loc = zooCache.get(zRootLocPath);
+
+ opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
+
+ if (loc == null) {
+ return null;
+ }
+
+ return new String(loc).split("\\|")[0];
+ }
+
+ @Override
+ public List<String> getMasterLocations() {
+
+ String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
+
+ OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
+
+ byte[] loc = ZooLock.getLockData(zooCache, masterLocPath, null);
+
+ opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
+
+ if (loc == null) {
+ return Collections.emptyList();
+ }
+
+ return Collections.singletonList(new String(loc));
+ }
+
+ @Override
+ public String getInstanceID() {
+ if (instanceId == null)
+ _getInstanceID();
+ return instanceId;
+ }
+
+ private static synchronized void _getInstanceID() {
+ if (instanceId == null) {
+ String instanceIdFromFile = ZooUtil.getInstanceIDFromHdfs(ServerConstants.getInstanceIdLocation());
+ instanceId = instanceIdFromFile;
+ }
+ }
+
+ @Override
+ public String getInstanceName() {
+ return ZooKeeperInstance.lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
+ }
+
+ @Override
+ public String getZooKeepers() {
+ return ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST);
+ }
+
+ @Override
+ public int getZooKeepersSessionTimeOut() {
+ return (int) ServerConfiguration.getSiteConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
+ }
+
+ @Override
+ public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
+ return new ConnectorImpl(this, new Credentials(principal, token));
+ }
+
+ @Deprecated
+ @Override
+ public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(user, new PasswordToken(pass));
+ }
+
+ @Deprecated
+ @Override
+ public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(user, ByteBufferUtil.toBytes(pass));
+ }
+
+ @Deprecated
+ @Override
+ public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
+ }
+
+ private AccumuloConfiguration conf = null;
+
+ @Deprecated
+ @Override
+ public AccumuloConfiguration getConfiguration() {
+ if (conf == null)
+ conf = new ServerConfiguration(this).getConfiguration();
+ return conf;
+ }
+
+ @Override
+ @Deprecated
+ public void setConfiguration(AccumuloConfiguration conf) {
+ this.conf = conf;
+ }
+
+ public static void main(String[] args) {
+ Instance instance = HdfsZooInstance.getInstance();
+ System.out.println("Instance Name: " + instance.getInstanceName());
+ System.out.println("Instance ID: " + instance.getInstanceID());
+ System.out.println("ZooKeepers: " + instance.getZooKeepers());
+ System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", "));
+ }
-
- @Override
- public void close() {
- zooCache.close();
- }
-
+}