You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/12/12 17:25:32 UTC
[10/16] Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7eb838e3/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
index 420533a,0000000..c9c77b8
mode 100644,000000..100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
@@@ -1,317 -1,0 +1,319 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
++import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
+ *
+ */
- public class ZooCache {
++public class ZooCache implements Closeable {
+ private static final Logger log = Logger.getLogger(ZooCache.class);
+
+ private ZCacheWatcher watcher = new ZCacheWatcher(); // passed as the watcher on every ZooKeeper read this cache issues
+ private Watcher externalWatcher = null; // optional caller-supplied watcher; it is handed each event after the cache has processed it
+
+ private HashMap<String,byte[]> cache; // node path -> data; a null value records that the node did not exist when looked up
+ private HashMap<String,Stat> statCache; // node path -> Stat returned by exists(); null when the node did not exist
+ private HashMap<String,List<String>> childrenCache; // node path -> child names returned by getChildren()
+
+ private ZooReader zReader; // supplies the ZooKeeper handle used for every lookup
+
+ private ZooKeeper getZooKeeper() { // Asks the reader for the handle on each call; nothing is cached in this class.
+ return zReader.getZooKeeper();
+ }
+
+ private class ZCacheWatcher implements Watcher { // Invalidates cached entries in response to ZooKeeper events, then forwards the event to externalWatcher.
+ @Override
+ public void process(WatchedEvent event) {
+
+ if (log.isTraceEnabled())
+ log.trace(event);
+
+ switch (event.getType()) {
+ case NodeDataChanged:
+ case NodeChildrenChanged:
+ case NodeCreated:
+ case NodeDeleted:
+ remove(event.getPath()); // any node-level change drops that path's data, stat and children entries
+ break;
+ case None: // no node event type; handled below according to the connection state carried by the event
+ switch (event.getState()) {
+ case Disconnected:
+ if (log.isTraceEnabled())
+ log.trace("Zoo keeper connection disconnected, clearing cache");
+ clear(); // drop everything cached
+ break;
+ case SyncConnected:
+ break; // nothing is invalidated for this state
+ case Expired:
+ if (log.isTraceEnabled())
+ log.trace("Zoo keeper connection expired, clearing cache");
+ clear(); // drop everything cached
+ break;
+ default:
+ log.warn("Unhandled: " + event);
+ }
+ break;
+ default:
+ log.warn("Unhandled: " + event);
+ }
+
+ if (externalWatcher != null) {
+ externalWatcher.process(event); // runs for every event, including the ones logged as unhandled above
+ }
+ }
+ }
+
+ public ZooCache(String zooKeepers, int sessionTimeout) { // Same as the three-argument constructor with no external watcher.
+ this(zooKeepers, sessionTimeout, null);
+ }
+
+ public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) { // Builds a new ZooReader for the given servers/timeout and delegates.
+ this(new ZooReader(zooKeepers, sessionTimeout), watcher);
+ }
+
+ public ZooCache(ZooReader reader, Watcher watcher) { // Stores the reader, starts with three empty caches; watcher may be null.
+ this.zReader = reader;
+ this.cache = new HashMap<String,byte[]>();
+ this.statCache = new HashMap<String,Stat>();
+ this.childrenCache = new HashMap<String,List<String>>();
+ this.externalWatcher = watcher; // null means events are not forwarded anywhere
+ }
+
+ private static interface ZooRunnable { // A ZooKeeper operation that retry() may invoke more than once.
+ void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException; // receives the handle fetched for the current attempt
+ }
+
+ private synchronized void retry(ZooRunnable op) { // Runs op until it returns normally; there is no limit on the number of attempts.
+
+ int sleepTime = 100; // initial pause between attempts, in milliseconds (argument to wait())
+
+ while (true) {
+
+ ZooKeeper zooKeeper = getZooKeeper(); // fetched again for every attempt
+
+ try {
+ op.run(zooKeeper);
+ return; // success is the only way out of the loop
+
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ log.error("Looked up non existant node in cache " + e.getPath(), e); // NOTE(review): after this error the code still falls through to the warning below and retries — confirm that is intended
+ }
+ log.warn("Zookeeper error, will retry", e);
+ } catch (InterruptedException e) {
+ log.info("Zookeeper error, will retry", e); // NOTE(review): the interrupt is logged and the operation retried; the thread's interrupt status is not restored
+ } catch (ConcurrentModificationException e) {
+ log.debug("Zookeeper was modified, will retry"); // thrown by get()'s runnable when the node changes between exists() and getData()
+ }
+
+ try {
+ // do not hold lock while sleeping
+ wait(sleepTime); // wait() releases this object's monitor (the method is synchronized) for the duration of the pause
+ } catch (InterruptedException e) {
+ e.printStackTrace(); // NOTE(review): the interrupt is only printed to stderr; the loop carries on
+ }
+ if (sleepTime < 10000)
+ sleepTime = (int) (sleepTime + sleepTime * Math.random()); // grow the pause by a random 0-100% until it is 10000 ms or more
+
+ }
+ }
+
+ public synchronized List<String> getChildren(final String zPath) { // Returns an unmodifiable view of zPath's children, or null if none are cached after the lookup.
+
+ ZooRunnable zr = new ZooRunnable() {
+
+ @Override
+ public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
+
+ if (childrenCache.containsKey(zPath))
+ return; // already cached; no ZooKeeper call is made
+
+ try {
+ List<String> children = zooKeeper.getChildren(zPath, watcher); // passes this cache's watcher along with the read
+ childrenCache.put(zPath, children);
+ } catch (KeeperException ke) {
+ if (ke.code() != Code.NONODE) {
+ throw ke; // anything other than a missing node is left to retry()
+ }
+ // NONODE is swallowed: nothing is cached for zPath, so the lookup below yields null
+ }
+ }
+
+ };
+
+ retry(zr);
+
+ List<String> children = childrenCache.get(zPath);
+ if (children == null) {
+ return null;
+ }
+ return Collections.unmodifiableList(children); // callers cannot modify the cached list through the returned view
+ }
+
+ public synchronized byte[] get(final String zPath) { // Same as get(zPath, null): returns the data without reporting the Stat.
+ return get(zPath, null);
+ }
+
+ public synchronized byte[] get(final String zPath, Stat stat) { // Returns the cached data for zPath (null when the node did not exist), loading it on a miss; a non-null stat is filled from the cached Stat.
+ ZooRunnable zr = new ZooRunnable() {
+
+ @Override
+ public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
+
+ if (cache.containsKey(zPath))
+ return; // containsKey, not get: a cached null (node absent) also counts as a hit
+
+ /*
+ * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existence, it will be added to
+ * the cache. But this notification of a node coming into existence will only be given if exists() was previously called.
+ *
+ * If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then
+ * non-existence cannot be cached.
+ */
+
+ Stat stat = zooKeeper.exists(zPath, watcher); // local to run(); shadows the stat parameter of the enclosing get()
+
+ byte[] data = null;
+
+ if (stat == null) {
+ if (log.isTraceEnabled())
+ log.trace("zookeeper did not contain " + zPath);
+ } else {
+ try {
+ data = zooKeeper.getData(zPath, watcher, stat);
+ } catch (KeeperException.BadVersionException e1) {
+ throw new ConcurrentModificationException(); // caught by retry(), which runs this whole lookup again
+ } catch (KeeperException.NoNodeException e2) {
+ throw new ConcurrentModificationException(); // node vanished after exists(); retry() runs the lookup again
+ }
+ if (log.isTraceEnabled())
+ log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data)));
+ }
+ if (log.isTraceEnabled())
+ log.trace("putting " + zPath + " " + (data == null ? null : new String(data)) + " in cache");
+ put(zPath, data, stat); // both are null here when the node did not exist
+ }
+
+ };
+
+ retry(zr);
+
+ if (stat != null) {
+ Stat cstat = statCache.get(zPath);
+ if (cstat != null) { // when no Stat is cached (node absent) the caller's stat is left untouched
+ try {
+ // copy the cached Stat into the caller's object by writing it to a byte buffer and reading it back
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ cstat.write(dos);
+ dos.close();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ DataInputStream dis = new DataInputStream(bais);
+ stat.readFields(dis);
+
+ dis.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ return cache.get(zPath); // NOTE(review): this is the cached array itself, not a copy
+ }
+
+ private synchronized void put(String zPath, byte[] data, Stat stat) { // Records data and stat for zPath; get() passes null for both when the node is absent.
+ cache.put(zPath, data);
+ statCache.put(zPath, stat);
+ }
+
+ private synchronized void remove(String zPath) { // Drops zPath from all three caches; called by the watcher on node events.
+ if (log.isTraceEnabled())
+ log.trace("removing " + zPath + " from cache");
+ cache.remove(zPath);
+ childrenCache.remove(zPath);
+ statCache.remove(zPath);
+ }
+
+ public synchronized void clear() { // Empties all three caches; later lookups go back to ZooKeeper.
+ cache.clear();
+ childrenCache.clear();
+ statCache.clear();
+ }
+
+ public synchronized void clear(String zPath) { // Removes every cached entry whose path starts with zPath, from all three caches.
+
+ for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) {
+ String path = i.next();
+ if (path.startsWith(zPath)) // plain string prefix match, so "/a" also matches "/ab" — NOTE(review): confirm callers expect that
+ i.remove(); // removal through the iterator is safe while iterating
+ }
+
+ for (Iterator<String> i = childrenCache.keySet().iterator(); i.hasNext();) {
+ String path = i.next();
+ if (path.startsWith(zPath))
+ i.remove();
+ }
+
+ for (Iterator<String> i = statCache.keySet().iterator(); i.hasNext();) {
+ String path = i.next();
+ if (path.startsWith(zPath))
+ i.remove();
+ }
+ }
+
+ private static Map<String,ZooCache> instances = new HashMap<String,ZooCache>(); // shared caches keyed by "zooKeepers:sessionTimeout"; entries are never removed in this class
+
+ public static synchronized ZooCache getInstance(String zooKeepers, int sessionTimeout) { // Returns the shared cache for this server list and timeout, creating it on first request.
+ String key = zooKeepers + ":" + sessionTimeout;
+ ZooCache zc = instances.get(key);
+ if (zc == null) {
+ zc = new ZooCache(zooKeepers, sessionTimeout); // created without an external watcher
+ instances.put(key, zc);
+ }
+
+ return zc;
+ }
+
- public void close() throws InterruptedException {
++ @Override
++ public void close() { // Empties the caches and closes the reader. NOTE(review): not synchronized, unlike clear(), although it empties the same maps.
+ cache.clear();
+ statCache.clear();
+ childrenCache.clear();
+ zReader.close(); // NOTE(review): a cache handed out by getInstance() stays in the instances map after this call
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7eb838e3/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
----------------------------------------------------------------------
diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
index e11f570,0000000..5fc9595
mode 100644,000000..100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
@@@ -1,109 -1,0 +1,118 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
++import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+
- public class ZooReader implements IZooReader {
++public class ZooReader implements IZooReader, Closeable {
+
+ protected String keepers; // set by the constructor; passed to getSession() by getZooKeeper()
+ protected int timeout; // set by the constructor; passed to getSession() by getZooKeeper()
+
+ protected ZooKeeper getSession(String keepers, int timeout, String scheme, byte[] auth) { // Delegates to ZooSession.getSession; protected, presumably so subclasses can override it.
+ return ZooSession.getSession(keepers, timeout, scheme, auth);
+ }
+
+ protected ZooKeeper getZooKeeper() { // Session for this reader's keepers/timeout, requested with null scheme and auth.
+ return getSession(keepers, timeout, null, null);
+ }
+
+ @Override
+ public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException { // Reads the node's data directly from ZooKeeper; stat is passed through to the call.
+ return getZooKeeper().getData(zPath, false, stat); // false: no watch is requested
+ }
+
+ @Override
+ public Stat getStatus(String zPath) throws KeeperException, InterruptedException { // Returns the result of exists() for zPath.
+ return getZooKeeper().exists(zPath, false); // false: no watch is requested
+ }
+
+ @Override
+ public Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException { // As getStatus(zPath), but passes the given watcher to exists().
+ return getZooKeeper().exists(zPath, watcher);
+ }
+
+ @Override
+ public List<String> getChildren(String zPath) throws KeeperException, InterruptedException { // Returns the child names ZooKeeper reports for zPath.
+ return getZooKeeper().getChildren(zPath, false); // false: no watch is requested
+ }
+
+ @Override
+ public List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException { // As getChildren(zPath), but passes the given watcher to the call.
+ return getZooKeeper().getChildren(zPath, watcher);
+ }
+
+ @Override
+ public boolean exists(String zPath) throws KeeperException, InterruptedException { // True when ZooKeeper's exists() returns a non-null Stat for zPath.
+ return getZooKeeper().exists(zPath, false) != null; // false: no watch is requested
+ }
+
+ @Override
+ public boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException { // As exists(zPath), but passes the given watcher to the call.
+ return getZooKeeper().exists(zPath, watcher) != null;
+ }
+
+ @Override
+ public void sync(final String path) throws KeeperException, InterruptedException { // Issues ZooKeeper's callback-based sync for path, blocks until the callback has run, and throws if the result code is not OK.
+ final AtomicInteger rc = new AtomicInteger(); // result code handed over from the callback
+ final AtomicBoolean waiter = new AtomicBoolean(false); // serves both as the monitor to wait on and as the "callback has run" flag
+ getZooKeeper().sync(path, new VoidCallback() {
+ @Override
+ public void processResult(int code, String arg1, Object arg2) {
+ rc.set(code);
+ synchronized (waiter) {
+ waiter.set(true);
+ waiter.notifyAll();
+ }
+ }}, null);
+ synchronized (waiter) {
+ while (!waiter.get()) // re-checked after every wake-up; also skips the wait entirely if the callback already ran
+ waiter.wait();
+ }
+ Code code = Code.get(rc.get());
+ if (code != KeeperException.Code.OK) {
+ throw KeeperException.create(code); // surface the callback's result code as the matching KeeperException
+ }
+ }
+
+ public ZooReader(String keepers, int timeout) { // Only stores the arguments; no ZooKeeper call is made here.
+ this.keepers = keepers;
+ this.timeout = timeout;
+ }
+
- public void close() throws InterruptedException {
- getZooKeeper().close();
++ /**
++ * Closes this reader. If closure of the underlying session is interrupted,
++ * this method sets the calling thread's interrupt status.
++ */
++ public void close() {
++ try {
++ getZooKeeper().close(); // NOTE(review): the handle comes from ZooSession.getSession — confirm it is not shared with other users before closing it
++ } catch (InterruptedException e) {
++ Thread.currentThread().interrupt(); // restore the interrupt instead of propagating a checked exception
++ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7eb838e3/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
index f12dca5,0000000..154c9c2
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
+++ b/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
@@@ -1,213 -1,0 +1,209 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.client;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.ConnectorImpl;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.security.CredentialHelper;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * An implementation of Instance that looks in HDFS and ZooKeeper to find the master and root tablet location.
+ *
+ */
+public class HdfsZooInstance implements Instance {
+
+ public static class AccumuloNotInitializedException extends RuntimeException { // Unchecked exception carrying only a message; not thrown anywhere in this class.
+ private static final long serialVersionUID = 1L;
+
+ public AccumuloNotInitializedException(String string) {
+ super(string); // string becomes the exception message
+ }
+ }
+
+ private HdfsZooInstance() { // Private: instances are obtained through getInstance().
+ AccumuloConfiguration acuConf = ServerConfiguration.getSiteConfiguration();
+ zooCache = new ZooCache(acuConf.get(Property.INSTANCE_ZK_HOST), (int) acuConf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)); // NOTE(review): assigns the static zooCache field from an instance constructor
+ }
+
+ private static HdfsZooInstance cachedHdfsZooInstance = null; // the single shared instance, created on first use
+
+ public static synchronized Instance getInstance() { // Returns the shared instance, constructing it on the first call; synchronized so only one is ever built.
+ if (cachedHdfsZooInstance == null)
+ cachedHdfsZooInstance = new HdfsZooInstance();
+ return cachedHdfsZooInstance;
+ }
+
+ private static ZooCache zooCache; // assigned in the constructor; used for the root tablet, master lock and instance name lookups
+ private static String instanceId = null; // filled in lazily by _getInstanceID()
+ private static final Logger log = Logger.getLogger(HdfsZooInstance.class);
+
+ @Override
+ public String getRootTabletLocation() { // Reads the root tablet location node through the ZooCache; null when the cache has no data for it.
+ String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION;
+
+ OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zoocache.");
+
+ byte[] loc = zooCache.get(zRootLocPath);
+
+ opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
+
+ if (loc == null) {
+ return null;
+ }
+
+ return new String(loc).split("\\|")[0]; // keep only the text before the first '|'; new String(byte[]) decodes with the platform default charset
+ }
+
+ @Override
+ public List<String> getMasterLocations() { // Returns the master lock data as a one-element list, or an empty list when there is no lock data.
+
+ String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
+
+ OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
+
+ byte[] loc = ZooLock.getLockData(zooCache, masterLocPath, null);
+
+ opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
+
+ if (loc == null) {
+ return Collections.emptyList();
+ }
+
+ return Collections.singletonList(new String(loc)); // decoded with the platform default charset
+ }
+
+ @Override
+ public String getInstanceID() { // Returns the instance ID, loading it on first use.
+ if (instanceId == null) // unsynchronized check; _getInstanceID() checks again under the class lock. NOTE(review): instanceId is not volatile
+ _getInstanceID();
+ return instanceId;
+ }
+
+ private static synchronized void _getInstanceID() { // Loads instanceId once, under the class lock.
+ if (instanceId == null) {
+ @SuppressWarnings("deprecation")
+ String instanceIdFromFile = ZooKeeperInstance.getInstanceIDFromHdfs(ServerConstants.getInstanceIdLocation()); // local variable exists so the deprecation warning can be suppressed on this one call
+ instanceId = instanceIdFromFile;
+ }
+ }
+
+ @Override
+ public String getInstanceName() { // Looks the name up via ZooKeeperInstance.lookupInstanceName using the ZooCache and this instance's ID.
+ return ZooKeeperInstance.lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
+ }
+
+ @Override
+ public String getZooKeepers() { // Reads INSTANCE_ZK_HOST from the site configuration on every call.
+ return ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST);
+ }
+
+ @Override
+ public int getZooKeepersSessionTimeOut() { // Reads INSTANCE_ZK_TIMEOUT from the site configuration on every call, in milliseconds.
+ return (int) ServerConfiguration.getSiteConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); // narrowed to int by the cast
+ }
+
+ @Override
+ // Not really deprecated, just not for client use
+ public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(CredentialHelper.create(principal, token, getInstanceID())); // builds credentials from principal, token and this instance's ID, then delegates to the single-argument overload
+ }
+
+ @SuppressWarnings("deprecation")
+ private Connector getConnector(TCredentials cred) throws AccumuloException, AccumuloSecurityException { // Creates a ConnectorImpl for this instance with the given credentials.
+ return new ConnectorImpl(this, cred);
+ }
+
+ @Deprecated
+ @Override
+ // Not really deprecated, just not for client use
+ public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(user, new PasswordToken(pass)); // wraps the password bytes in a PasswordToken and delegates to the token-based overload
+ }
+
+ @Deprecated
+ @Override
+ // Not really deprecated, just not for client use
+ public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(user, ByteBufferUtil.toBytes(pass)); // converts the buffer to a byte[] and delegates to the byte[] overload
+ }
+
+ @Deprecated
+ @Override
+ public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(user, TextUtil.getBytes(new Text(pass.toString()))); // converts the password to bytes via Hadoop Text and delegates to the byte[] overload
+ }
+
+ private AccumuloConfiguration conf = null; // per-instance; built on first getConfiguration() call or supplied through setConfiguration()
+
+ @Override
+ public AccumuloConfiguration getConfiguration() { // Returns the stored configuration, creating it from a new ServerConfiguration on first use.
+ if (conf == null) // NOTE(review): this lazy initialization is not synchronized
+ conf = new ServerConfiguration(this).getConfiguration();
+ return conf;
+ }
+
+ @Override
+ public void setConfiguration(AccumuloConfiguration conf) { // Replaces the configuration returned by getConfiguration(); passing null makes the next get rebuild it.
+ this.conf = conf;
+ }
+
+ public static void main(String[] args) { // Prints the instance name, ID, ZooKeeper servers and master locations to stdout; args is unused.
+ Instance instance = HdfsZooInstance.getInstance();
+ System.out.println("Instance Name: " + instance.getInstanceName());
+ System.out.println("Instance ID: " + instance.getInstanceID());
+ System.out.println("ZooKeepers: " + instance.getZooKeepers());
+ System.out.println("Masters: " + StringUtil.join(instance.getMasterLocations(), ", "));
+ }
+
+ @Override
- public void close() throws AccumuloException {
- try {
- zooCache.close();
- } catch (InterruptedException e) {
- throw new AccumuloException("Issues closing ZooKeeper, try again");
- }
++ public void close() { // merged version: declares no checked exception and needs no try/catch around zooCache.close()
++ zooCache.close(); // NOTE(review): zooCache is a static field, so this closes the cache used by every HdfsZooInstance lookup
+ }
+
+ @Deprecated
+ @Override
+ public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException { // AuthInfo is written fully qualified rather than imported
+ return getConnector(auth.user, auth.getPassword()); // unpacks user and password and delegates to a two-argument overload
+ }
+}