You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/03/18 00:33:29 UTC

[07/17] incubator-ranger git commit: Support for Solr as Audit Destination.

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java
new file mode 100644
index 0000000..f14aedd
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -0,0 +1,736 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.StringUtils;
+import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.NotEmptyException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Source;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * 
+ * All Solr ZooKeeper interactions should go through this class rather than
+ * ZooKeeper. This class handles synchronous connects and reconnections.
+ *
+ */
+public class SolrZkClient implements Closeable {
+  
+  static final String NEWL = System.getProperty("line.separator");
+
+  static final int DEFAULT_CLIENT_CONNECT_TIMEOUT = 30000;
+
+  private static final Logger log = LoggerFactory
+      .getLogger(SolrZkClient.class);
+
+  private ConnectionManager connManager;
+
+  private volatile SolrZooKeeper keeper;
+  
+  private ZkCmdExecutor zkCmdExecutor;
+
+  private final ExecutorService zkCallbackExecutor = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("zkCallback"));
+
+  private volatile boolean isClosed = false;
+  private ZkClientConnectionStrategy zkClientConnectionStrategy;
+  private int zkClientTimeout;
+  private ZkACLProvider zkACLProvider;
+  private String zkServerAddress;
+
+  public int getZkClientTimeout() {
+    return zkClientTimeout;
+  }
+
+  // expert: for tests
+  public SolrZkClient() {
+    
+  }
+  
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout) {
+    this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null);
+  }
+  
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) {
+    this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, new DefaultConnectionStrategy(), null);
+  }
+  
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) {
+    this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, new DefaultConnectionStrategy(), onReonnect);
+  }
+
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout,
+      ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
+    this(zkServerAddress, zkClientTimeout, DEFAULT_CLIENT_CONNECT_TIMEOUT, strat, onReconnect);
+  }
+  
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
+      ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
+    this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, null, null);
+  }
+  
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
+      ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect) {
+    this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, beforeReconnect, null);
+  }
+
+  public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout, 
+      ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect, ZkACLProvider zkACLProvider) {
+    this.zkClientConnectionStrategy = strat;
+    this.zkServerAddress = zkServerAddress;
+    
+    if (strat == null) {
+      strat = new DefaultConnectionStrategy();
+    }
+    
+    if (!strat.hasZkCredentialsToAddAutomatically()) {
+      ZkCredentialsProvider zkCredentialsToAddAutomatically = createZkCredentialsToAddAutomatically();
+      strat.setZkCredentialsToAddAutomatically(zkCredentialsToAddAutomatically);
+    }
+    
+    this.zkClientTimeout = zkClientTimeout;
+    // we must retry at least as long as the session timeout
+    zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout);
+    connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+        + zkServerAddress, this, zkServerAddress, strat, onReconnect, beforeReconnect);
+
+    try {
+      strat.connect(zkServerAddress, zkClientTimeout, wrapWatcher(connManager),
+          new ZkUpdate() {
+            @Override
+            public void update(SolrZooKeeper zooKeeper) {
+              SolrZooKeeper oldKeeper = keeper;
+              keeper = zooKeeper;
+              try {
+                closeKeeper(oldKeeper);
+              } finally {
+                if (isClosed) {
+                  // we may have been closed
+                  closeKeeper(SolrZkClient.this.keeper);
+                }
+              }
+            }
+          });
+    } catch (Exception e) {
+      connManager.close();
+      if (keeper != null) {
+        try {
+          keeper.close();
+        } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+    
+    try {
+      connManager.waitForConnected(clientConnectTimeout);
+    } catch (Exception e) {
+      connManager.close();
+      try {
+        keeper.close();
+      } catch (InterruptedException e1) {
+        Thread.currentThread().interrupt();
+      }
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+    assert ObjectReleaseTracker.track(this);
+    if (zkACLProvider == null) {
+      this.zkACLProvider = createZkACLProvider();
+    } else {
+      this.zkACLProvider = zkACLProvider;
+    }
+  }
+
+  public ConnectionManager getConnectionManager() {
+    return connManager;
+  }
+  
+  public ZkClientConnectionStrategy getZkClientConnectionStrategy() {
+    return zkClientConnectionStrategy;
+  }
+
+  public static final String ZK_CRED_PROVIDER_CLASS_NAME_VM_PARAM_NAME = "zkCredentialsProvider";
+  protected ZkCredentialsProvider createZkCredentialsToAddAutomatically() {
+    String zkCredentialsProviderClassName = System.getProperty(ZK_CRED_PROVIDER_CLASS_NAME_VM_PARAM_NAME);
+    if (!StringUtils.isEmpty(zkCredentialsProviderClassName)) {
+      try {
+        log.info("Using ZkCredentialsProvider: " + zkCredentialsProviderClassName);
+        return (ZkCredentialsProvider)Class.forName(zkCredentialsProviderClassName).getConstructor().newInstance();
+      } catch (Throwable t) {
+        // just ignore - go default
+        log.warn("VM param zkCredentialsProvider does not point to a class implementing ZkCredentialsProvider and with a non-arg constructor", t);
+      }
+    }
+    log.info("Using default ZkCredentialsProvider");
+    return new DefaultZkCredentialsProvider();
+  }
+
+  public static final String ZK_ACL_PROVIDER_CLASS_NAME_VM_PARAM_NAME = "zkACLProvider";
+  protected ZkACLProvider createZkACLProvider() {
+    String zkACLProviderClassName = System.getProperty(ZK_ACL_PROVIDER_CLASS_NAME_VM_PARAM_NAME);
+    if (!StringUtils.isEmpty(zkACLProviderClassName)) {
+      try {
+        log.info("Using ZkACLProvider: " + zkACLProviderClassName);
+        return (ZkACLProvider)Class.forName(zkACLProviderClassName).getConstructor().newInstance();
+      } catch (Throwable t) {
+        // just ignore - go default
+        log.warn("VM param zkACLProvider does not point to a class implementing ZkACLProvider and with a non-arg constructor", t);
+      }
+    }
+    log.info("Using default ZkACLProvider");
+    return new DefaultZkACLProvider();
+  }
+  
+  /**
+   * Returns true if client is connected
+   */
+  public boolean isConnected() {
+    return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;
+  }
+  
+  public void delete(final String path, final int version, boolean retryOnConnLoss)
+      throws InterruptedException, KeeperException {
+    if (retryOnConnLoss) {
+      zkCmdExecutor.retryOperation(new ZkOperation() {
+        @Override
+        public Stat execute() throws KeeperException, InterruptedException {
+          keeper.delete(path, version);
+          return null;
+        }
+      });
+    } else {
+      keeper.delete(path, version);
+    }
+  }
+
+  private Watcher wrapWatcher (final Watcher watcher) {
+    if (watcher == null) return watcher;
+
+    // wrap the watcher so that it doesn't fire off ZK's event queue
+    return new Watcher() {
+      @Override
+      public void process(final WatchedEvent event) {
+        log.debug("Submitting job to respond to event " + event);
+        zkCallbackExecutor.submit(new Runnable () {
+          @Override
+          public void run () {
+            watcher.process(event);
+          }
+        });
+      }
+    };
+  }
+
+  /**
+   * Return the stat of the node of the given path. Return null if no such a
+   * node exists.
+   * <p>
+   * If the watch is non-null and the call is successful (no exception is thrown),
+   * a watch will be left on the node with the given path. The watch will be
+   * triggered by a successful operation that creates/delete the node or sets
+   * the data on the node.
+   *
+   * @param path the node path
+   * @param watcher explicit watcher
+   * @return the stat of the node of the given path; return null if no such a
+   *         node exists.
+   * @throws KeeperException If the server signals an error
+   * @throws InterruptedException If the server transaction is interrupted.
+   * @throws IllegalArgumentException if an invalid path is specified
+   */
+  public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(new ZkOperation() {
+        @Override
+        public Stat execute() throws KeeperException, InterruptedException {
+          return keeper.exists(path, wrapWatcher(watcher));
+        }
+      });
+    } else {
+      return keeper.exists(path, wrapWatcher(watcher));
+    }
+  }
+  
+  /**
+   * Returns true if path exists
+   */
+  public Boolean exists(final String path, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(new ZkOperation() {
+        @Override
+        public Boolean execute() throws KeeperException, InterruptedException {
+          return keeper.exists(path, null) != null;
+        }
+      });
+    } else {
+      return keeper.exists(path, null) != null;
+    }
+  }
+
+  /**
+   * Returns children of the node at the path
+   */
+  public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(new ZkOperation() {
+        @Override
+        public List<String> execute() throws KeeperException, InterruptedException {
+          return keeper.getChildren(path, wrapWatcher(watcher));
+        }
+      });
+    } else {
+      return keeper.getChildren(path, wrapWatcher(watcher));
+    }
+  }
+
+  /**
+   * Returns node's data
+   */
+  public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(new ZkOperation() {
+        @Override
+        public byte[] execute() throws KeeperException, InterruptedException {
+          return keeper.getData(path, wrapWatcher(watcher), stat);
+        }
+      });
+    } else {
+      return keeper.getData(path, wrapWatcher(watcher), stat);
+    }
+  }
+
+  /**
+   * Returns node's state
+   */
+  public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(new ZkOperation() {
+        @Override
+        public Stat execute() throws KeeperException, InterruptedException {
+          return keeper.setData(path, data, version);
+        }
+      });
+    } else {
+      return keeper.setData(path, data, version);
+    }
+  }
+  
+  /**
+   * Returns path of created node
+   */
+  public String create(final String path, final byte[] data,
+      final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException,
+      InterruptedException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(new ZkOperation() {
+        @Override
+        public String execute() throws KeeperException, InterruptedException {
+          return keeper.create(path, data, zkACLProvider.getACLsToAdd(path),
+              createMode);
+        }
+      });
+    } else {
+      List<ACL> acls = zkACLProvider.getACLsToAdd(path);
+      return keeper.create(path, data, acls, createMode);
+    }
+  }
+
+  /**
+   * Creates the path in ZooKeeper, creating each node as necessary.
+   * 
+   * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+   * group, node exist, each will be created.
+   */
+  public void makePath(String path, boolean retryOnConnLoss) throws KeeperException,
+      InterruptedException {
+    makePath(path, null, CreateMode.PERSISTENT, retryOnConnLoss);
+  }
+  
+  public void makePath(String path, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException,
+      InterruptedException {
+    makePath(path, null, CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss);
+  }
+  
+  public void makePath(String path, File file, boolean failOnExists, boolean retryOnConnLoss)
+      throws IOException, KeeperException, InterruptedException {
+    makePath(path, FileUtils.readFileToByteArray(file),
+        CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss);
+  }
+  
+  public void makePath(String path, File file, boolean retryOnConnLoss) throws IOException,
+      KeeperException, InterruptedException {
+    makePath(path, FileUtils.readFileToByteArray(file), retryOnConnLoss);
+  }
+  
+  public void makePath(String path, CreateMode createMode, boolean retryOnConnLoss) throws KeeperException,
+      InterruptedException {
+    makePath(path, null, createMode, retryOnConnLoss);
+  }
+
+  /**
+   * Creates the path in ZooKeeper, creating each node as necessary.
+   * 
+   * @param data to set on the last zkNode
+   */
+  public void makePath(String path, byte[] data, boolean retryOnConnLoss) throws KeeperException,
+      InterruptedException {
+    makePath(path, data, CreateMode.PERSISTENT, retryOnConnLoss);
+  }
+
+  /**
+   * Creates the path in ZooKeeper, creating each node as necessary.
+   * 
+   * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+   * group, node exist, each will be created.
+   * 
+   * @param data to set on the last zkNode
+   */
+  public void makePath(String path, byte[] data, CreateMode createMode, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    makePath(path, data, createMode, null, retryOnConnLoss);
+  }
+
+  /**
+   * Creates the path in ZooKeeper, creating each node as necessary.
+   * 
+   * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+   * group, node exist, each will be created.
+   * 
+   * @param data to set on the last zkNode
+   */
+  public void makePath(String path, byte[] data, CreateMode createMode,
+      Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
+    makePath(path, data, createMode, watcher, true, retryOnConnLoss);
+  }
+  
+
+
+  /**
+   * Creates the path in ZooKeeper, creating each node as necessary.
+   * 
+   * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+   * group, node exist, each will be created.
+   * 
+   * Note: retryOnConnLoss is only respected for the final node - nodes
+   * before that are always retried on connection loss.
+   */
+  public void makePath(String path, byte[] data, CreateMode createMode,
+      Watcher watcher, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
+    if (log.isInfoEnabled()) {
+      log.info("makePath: " + path);
+    }
+    boolean retry = true;
+    
+    if (path.startsWith("/")) {
+      path = path.substring(1, path.length());
+    }
+    String[] paths = path.split("/");
+    StringBuilder sbPath = new StringBuilder();
+    for (int i = 0; i < paths.length; i++) {
+      byte[] bytes = null;
+      String pathPiece = paths[i];
+      sbPath.append("/" + pathPiece);
+      final String currentPath = sbPath.toString();
+      Object exists = exists(currentPath, watcher, retryOnConnLoss);
+      if (exists == null || ((i == paths.length -1) && failOnExists)) {
+        CreateMode mode = CreateMode.PERSISTENT;
+        if (i == paths.length - 1) {
+          mode = createMode;
+          bytes = data;
+          if (!retryOnConnLoss) retry = false;
+        }
+        try {
+          if (retry) {
+            final CreateMode finalMode = mode;
+            final byte[] finalBytes = bytes;
+            zkCmdExecutor.retryOperation(new ZkOperation() {
+              @Override
+              public Object execute() throws KeeperException, InterruptedException {
+                keeper.create(currentPath, finalBytes, zkACLProvider.getACLsToAdd(currentPath), finalMode);
+                return null;
+              }
+            });
+          } else {
+            keeper.create(currentPath, bytes, zkACLProvider.getACLsToAdd(currentPath), mode);
+          }
+        } catch (NodeExistsException e) {
+          
+          if (!failOnExists) {
+            // TODO: version ? for now, don't worry about race
+            setData(currentPath, data, -1, retryOnConnLoss);
+            // set new watch
+            exists(currentPath, watcher, retryOnConnLoss);
+            return;
+          }
+          
+          // ignore unless it's the last node in the path
+          if (i == paths.length - 1) {
+            throw e;
+          }
+        }
+        if(i == paths.length -1) {
+          // set new watch
+          exists(currentPath, watcher, retryOnConnLoss);
+        }
+      } else if (i == paths.length - 1) {
+        // TODO: version ? for now, don't worry about race
+        setData(currentPath, data, -1, retryOnConnLoss);
+        // set new watch
+        exists(currentPath, watcher, retryOnConnLoss);
+      }
+    }
+  }
+
+  public void makePath(String zkPath, CreateMode createMode, Watcher watcher, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    makePath(zkPath, null, createMode, watcher, retryOnConnLoss);
+  }
+
+  /**
+   * Write data to ZooKeeper.
+   */
+  public Stat setData(String path, byte[] data, boolean retryOnConnLoss) throws KeeperException,
+      InterruptedException {
+    return setData(path, data, -1, retryOnConnLoss);
+  }
+
+  /**
+   * Write file to ZooKeeper - default system encoding used.
+   * 
+   * @param path path to upload file to e.g. /solr/conf/solrconfig.xml
+   * @param file path to file to be uploaded
+   */
+  public Stat setData(String path, File file, boolean retryOnConnLoss) throws IOException,
+      KeeperException, InterruptedException {
+    if (log.isInfoEnabled()) {
+      log.info("Write to ZooKeepeer " + file.getAbsolutePath() + " to " + path);
+    }
+
+    byte[] data = FileUtils.readFileToByteArray(file);
+    return setData(path, data, retryOnConnLoss);
+  }
+
+  /**
+   * Fills string with printout of current ZooKeeper layout.
+   */
+  public void printLayout(String path, int indent, StringBuilder string)
+      throws KeeperException, InterruptedException {
+    byte[] data = getData(path, null, null, true);
+    List<String> children = getChildren(path, null, true);
+    StringBuilder dent = new StringBuilder();
+    for (int i = 0; i < indent; i++) {
+      dent.append(" ");
+    }
+    string.append(dent + path + " (" + children.size() + ")" + NEWL);
+    if (data != null) {
+      String dataString = new String(data, StandardCharsets.UTF_8);
+      if ((!path.endsWith(".txt") && !path.endsWith(".xml")) || path.endsWith(ZkStateReader.CLUSTER_STATE)) {
+        if (path.endsWith(".xml")) {
+          // this is the cluster state in xml format - lets pretty print
+          dataString = prettyPrint(dataString);
+        }
+        
+        string.append(dent + "DATA:\n" + dent + "    "
+            + dataString.replaceAll("\n", "\n" + dent + "    ") + NEWL);
+      } else {
+        string.append(dent + "DATA: ...supressed..." + NEWL);
+      }
+    }
+
+    for (String child : children) {
+      if (!child.equals("quota")) {
+        try {
+          printLayout(path + (path.equals("/") ? "" : "/") + child, indent + 1,
+              string);
+        } catch (NoNodeException e) {
+          // must have gone away
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Prints current ZooKeeper layout to stdout.
+   */
+  public void printLayoutToStdOut() throws KeeperException,
+      InterruptedException {
+    StringBuilder sb = new StringBuilder();
+    printLayout("/", 0, sb);
+    System.out.println(sb.toString());
+  }
+  
+  public static String prettyPrint(String input, int indent) {
+    try {
+      Source xmlInput = new StreamSource(new StringReader(input));
+      StringWriter stringWriter = new StringWriter();
+      StreamResult xmlOutput = new StreamResult(stringWriter);
+      TransformerFactory transformerFactory = TransformerFactory.newInstance();
+      transformerFactory.setAttribute("indent-number", indent);
+      Transformer transformer = transformerFactory.newTransformer();
+      transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+      transformer.transform(xmlInput, xmlOutput);
+      return xmlOutput.getWriter().toString();
+    } catch (Exception e) {
+      throw new RuntimeException("Problem pretty printing XML", e);
+    }
+  }
+  
+  private static String prettyPrint(String input) {
+    return prettyPrint(input, 2);
+  }
+
+  public void close() {
+    if (isClosed) return; // it's okay if we over close - same as solrcore
+    isClosed = true;
+    try {
+      closeKeeper(keeper);
+    } finally {
+      connManager.close();
+      closeCallbackExecutor();
+    }
+    assert ObjectReleaseTracker.release(this);
+  }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  /**
+   * Allows package private classes to update volatile ZooKeeper.
+   */
+  void updateKeeper(SolrZooKeeper keeper) throws InterruptedException {
+   SolrZooKeeper oldKeeper = this.keeper;
+   this.keeper = keeper;
+   if (oldKeeper != null) {
+     oldKeeper.close();
+   }
+   // we might have been closed already
+   if (isClosed) this.keeper.close();
+  }
+  
+  public SolrZooKeeper getSolrZooKeeper() {
+    return keeper;
+  }
+  
+  private void closeKeeper(SolrZooKeeper keeper) {
+    if (keeper != null) {
+      try {
+        keeper.close();
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+            e);
+      }
+    }
+  }
+
+  private void closeCallbackExecutor() {
+    try {
+      ExecutorUtil.shutdownAndAwaitTermination(zkCallbackExecutor);
+    } catch (Exception e) {
+      SolrException.log(log, e);
+    }
+  }
+
+  // yeah, it's recursive :(
+  public void clean(String path) throws InterruptedException, KeeperException {
+    List<String> children;
+    try {
+      children = getChildren(path, null, true);
+    } catch (NoNodeException r) {
+      return;
+    }
+    for (String string : children) {
+      // we can't clean the built-in zookeeper node
+      if (path.equals("/") && string.equals("zookeeper")) continue;
+      if (path.equals("/")) {
+        clean(path + string);
+      } else {
+        clean(path + "/" + string);
+      }
+    }
+    try {
+      if (!path.equals("/")) {
+        try {
+          delete(path, -1, true);
+        } catch (NotEmptyException e) {
+          clean(path);
+        }
+      }
+    } catch (NoNodeException r) {
+      return;
+    }
+  }
+  
+  /**
+   * Validates if zkHost contains a chroot. See http://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#ch_zkSessions
+   */
+  public static boolean containsChroot(String zkHost) {
+    return zkHost.contains("/");
+  }
+
+  /**
+   * Check to see if a Throwable is an InterruptedException, and if it is, set the thread interrupt flag
+   * @param e the Throwable
+   * @return the Throwable
+   */
+  public static Throwable checkInterrupted(Throwable e) {
+    if (e instanceof InterruptedException)
+      Thread.interrupted();
+    return e;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java
new file mode 100644
index 0000000..35ad8bf
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java
@@ -0,0 +1,103 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.SocketAddress;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.zookeeper.ClientCnxn;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+// we use this class to expose nasty stuff for tests
+public class SolrZooKeeper extends ZooKeeper {
+  final Set<Thread> spawnedThreads = new CopyOnWriteArraySet<>();
+  
+  // for test debug
+  //static Map<SolrZooKeeper,Exception> clients = new ConcurrentHashMap<SolrZooKeeper,Exception>();
+
+  public SolrZooKeeper(String connectString, int sessionTimeout,
+      Watcher watcher) throws IOException {
+    super(connectString, sessionTimeout, watcher);
+    //clients.put(this, new RuntimeException());
+  }
+  
+  public ClientCnxn getConnection() {
+    return cnxn;
+  }
+  
+  public SocketAddress getSocketAddress() {
+    return testableLocalSocketAddress();
+  }
+  
+  public void closeCnxn() {
+    final Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          final ClientCnxn cnxn = getConnection();
+          synchronized (cnxn) {
+            try {
+              final Field sendThreadFld = cnxn.getClass().getDeclaredField("sendThread");
+              sendThreadFld.setAccessible(true);
+              Object sendThread = sendThreadFld.get(cnxn);
+              if (sendThread != null) {
+                Method method = sendThread.getClass().getDeclaredMethod("testableCloseSocket");
+                method.setAccessible(true);
+                try {
+                  method.invoke(sendThread);
+                } catch (InvocationTargetException e) {
+                  // is fine
+                }
+              }
+            } catch (Exception e) {
+              throw new RuntimeException("Closing Zookeeper send channel failed.", e);
+            }
+          }
+        } finally {
+          spawnedThreads.remove(this);
+        }
+      }
+    };
+    spawnedThreads.add(t);
+    t.start();
+  }
+
+  @Override
+  public synchronized void close() throws InterruptedException {
+    for (Thread t : spawnedThreads) {
+      if (t.isAlive()) t.interrupt();
+    }
+    super.close();
+  }
+  
+//  public static void assertCloses() {
+//    if (clients.size() > 0) {
+//      Iterator<Exception> stacktraces = clients.values().iterator();
+//      Exception cause = null;
+//      cause = stacktraces.next();
+//      throw new RuntimeException("Found a bad one!", cause);
+//    }
+//  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java
new file mode 100644
index 0000000..0b9ae1d
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java
@@ -0,0 +1,89 @@
+package org.apache.solr.common.cloud;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.common.StringUtils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class VMParamsAllAndReadonlyDigestZkACLProvider extends DefaultZkACLProvider {
+
+  public static final String DEFAULT_DIGEST_READONLY_USERNAME_VM_PARAM_NAME = "zkDigestReadonlyUsername";
+  public static final String DEFAULT_DIGEST_READONLY_PASSWORD_VM_PARAM_NAME = "zkDigestReadonlyPassword";
+  
+  final String zkDigestAllUsernameVMParamName;
+  final String zkDigestAllPasswordVMParamName;
+  final String zkDigestReadonlyUsernameVMParamName;
+  final String zkDigestReadonlyPasswordVMParamName;
+  
+  public VMParamsAllAndReadonlyDigestZkACLProvider() {
+    this(
+        VMParamsSingleSetCredentialsDigestZkCredentialsProvider.DEFAULT_DIGEST_USERNAME_VM_PARAM_NAME, 
+        VMParamsSingleSetCredentialsDigestZkCredentialsProvider.DEFAULT_DIGEST_PASSWORD_VM_PARAM_NAME,
+        DEFAULT_DIGEST_READONLY_USERNAME_VM_PARAM_NAME,
+        DEFAULT_DIGEST_READONLY_PASSWORD_VM_PARAM_NAME
+        );
+  }
+  
+  public VMParamsAllAndReadonlyDigestZkACLProvider(String zkDigestAllUsernameVMParamName, String zkDigestAllPasswordVMParamName,
+      String zkDigestReadonlyUsernameVMParamName, String zkDigestReadonlyPasswordVMParamName) {
+    this.zkDigestAllUsernameVMParamName = zkDigestAllUsernameVMParamName;
+    this.zkDigestAllPasswordVMParamName = zkDigestAllPasswordVMParamName;
+    this.zkDigestReadonlyUsernameVMParamName = zkDigestReadonlyUsernameVMParamName;
+    this.zkDigestReadonlyPasswordVMParamName = zkDigestReadonlyPasswordVMParamName;
+  }
+
+
+  @Override
+  protected List<ACL> createGlobalACLsToAdd() {
+    try {
+      List<ACL> result = new ArrayList<ACL>();
+  
+      // Not to have to provide too much credentials and ACL information to the process it is assumed that you want "ALL"-acls
+      // added to the user you are using to connect to ZK (if you are using VMParamsSingleSetCredentialsDigestZkCredentialsProvider)
+      String digestAllUsername = System.getProperty(zkDigestAllUsernameVMParamName);
+      String digestAllPassword = System.getProperty(zkDigestAllPasswordVMParamName);
+      if (!StringUtils.isEmpty(digestAllUsername) && !StringUtils.isEmpty(digestAllPassword)) {
+        result.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(digestAllUsername + ":" + digestAllPassword))));
+      }
+  
+      // Besides that support for adding additional "READONLY"-acls for another user
+      String digestReadonlyUsername = System.getProperty(zkDigestReadonlyUsernameVMParamName);
+      String digestReadonlyPassword = System.getProperty(zkDigestReadonlyPasswordVMParamName);
+      if (!StringUtils.isEmpty(digestReadonlyUsername) && !StringUtils.isEmpty(digestReadonlyPassword)) {
+        result.add(new ACL(ZooDefs.Perms.READ, new Id("digest", DigestAuthenticationProvider.generateDigest(digestReadonlyUsername + ":" + digestReadonlyPassword))));
+      }
+      
+      if (result.isEmpty()) {
+        result = super.createGlobalACLsToAdd();
+      }
+      
+      return result;
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java
new file mode 100644
index 0000000..1e575fd
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java
@@ -0,0 +1,60 @@
+package org.apache.solr.common.cloud;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.solr.common.StringUtils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class VMParamsSingleSetCredentialsDigestZkCredentialsProvider extends DefaultZkCredentialsProvider {
+  
+  public static final String DEFAULT_DIGEST_USERNAME_VM_PARAM_NAME = "zkDigestUsername";
+  public static final String DEFAULT_DIGEST_PASSWORD_VM_PARAM_NAME = "zkDigestPassword";
+  
+  final String zkDigestUsernameVMParamName;
+  final String zkDigestPasswordVMParamName;
+  
+  public VMParamsSingleSetCredentialsDigestZkCredentialsProvider() {
+    this(DEFAULT_DIGEST_USERNAME_VM_PARAM_NAME, DEFAULT_DIGEST_PASSWORD_VM_PARAM_NAME);
+  }
+  
+  public VMParamsSingleSetCredentialsDigestZkCredentialsProvider(String zkDigestUsernameVMParamName, String zkDigestPasswordVMParamName) {
+    this.zkDigestUsernameVMParamName = zkDigestUsernameVMParamName;
+    this.zkDigestPasswordVMParamName = zkDigestPasswordVMParamName;
+  }
+
+  @Override
+  protected Collection<ZkCredentials> createCredentials() {
+    List<ZkCredentials> result = new ArrayList<ZkCredentials>();
+    String digestUsername = System.getProperty(zkDigestUsernameVMParamName);
+    String digestPassword = System.getProperty(zkDigestPasswordVMParamName);
+    if (!StringUtils.isEmpty(digestUsername) && !StringUtils.isEmpty(digestPassword)) {
+      try {
+        result.add(new ZkCredentials("digest", (digestUsername + ":" + digestPassword).getBytes("UTF-8")));
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return result;
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java
new file mode 100644
index 0000000..03149b3
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java
@@ -0,0 +1,28 @@
+package org.apache.solr.common.cloud;
+
+import java.util.List;
+
+import org.apache.zookeeper.data.ACL;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface ZkACLProvider {
+  
+  List<ACL> getACLsToAdd(String zNodePath);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
new file mode 100644
index 0000000..5f4baa5
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
@@ -0,0 +1,113 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkCredentialsProvider.ZkCredentials;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public abstract class ZkClientConnectionStrategy {
+  private static Logger log = LoggerFactory.getLogger(ZkClientConnectionStrategy.class);
+  
+  private volatile ZkCredentialsProvider zkCredentialsToAddAutomatically;
+  private volatile boolean zkCredentialsToAddAutomaticallyUsed;
+  
+  private List<DisconnectedListener> disconnectedListeners = new ArrayList<>();
+  private List<ConnectedListener> connectedListeners = new ArrayList<>();
+  
+  public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+  public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+  
+  public ZkClientConnectionStrategy() {
+    zkCredentialsToAddAutomaticallyUsed = false;
+  }
+  
+  public synchronized void disconnected() {
+    for (DisconnectedListener listener : disconnectedListeners) {
+      try {
+        listener.disconnected();
+      } catch (Exception e) {
+        SolrException.log(log, "", e);
+      }
+    }
+  }
+  
+  public synchronized void connected() {
+    for (ConnectedListener listener : connectedListeners) {
+      try {
+        listener.connected();
+      } catch (Exception e) {
+        SolrException.log(log, "", e);
+      }
+    }
+  }
+  
+  public interface DisconnectedListener {
+    public void disconnected();
+  };
+  
+  public interface ConnectedListener {
+    public void connected();
+  };
+  
+  
+  public synchronized void addDisconnectedListener(DisconnectedListener listener) {
+    disconnectedListeners.add(listener);
+  }
+  
+  public synchronized void addConnectedListener(ConnectedListener listener) {
+    connectedListeners.add(listener);
+  }
+  
+  public static abstract class ZkUpdate {
+    public abstract void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException;
+  }
+  
+  public void setZkCredentialsToAddAutomatically(ZkCredentialsProvider zkCredentialsToAddAutomatically) {
+    if (zkCredentialsToAddAutomaticallyUsed || (zkCredentialsToAddAutomatically == null)) 
+      throw new RuntimeException("Cannot change zkCredentialsToAddAutomatically after it has been (connect or reconnect was called) used or to null");
+    this.zkCredentialsToAddAutomatically = zkCredentialsToAddAutomatically;
+  }
+  
+  public boolean hasZkCredentialsToAddAutomatically() {
+    return zkCredentialsToAddAutomatically != null;
+  }
+  
+  protected SolrZooKeeper createSolrZooKeeper(final String serverAddress, final int zkClientTimeout,
+      final Watcher watcher) throws IOException {
+    SolrZooKeeper result = new SolrZooKeeper(serverAddress, zkClientTimeout, watcher);
+    
+    zkCredentialsToAddAutomaticallyUsed = true;
+    for (ZkCredentials zkCredentials : zkCredentialsToAddAutomatically.getCredentials()) {
+      result.addAuthInfo(zkCredentials.getScheme(), zkCredentials.getAuth());
+    }
+
+    return result;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
new file mode 100644
index 0000000..d77ad06
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
@@ -0,0 +1,111 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+
+public class ZkCmdExecutor {
+  private long retryDelay = 1500L; // 1 second would match timeout, so 500 ms over for padding
+  private int retryCount;
+  private double timeouts;
+  
+  /**
+   * TODO: At this point, this should probably take a SolrZkClient in
+   * its constructor.
+   * 
+   * @param timeoutms
+   *          the client timeout for the ZooKeeper clients that will be used
+   *          with this class.
+   */
+  public ZkCmdExecutor(int timeoutms) {
+    timeouts = timeoutms / 1000.0;
+    this.retryCount = Math.round(0.5f * ((float)Math.sqrt(8.0f * timeouts + 1.0f) - 1.0f)) + 1;
+  }
+  
+  public long getRetryDelay() {
+    return retryDelay;
+  }
+  
+  public void setRetryDelay(long retryDelay) {
+    this.retryDelay = retryDelay;
+  }
+  
+
+  /**
+   * Perform the given operation, retrying if the connection fails
+   */
+  @SuppressWarnings("unchecked")
+  public <T> T retryOperation(ZkOperation operation)
+      throws KeeperException, InterruptedException {
+    KeeperException exception = null;
+    for (int i = 0; i < retryCount; i++) {
+      try {
+        return (T) operation.execute();
+      } catch (KeeperException.ConnectionLossException e) {
+        if (exception == null) {
+          exception = e;
+        }
+        if (Thread.currentThread().isInterrupted()) {
+          Thread.currentThread().interrupt();
+          throw new InterruptedException();
+        }
+        if (Thread.currentThread() instanceof ClosableThread) {
+          if (((ClosableThread) Thread.currentThread()).isClosed()) {
+            throw exception;
+          }
+        }
+        if (i != retryCount -1) {
+          retryDelay(i);
+        }
+      }
+    }
+    throw exception;
+  }
+  
+  public void ensureExists(String path, final SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    ensureExists(path, null, CreateMode.PERSISTENT, zkClient);
+  }
+  
+  public void ensureExists(final String path, final byte[] data,
+      CreateMode createMode, final SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    
+    if (zkClient.exists(path, true)) {
+      return;
+    }
+    try {
+      zkClient.makePath(path, data, true);
+    } catch (NodeExistsException e) {
+      // it's okay if another beats us creating the node
+    }
+    
+  }
+  
+  /**
+   * Performs a retry delay if this is not the first attempt
+   * 
+   * @param attemptCount
+   *          the number of the attempts performed so far
+   */
+  protected void retryDelay(int attemptCount) throws InterruptedException {
+    Thread.sleep((attemptCount + 1) * retryDelay);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java
new file mode 100644
index 0000000..a3a8060
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Class that manages named configs in Zookeeper
+ */
+public class ZkConfigManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ZkConfigManager.class);
+
+  /** ZkNode where named configs are stored */
+  public static final String CONFIGS_ZKNODE = "/configs";
+
+  private final SolrZkClient zkClient;
+
+  /**
+   * Creates a new ZkConfigManager
+   * @param zkClient the {@link SolrZkClient} to use
+   */
+  public ZkConfigManager(SolrZkClient zkClient) {
+    this.zkClient = zkClient;
+  }
+
+  private void uploadToZK(final Path rootPath, final String zkPath) throws IOException {
+
+    if (!Files.exists(rootPath))
+      throw new IOException("Path " + rootPath + " does not exist");
+
+    Files.walkFileTree(rootPath, new SimpleFileVisitor<Path>(){
+      @Override
+      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+        String filename = file.getFileName().toString();
+        if (filename.startsWith("."))
+          return FileVisitResult.CONTINUE;
+        String zkNode = createZkNodeName(zkPath, rootPath, file);
+        try {
+          zkClient.makePath(zkNode, file.toFile(), false, true);
+        } catch (KeeperException | InterruptedException e) {
+          throw new IOException("Error uploading file " + file.toString() + " to zookeeper path " + zkNode,
+              SolrZkClient.checkInterrupted(e));
+        }
+        return FileVisitResult.CONTINUE;
+      }
+
+      @Override
+      public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
+        return (dir.getFileName().toString().startsWith(".")) ? FileVisitResult.SKIP_SUBTREE : FileVisitResult.CONTINUE;
+      }
+    });
+  }
+
+  private static String createZkNodeName(String zkRoot, Path root, Path file) {
+    String relativePath = root.relativize(file).toString();
+    // Windows shenanigans
+    String separator = root.getFileSystem().getSeparator();
+    if ("\\".equals(separator))
+      relativePath = relativePath.replaceAll("\\\\", "/");
+    return zkRoot + "/" + relativePath;
+  }
+
+  private void downloadFromZK(String zkPath, Path dir) throws IOException {
+    try {
+      List<String> files = zkClient.getChildren(zkPath, null, true);
+      Files.createDirectories(dir);
+      for (String file : files) {
+        List<String> children = zkClient.getChildren(zkPath + "/" + file, null, true);
+        if (children.size() == 0) {
+          byte[] data = zkClient.getData(zkPath + "/" + file, null, null, true);
+          Path filename = dir.resolve(file);
+          logger.info("Writing file {}", filename);
+          Files.write(filename, data);
+        } else {
+          downloadFromZK(zkPath + "/" + file, dir.resolve(file));
+        }
+      }
+    }
+    catch (KeeperException | InterruptedException e) {
+      throw new IOException("Error downloading files from zookeeper path " + zkPath + " to " + dir.toString(),
+          SolrZkClient.checkInterrupted(e));
+    }
+  }
+
+  /**
+   * Upload files from a given path to a config in Zookeeper
+   * @param dir         {@link java.nio.file.Path} to the files
+   * @param configName  the name to give the config
+   * @throws IOException
+   *                    if an I/O error occurs or the path does not exist
+   */
+  public void uploadConfigDir(Path dir, String configName) throws IOException {
+    uploadToZK(dir, CONFIGS_ZKNODE + "/" + configName);
+  }
+
+  /**
+   * Download a config from Zookeeper and write it to the filesystem
+   * @param configName  the config to download
+   * @param dir         the {@link Path} to write files under
+   * @throws IOException
+   *                    if an I/O error occurs or the config does not exist
+   */
+  public void downloadConfigDir(String configName, Path dir) throws IOException {
+    downloadFromZK(CONFIGS_ZKNODE + "/" + configName, dir);
+  }
+
+  public List<String> listConfigs() throws IOException {
+    try {
+      return zkClient.getChildren(ZkConfigManager.CONFIGS_ZKNODE, null, true);
+    }
+    catch (KeeperException.NoNodeException e) {
+      return Collections.emptyList();
+    }
+    catch (KeeperException | InterruptedException e) {
+      throw new IOException("Error listing configs", SolrZkClient.checkInterrupted(e));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java
new file mode 100644
index 0000000..131d330
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java
@@ -0,0 +1,74 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class ZkCoreNodeProps {
+  private ZkNodeProps nodeProps;
+  
+  public ZkCoreNodeProps(ZkNodeProps nodeProps) {
+    this.nodeProps = nodeProps;
+  }
+  
+  public String getCoreUrl() {
+    return getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), nodeProps.getStr(ZkStateReader.CORE_NAME_PROP));
+  }
+  
+  public String getNodeName() {
+    return nodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
+  }
+
+  public String getState() {
+    return nodeProps.getStr(ZkStateReader.STATE_PROP);
+  }
+
+  public String getBaseUrl() {
+    return nodeProps.getStr(ZkStateReader.BASE_URL_PROP);
+  }
+  
+  public String getCoreName() {
+    return nodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
+  }
+  
+  public static String getCoreUrl(ZkNodeProps nodeProps) {
+    return getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), nodeProps.getStr(ZkStateReader.CORE_NAME_PROP));
+  }
+  
+  public static String getCoreUrl(String baseUrl, String coreName) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(baseUrl);
+    if (!baseUrl.endsWith("/")) sb.append("/");
+    sb.append(coreName);
+    if (!(sb.substring(sb.length() - 1).equals("/"))) sb.append("/");
+    return sb.toString();
+  }
+
+  @Override
+  public String toString() {
+    return nodeProps.toString();
+  }
+
+  public ZkNodeProps getNodeProps() {
+    return nodeProps;
+  }
+
+  public boolean isLeader() {
+    return nodeProps.containsKey(ZkStateReader.LEADER_PROP);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java
new file mode 100644
index 0000000..b4ab6d8
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java
@@ -0,0 +1,45 @@
+package org.apache.solr.common.cloud;
+
+import java.util.Collection;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface ZkCredentialsProvider {
+  
+  public class ZkCredentials {
+    String scheme;
+    byte[] auth;
+    
+    public ZkCredentials(String scheme, byte[] auth) {
+      super();
+      this.scheme = scheme;
+      this.auth = auth;
+    }
+    
+    String getScheme() {
+      return scheme;
+    }
+    
+    byte[] getAuth() {
+      return auth;
+    }
+  }
+  
+  Collection<ZkCredentials> getCredentials();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java
new file mode 100644
index 0000000..5ddfa24
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java
@@ -0,0 +1,154 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.noggit.JSONUtil;
+import org.noggit.JSONWriter;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * ZkNodeProps contains generic immutable properties.
+ */
+public class ZkNodeProps implements JSONWriter.Writable {
+
+  protected final Map<String,Object> propMap;
+
+  /**
+   * Construct ZKNodeProps from map.
+   */
+  public ZkNodeProps(Map<String,Object> propMap) {
+    this.propMap = propMap;
+    // TODO: store an unmodifiable map, but in a way that guarantees not to wrap more than once.
+    // Always wrapping introduces a memory leak.
+  }
+
+
+  /**
+   * Constructor that populates the from array of Strings in form key1, value1,
+   * key2, value2, ..., keyN, valueN
+   */
+  public ZkNodeProps(String... keyVals) {
+    this( makeMap((Object[])keyVals) );
+  }
+
+  public static ZkNodeProps fromKeyVals(Object... keyVals)  {
+    return new ZkNodeProps( makeMap(keyVals) );
+  }
+
+  public static Map<String,Object> makeMap(Object... keyVals) {
+    if ((keyVals.length & 0x01) != 0) {
+      throw new IllegalArgumentException("arguments should be key,value");
+    }
+    Map<String,Object> propMap = new LinkedHashMap<>(keyVals.length>>1);
+    for (int i = 0; i < keyVals.length; i+=2) {
+      propMap.put(keyVals[i].toString(), keyVals[i+1]);
+    }
+    return propMap;
+  }
+
+
+  /**
+   * Get property keys.
+   */
+  public Set<String> keySet() {
+    return propMap.keySet();
+  }
+
+  /**
+   * Get all properties as map.
+   */
+  public Map<String, Object> getProperties() {
+    return propMap;
+  }
+
+  /** Returns a shallow writable copy of the properties */
+  public Map<String,Object> shallowCopy() {
+    return new LinkedHashMap<>(propMap);
+  }
+
+  /**
+   * Create Replica from json string that is typically stored in zookeeper.
+   */
+  public static ZkNodeProps load(byte[] bytes) {
+    Map<String, Object> props = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
+    return new ZkNodeProps(props);
+  }
+
+  @Override
+  public void write(JSONWriter jsonWriter) {
+    jsonWriter.write(propMap);
+  }
+  
+  /**
+   * Get a string property value.
+   */
+  public String getStr(String key) {
+    Object o = propMap.get(key);
+    return o == null ? null : o.toString();
+  }
+
+  /**
+   * Get a string property value.
+   */
+  public Integer getInt(String key, Integer def) {
+    Object o = propMap.get(key);
+    return o == null ? def : Integer.valueOf(o.toString());
+  }
+
+  /**
+   * Get a string property value.
+   */
+  public String getStr(String key,String def) {
+    Object o = propMap.get(key);
+    return o == null ? def : o.toString();
+  }
+
+  public Object get(String key) {
+    return propMap.get(key);
+  }
+
+  @Override
+  public String toString() {
+    return JSONUtil.toJSON(this);
+    /***
+    StringBuilder sb = new StringBuilder();
+    Set<Entry<String,Object>> entries = propMap.entrySet();
+    for(Entry<String,Object> entry : entries) {
+      sb.append(entry.getKey() + "=" + entry.getValue() + "\n");
+    }
+    return sb.toString();
+    ***/
+  }
+
+  /**
+   * Check if property key exists.
+   */
+  public boolean containsKey(String key) {
+    return propMap.containsKey(key);
+  }
+
+  public boolean getBool(String key, boolean b) {
+    Object o = propMap.get(key);
+    if(o==null) return b;
+    return Boolean.parseBoolean(o.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java
new file mode 100644
index 0000000..b4da540
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A callback object which can be used for implementing retry-able operations.
+ *
+ */
+public abstract class ZkOperation {
+
+    /**
+     * Performs the operation - which may be involved multiple times if the connection
+     * to ZooKeeper closes during this operation
+     *
+     * @return the result of the operation or null
+     */
+    public abstract Object execute() throws KeeperException, InterruptedException;
+}