You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bh...@apache.org on 2014/04/11 19:27:37 UTC

[07/10] git commit: Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT

Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7a14fcc9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7a14fcc9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7a14fcc9

Branch: refs/heads/master
Commit: 7a14fcc923685f6997430e2b665a20dba94be171
Parents: 0607c8c 7461ed9
Author: Bill Havanki <bh...@cloudera.com>
Authored: Fri Apr 11 13:16:12 2014 -0400
Committer: Bill Havanki <bh...@cloudera.com>
Committed: Fri Apr 11 13:16:12 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7a14fcc9/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
index 7800ec0,0000000..64cb1e1
mode 100644,000000..100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
@@@ -1,207 -1,0 +1,207 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.fate.zookeeper;
 +
 +import java.lang.reflect.InvocationHandler;
 +import java.lang.reflect.InvocationTargetException;
 +import java.lang.reflect.Method;
 +import java.lang.reflect.Proxy;
 +import java.security.SecurityPermission;
 +import java.util.List;
 +
 +import org.apache.accumulo.fate.util.UtilWaitThread;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 +import org.apache.log4j.Logger;
 +import org.apache.zookeeper.CreateMode;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.KeeperException.BadVersionException;
 +import org.apache.zookeeper.KeeperException.NodeExistsException;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.data.ACL;
 +import org.apache.zookeeper.data.Stat;
 +
 +public class ZooReaderWriter extends ZooReader implements IZooReaderWriter {
 +  
 +  // Permission checked in getZooKeeper() whenever a SecurityManager is installed.
 +  private static final SecurityPermission ZOOWRITER_PERMISSION = new SecurityPermission("zookeeperWriterPermission");
 +  
 +  // Lazily created by getInstance(...) and then shared by all callers.
 +  private static ZooReaderWriter instance = null;
 +  // Lazily created by getRetryingInstance(...); a proxy that delegates to the shared instance.
 +  private static IZooReaderWriter retryingInstance = null;
 +  // Passed to getSession(...) together with the inherited keepers and timeout.
 +  private final String scheme;
 +  private final byte[] auth;
 +  
 +  /**
 +   * Obtains the ZooKeeper session for this writer. When a {@link SecurityManager} is installed, the writer permission is checked first.
 +   * 
 +   * @return the session returned by getSession for this instance's keepers, timeout, scheme and auth
 +   */
 +  @Override
 +  public ZooKeeper getZooKeeper() {
 +    final SecurityManager securityManager = System.getSecurityManager();
 +    if (securityManager != null)
 +      securityManager.checkPermission(ZOOWRITER_PERMISSION);
 +    return getSession(keepers, timeout, scheme, auth);
 +  }
 +  
 +  /**
 +   * Creates a reader/writer. The first two arguments are handed to the ZooReader constructor; the scheme and auth are kept and later passed to getSession.
 +   */
 +  public ZooReaderWriter(String string, int timeInMillis, String scheme, byte[] auth) {
 +    super(string, timeInMillis);
 +    this.auth = auth;
 +    this.scheme = scheme;
 +  }
 +  
 +  /**
 +   * Delegates to ZooUtil.recursiveDelete using this writer's ZooKeeper session.
 +   */
 +  @Override
 +  public void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
 +    final ZooKeeper zk = getZooKeeper();
 +    ZooUtil.recursiveDelete(zk, zPath, policy);
 +  }
 +  
 +  /**
 +   * Delegates to the version-taking ZooUtil.recursiveDelete using this writer's ZooKeeper session.
 +   */
 +  @Override
 +  public void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
 +    final ZooKeeper zk = getZooKeeper();
 +    ZooUtil.recursiveDelete(zk, zPath, version, policy);
 +  }
 +  
 +  /**
 +   * Creates a persistent node with the default ACL by delegating to ZooUtil.putPersistentData with this writer's ZooKeeper session.
 +   * 
 +   * @return true if the node was created or altered; false if it was skipped
 +   */
 +  @Override
 +  public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
 +    final ZooKeeper zk = getZooKeeper();
 +    final boolean changed = ZooUtil.putPersistentData(zk, zPath, data, policy);
 +    return changed;
 +  }
 +  
 +  /**
 +   * Delegates to ZooUtil.putPrivatePersistentData using this writer's ZooKeeper session.
 +   * 
 +   * @return whatever ZooUtil.putPrivatePersistentData returns
 +   */
 +  @Override
 +  public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
 +    final ZooKeeper zk = getZooKeeper();
 +    final boolean changed = ZooUtil.putPrivatePersistentData(zk, zPath, data, policy);
 +    return changed;
 +  }
 +  
 +  /**
 +   * Delegates to the version-taking ZooUtil.putPersistentData using this writer's ZooKeeper session.
 +   */
 +  @Override
 +  public void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
 +    final ZooKeeper zk = getZooKeeper();
 +    ZooUtil.putPersistentData(zk, zPath, data, version, policy);
 +  }
 +  
 +  /**
 +   * Delegates to ZooUtil.putPersistentSequential using this writer's ZooKeeper session.
 +   * 
 +   * @return the string returned by ZooUtil.putPersistentSequential
 +   */
 +  @Override
 +  public String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
 +    final ZooKeeper zk = getZooKeeper();
 +    final String result = ZooUtil.putPersistentSequential(zk, zPath, data);
 +    return result;
 +  }
 +  
 +  /**
 +   * Delegates to ZooUtil.putEphemeralData using this writer's ZooKeeper session.
 +   * 
 +   * @return the string returned by ZooUtil.putEphemeralData
 +   */
 +  @Override
 +  public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException {
 +    final ZooKeeper zk = getZooKeeper();
 +    final String result = ZooUtil.putEphemeralData(zk, zPath, data);
 +    return result;
 +  }
 +  
 +  /**
 +   * Delegates to ZooUtil.putEphemeralSequential using this writer's ZooKeeper session.
 +   * 
 +   * @return the string returned by ZooUtil.putEphemeralSequential
 +   */
 +  @Override
 +  public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
 +    final ZooKeeper zk = getZooKeeper();
 +    final String result = ZooUtil.putEphemeralSequential(zk, zPath, data);
 +    return result;
 +  }
 +  
 +  /**
 +   * Delegates to ZooUtil.recursiveCopyPersistent using this writer's ZooKeeper session.
 +   */
 +  @Override
 +  public void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
 +    final ZooKeeper zk = getZooKeeper();
 +    ZooUtil.recursiveCopyPersistent(zk, source, destination, policy);
 +  }
 +  
 +  /**
 +   * Calls ZooKeeper.delete on this writer's session for the given path and version.
 +   */
 +  @Override
 +  public void delete(String path, int version) throws InterruptedException, KeeperException {
 +    final ZooKeeper zk = getZooKeeper();
 +    zk.delete(path, version);
 +  }
 +  
 +  /**
 +   * Callback used by mutate(...) to compute a node's new value from its current value.
 +   */
 +  public interface Mutator {
 +    // Returning null makes mutate(...) return without writing anything back.
 +    byte[] mutate(byte[] currentValue) throws Exception;
 +  }
 +  
 +  /**
 +   * Applies a mutator to the data stored at a node, retrying whenever the write is rejected with a BadVersionException.
 +   * 
 +   * @param zPath
 +   *          node to read and update
 +   * @param createValue
 +   *          if non-null, a persistent node with this value is created first; when that creation succeeds the value is returned without calling the mutator
 +   * @param acl
 +   *          ACL used only when creating the node
 +   * @param mutator
 +   *          computes the new value from the current one; a null result ends the call without writing
 +   * @return createValue if the node was created, null if the mutator returned null, otherwise the value that was written
 +   */
 +  @Override
 +  public byte[] mutate(String zPath, byte[] createValue, List<ACL> acl, Mutator mutator) throws Exception {
 +    if (createValue != null) {
 +      try {
 +        getZooKeeper().create(zPath, createValue, acl, CreateMode.PERSISTENT);
 +        return createValue;
 +      } catch (NodeExistsException ex) {
 +        // expected: the node is already there, so fall through and mutate it
 +      }
 +    }
 +    while (true) {
 +      final Stat nodeStat = new Stat();
 +      final byte[] current = getZooKeeper().getData(zPath, false, nodeStat);
 +      final byte[] updated = mutator.mutate(current);
 +      if (updated == null)
 +        return null;
 +      try {
 +        getZooKeeper().setData(zPath, updated, nodeStat.getVersion());
 +        return updated;
 +      } catch (BadVersionException ex) {
 +        // the version no longer matches what was read; loop to re-read and try again
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Returns the shared instance, creating it from the given arguments on the first call. Once the instance exists the arguments are not consulted.
 +   */
 +  public static synchronized ZooReaderWriter getInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) {
 +    if (instance != null) {
 +      return instance;
 +    }
 +    instance = new ZooReaderWriter(zookeepers, timeInMillis, scheme, auth);
 +    return instance;
 +  }
 +  
 +  /**
 +   * Get an instance that retries when zookeeper connection errors occur.
 +   * <p>
 +   * The result is a dynamic proxy over the instance returned by getInstance(...). It is created on the first call and cached; later calls return the cached
 +   * proxy without consulting the arguments. Each proxied call is repeated for as long as it fails with a ConnectionLossException; any other failure is
 +   * rethrown as its underlying cause.
 +   * 
 +   * @return an instance that retries when Zookeeper connection errors occur.
 +   */
 +  public static synchronized IZooReaderWriter getRetryingInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) {
 +    
 +    if (retryingInstance == null) {
 +      final IZooReaderWriter inst = getInstance(zookeepers, timeInMillis, scheme, auth);
 +      
 +      InvocationHandler ih = new InvocationHandler() {
 +        @Override
 +        public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
 +          // wait before the first retry; presumably milliseconds, since the log message divides by 1000.0 to print seconds
 +          long retryTime = 250;
 +          while (true) {
 +            try {
 +              return method.invoke(inst, args);
 +            } catch (InvocationTargetException e) {
 +              // reflection wraps whatever the target method threw; inspect the real cause
 +              if (e.getCause() instanceof KeeperException.ConnectionLossException) {
-                 Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + retryTime, e.getCause());
++                Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + String.format("%.2f secs", retryTime / 1000.0), e.getCause());
 +                UtilWaitThread.sleep(retryTime);
 +                // lengthen the wait by 250 after each failure, capped at 5000
 +                retryTime = Math.min(5000, retryTime + 250);
 +              } else {
 +                // not a connection loss: surface the original exception to the caller
 +                throw e.getCause();
 +              }
 +            }
 +          }
 +        }
 +      };
 +      
 +      retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
 +    }
 +    
 +    return retryingInstance;
 +  }
 +  
 +  /**
 +   * Delegates to ZooUtil.isLockHeld using this writer's ZooKeeper session.
 +   * 
 +   * @return whatever ZooUtil.isLockHeld reports for the given lock id
 +   */
 +  @Override
 +  public boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException {
 +    final ZooKeeper zk = getZooKeeper();
 +    final boolean held = ZooUtil.isLockHeld(zk, lockID);
 +    return held;
 +  }
 +  
 +  /**
 +   * Ensures a node exists at the given path, first doing the same for each ancestor. Missing nodes are written as persistent nodes with empty data; a path
 +   * that already exists is left alone.
 +   * 
 +   * @param path
 +   *          absolute path of the node to create; an empty string is a no-op
 +   * @throws IllegalArgumentException
 +   *           if path is non-empty and does not start with "/"
 +   */
 +  @Override
 +  public void mkdirs(String path) throws KeeperException, InterruptedException {
 +    if (path.isEmpty())
 +      return;
 +    if (!path.startsWith("/"))
 +      throw new IllegalArgumentException(path + " does not start with /");
 +    if (getZooKeeper().exists(path, false) != null)
 +      return;
 +    // handle ancestors first; the recursion bottoms out at "" (the parent of a top-level node)
 +    String parent = path.substring(0, path.lastIndexOf("/"));
 +    mkdirs(parent);
 +    putPersistentData(path, new byte[] {}, NodeExistsPolicy.SKIP);
 +  }
 +
 +
 +}