You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by bo...@apache.org on 2015/03/18 00:33:29 UTC
[07/17] incubator-ranger git commit: Support for Solr as Audit
Destination.
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java
new file mode 100644
index 0000000..f14aedd
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -0,0 +1,736 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.StringUtils;
+import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.NotEmptyException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Source;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ *
+ * All Solr ZooKeeper interactions should go through this class rather than
+ * ZooKeeper. This class handles synchronous connects and reconnections.
+ *
+ */
+public class SolrZkClient implements Closeable {
+
+ static final String NEWL = System.getProperty("line.separator");
+
+ static final int DEFAULT_CLIENT_CONNECT_TIMEOUT = 30000;
+
+ private static final Logger log = LoggerFactory
+ .getLogger(SolrZkClient.class);
+
+ private ConnectionManager connManager;
+
+ private volatile SolrZooKeeper keeper;
+
+ private ZkCmdExecutor zkCmdExecutor;
+
+ private final ExecutorService zkCallbackExecutor = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("zkCallback"));
+
+ private volatile boolean isClosed = false;
+ private ZkClientConnectionStrategy zkClientConnectionStrategy;
+ private int zkClientTimeout;
+ private ZkACLProvider zkACLProvider;
+ private String zkServerAddress;
+
+ public int getZkClientTimeout() {
+ return zkClientTimeout;
+ }
+
+ // expert: for tests
+ public SolrZkClient() {
+
+ }
+
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout) {
+ this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null);
+ }
+
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) {
+ this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, new DefaultConnectionStrategy(), null);
+ }
+
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) {
+ this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, new DefaultConnectionStrategy(), onReonnect);
+ }
+
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout,
+ ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
+ this(zkServerAddress, zkClientTimeout, DEFAULT_CLIENT_CONNECT_TIMEOUT, strat, onReconnect);
+ }
+
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
+ ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
+ this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, null, null);
+ }
+
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
+ ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect) {
+ this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, beforeReconnect, null);
+ }
+
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
+ ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect, ZkACLProvider zkACLProvider) {
+ this.zkServerAddress = zkServerAddress;
+ // apply the default strategy before storing it, so getZkClientConnectionStrategy() never returns null
+ if (strat == null) {
+ strat = new DefaultConnectionStrategy();
+ }
+ this.zkClientConnectionStrategy = strat;
+
+ if (!strat.hasZkCredentialsToAddAutomatically()) {
+ ZkCredentialsProvider zkCredentialsToAddAutomatically = createZkCredentialsToAddAutomatically();
+ strat.setZkCredentialsToAddAutomatically(zkCredentialsToAddAutomatically);
+ }
+
+ this.zkClientTimeout = zkClientTimeout;
+ // we must retry at least as long as the session timeout
+ zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout);
+ connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+ + zkServerAddress, this, zkServerAddress, strat, onReconnect, beforeReconnect);
+
+ try {
+ strat.connect(zkServerAddress, zkClientTimeout, wrapWatcher(connManager),
+ new ZkUpdate() {
+ @Override
+ public void update(SolrZooKeeper zooKeeper) {
+ SolrZooKeeper oldKeeper = keeper;
+ keeper = zooKeeper;
+ try {
+ closeKeeper(oldKeeper);
+ } finally {
+ if (isClosed) {
+ // we may have been closed
+ closeKeeper(SolrZkClient.this.keeper);
+ }
+ }
+ }
+ });
+ } catch (Exception e) {
+ connManager.close();
+ if (keeper != null) {
+ try {
+ keeper.close();
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+
+ try {
+ connManager.waitForConnected(clientConnectTimeout);
+ } catch (Exception e) {
+ connManager.close();
+ try {
+ if (keeper != null) keeper.close(); // same null guard as the connect failure path, so an NPE cannot mask e
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ assert ObjectReleaseTracker.track(this);
+ if (zkACLProvider == null) {
+ this.zkACLProvider = createZkACLProvider();
+ } else {
+ this.zkACLProvider = zkACLProvider;
+ }
+ }
+
+ public ConnectionManager getConnectionManager() {
+ return connManager;
+ }
+
+ public ZkClientConnectionStrategy getZkClientConnectionStrategy() {
+ return zkClientConnectionStrategy;
+ }
+
+ public static final String ZK_CRED_PROVIDER_CLASS_NAME_VM_PARAM_NAME = "zkCredentialsProvider";
+ protected ZkCredentialsProvider createZkCredentialsToAddAutomatically() {
+ String zkCredentialsProviderClassName = System.getProperty(ZK_CRED_PROVIDER_CLASS_NAME_VM_PARAM_NAME);
+ if (!StringUtils.isEmpty(zkCredentialsProviderClassName)) {
+ try {
+ log.info("Using ZkCredentialsProvider: " + zkCredentialsProviderClassName);
+ return (ZkCredentialsProvider)Class.forName(zkCredentialsProviderClassName).getConstructor().newInstance();
+ } catch (Throwable t) {
+ // just ignore - go default
+ log.warn("VM param zkCredentialsProvider does not point to a class implementing ZkCredentialsProvider and with a non-arg constructor", t);
+ }
+ }
+ log.info("Using default ZkCredentialsProvider");
+ return new DefaultZkCredentialsProvider();
+ }
+
+ public static final String ZK_ACL_PROVIDER_CLASS_NAME_VM_PARAM_NAME = "zkACLProvider";
+ protected ZkACLProvider createZkACLProvider() {
+ String zkACLProviderClassName = System.getProperty(ZK_ACL_PROVIDER_CLASS_NAME_VM_PARAM_NAME);
+ if (!StringUtils.isEmpty(zkACLProviderClassName)) {
+ try {
+ log.info("Using ZkACLProvider: " + zkACLProviderClassName);
+ return (ZkACLProvider)Class.forName(zkACLProviderClassName).getConstructor().newInstance();
+ } catch (Throwable t) {
+ // just ignore - go default
+ log.warn("VM param zkACLProvider does not point to a class implementing ZkACLProvider and with a non-arg constructor", t);
+ }
+ }
+ log.info("Using default ZkACLProvider");
+ return new DefaultZkACLProvider();
+ }
+
+ /**
+ * Returns true if client is connected
+ */
+ public boolean isConnected() {
+ return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;
+ }
+
+ public void delete(final String path, final int version, boolean retryOnConnLoss)
+ throws InterruptedException, KeeperException {
+ if (retryOnConnLoss) {
+ zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public Stat execute() throws KeeperException, InterruptedException {
+ keeper.delete(path, version);
+ return null;
+ }
+ });
+ } else {
+ keeper.delete(path, version);
+ }
+ }
+
+ private Watcher wrapWatcher (final Watcher watcher) {
+ if (watcher == null) return watcher;
+
+ // wrap the watcher so that it doesn't fire off ZK's event queue
+ return new Watcher() {
+ @Override
+ public void process(final WatchedEvent event) {
+ log.debug("Submitting job to respond to event " + event);
+ zkCallbackExecutor.submit(new Runnable () {
+ @Override
+ public void run () {
+ watcher.process(event);
+ }
+ });
+ }
+ };
+ }
+
+ /**
+ * Return the stat of the node of the given path. Return null if no such a
+ * node exists.
+ * <p>
+ * If the watch is non-null and the call is successful (no exception is thrown),
+ * a watch will be left on the node with the given path. The watch will be
+ * triggered by a successful operation that creates/delete the node or sets
+ * the data on the node.
+ *
+ * @param path the node path
+ * @param watcher explicit watcher
+ * @return the stat of the node of the given path; return null if no such a
+ * node exists.
+ * @throws KeeperException If the server signals an error
+ * @throws InterruptedException If the server transaction is interrupted.
+ * @throws IllegalArgumentException if an invalid path is specified
+ */
+ public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public Stat execute() throws KeeperException, InterruptedException {
+ return keeper.exists(path, wrapWatcher(watcher));
+ }
+ });
+ } else {
+ return keeper.exists(path, wrapWatcher(watcher));
+ }
+ }
+
+ /**
+ * Returns true if path exists
+ */
+ public Boolean exists(final String path, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public Boolean execute() throws KeeperException, InterruptedException {
+ return keeper.exists(path, null) != null;
+ }
+ });
+ } else {
+ return keeper.exists(path, null) != null;
+ }
+ }
+
+ /**
+ * Returns children of the node at the path
+ */
+ public List<String> getChildren(final String path, final Watcher watcher, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public List<String> execute() throws KeeperException, InterruptedException {
+ return keeper.getChildren(path, wrapWatcher(watcher));
+ }
+ });
+ } else {
+ return keeper.getChildren(path, wrapWatcher(watcher));
+ }
+ }
+
+ /**
+ * Returns node's data
+ */
+ public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public byte[] execute() throws KeeperException, InterruptedException {
+ return keeper.getData(path, wrapWatcher(watcher), stat);
+ }
+ });
+ } else {
+ return keeper.getData(path, wrapWatcher(watcher), stat);
+ }
+ }
+
+ /**
+ * Sets the data of the node at the given path (optionally retrying on connection loss) and returns the node's Stat
+ */
+ public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public Stat execute() throws KeeperException, InterruptedException {
+ return keeper.setData(path, data, version);
+ }
+ });
+ } else {
+ return keeper.setData(path, data, version);
+ }
+ }
+
+ /**
+ * Returns path of created node
+ */
+ public String create(final String path, final byte[] data,
+ final CreateMode createMode, boolean retryOnConnLoss) throws KeeperException,
+ InterruptedException {
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public String execute() throws KeeperException, InterruptedException {
+ return keeper.create(path, data, zkACLProvider.getACLsToAdd(path),
+ createMode);
+ }
+ });
+ } else {
+ List<ACL> acls = zkACLProvider.getACLsToAdd(path);
+ return keeper.create(path, data, acls, createMode);
+ }
+ }
+
+ /**
+ * Creates the path in ZooKeeper, creating each node as necessary.
+ *
+ * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+ * group, node exist, each will be created.
+ */
+ public void makePath(String path, boolean retryOnConnLoss) throws KeeperException,
+ InterruptedException {
+ makePath(path, null, CreateMode.PERSISTENT, retryOnConnLoss);
+ }
+
+ public void makePath(String path, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException,
+ InterruptedException {
+ makePath(path, null, CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss);
+ }
+
+ public void makePath(String path, File file, boolean failOnExists, boolean retryOnConnLoss)
+ throws IOException, KeeperException, InterruptedException {
+ makePath(path, FileUtils.readFileToByteArray(file),
+ CreateMode.PERSISTENT, null, failOnExists, retryOnConnLoss);
+ }
+
+ public void makePath(String path, File file, boolean retryOnConnLoss) throws IOException,
+ KeeperException, InterruptedException {
+ makePath(path, FileUtils.readFileToByteArray(file), retryOnConnLoss);
+ }
+
+ public void makePath(String path, CreateMode createMode, boolean retryOnConnLoss) throws KeeperException,
+ InterruptedException {
+ makePath(path, null, createMode, retryOnConnLoss);
+ }
+
+ /**
+ * Creates the path in ZooKeeper, creating each node as necessary.
+ *
+ * @param data to set on the last zkNode
+ */
+ public void makePath(String path, byte[] data, boolean retryOnConnLoss) throws KeeperException,
+ InterruptedException {
+ makePath(path, data, CreateMode.PERSISTENT, retryOnConnLoss);
+ }
+
+ /**
+ * Creates the path in ZooKeeper, creating each node as necessary.
+ *
+ * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+ * group, node exist, each will be created.
+ *
+ * @param data to set on the last zkNode
+ */
+ public void makePath(String path, byte[] data, CreateMode createMode, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ makePath(path, data, createMode, null, retryOnConnLoss);
+ }
+
+ /**
+ * Creates the path in ZooKeeper, creating each node as necessary.
+ *
+ * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+ * group, node exist, each will be created.
+ *
+ * @param data to set on the last zkNode
+ */
+ public void makePath(String path, byte[] data, CreateMode createMode,
+ Watcher watcher, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
+ makePath(path, data, createMode, watcher, true, retryOnConnLoss);
+ }
+
+
+
+ /**
+ * Creates the path in ZooKeeper, creating each node as necessary.
+ *
+ * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+ * group, node exist, each will be created.
+ *
+ * Note: retryOnConnLoss is only respected for the final node - nodes
+ * before that are always retried on connection loss.
+ */
+ public void makePath(String path, byte[] data, CreateMode createMode,
+ Watcher watcher, boolean failOnExists, boolean retryOnConnLoss) throws KeeperException, InterruptedException {
+ if (log.isInfoEnabled()) {
+ log.info("makePath: " + path);
+ }
+ boolean retry = true;
+
+ if (path.startsWith("/")) {
+ path = path.substring(1, path.length());
+ }
+ String[] paths = path.split("/");
+ StringBuilder sbPath = new StringBuilder();
+ for (int i = 0; i < paths.length; i++) {
+ byte[] bytes = null;
+ String pathPiece = paths[i];
+ sbPath.append("/" + pathPiece);
+ final String currentPath = sbPath.toString();
+ Object exists = exists(currentPath, watcher, retryOnConnLoss);
+ if (exists == null || ((i == paths.length -1) && failOnExists)) {
+ CreateMode mode = CreateMode.PERSISTENT;
+ if (i == paths.length - 1) {
+ mode = createMode;
+ bytes = data;
+ if (!retryOnConnLoss) retry = false;
+ }
+ try {
+ if (retry) {
+ final CreateMode finalMode = mode;
+ final byte[] finalBytes = bytes;
+ zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public Object execute() throws KeeperException, InterruptedException {
+ keeper.create(currentPath, finalBytes, zkACLProvider.getACLsToAdd(currentPath), finalMode);
+ return null;
+ }
+ });
+ } else {
+ keeper.create(currentPath, bytes, zkACLProvider.getACLsToAdd(currentPath), mode);
+ }
+ } catch (NodeExistsException e) {
+
+ if (!failOnExists) {
+ // TODO: version ? for now, don't worry about race
+ setData(currentPath, data, -1, retryOnConnLoss);
+ // set new watch
+ exists(currentPath, watcher, retryOnConnLoss);
+ return;
+ }
+
+ // ignore unless it's the last node in the path
+ if (i == paths.length - 1) {
+ throw e;
+ }
+ }
+ if(i == paths.length -1) {
+ // set new watch
+ exists(currentPath, watcher, retryOnConnLoss);
+ }
+ } else if (i == paths.length - 1) {
+ // TODO: version ? for now, don't worry about race
+ setData(currentPath, data, -1, retryOnConnLoss);
+ // set new watch
+ exists(currentPath, watcher, retryOnConnLoss);
+ }
+ }
+ }
+
+ public void makePath(String zkPath, CreateMode createMode, Watcher watcher, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ makePath(zkPath, null, createMode, watcher, retryOnConnLoss);
+ }
+
+ /**
+ * Write data to ZooKeeper.
+ */
+ public Stat setData(String path, byte[] data, boolean retryOnConnLoss) throws KeeperException,
+ InterruptedException {
+ return setData(path, data, -1, retryOnConnLoss);
+ }
+
+ /**
+ * Write file to ZooKeeper - the file is read as a byte array and stored as-is, no character encoding is applied.
+ *
+ * @param path path to upload file to e.g. /solr/conf/solrconfig.xml
+ * @param file path to file to be uploaded
+ */
+ public Stat setData(String path, File file, boolean retryOnConnLoss) throws IOException,
+ KeeperException, InterruptedException {
+ if (log.isInfoEnabled()) {
+ log.info("Write to ZooKeepeer " + file.getAbsolutePath() + " to " + path);
+ }
+
+ byte[] data = FileUtils.readFileToByteArray(file);
+ return setData(path, data, retryOnConnLoss);
+ }
+
+ /**
+ * Fills string with printout of current ZooKeeper layout.
+ */
+ public void printLayout(String path, int indent, StringBuilder string)
+ throws KeeperException, InterruptedException {
+ byte[] data = getData(path, null, null, true);
+ List<String> children = getChildren(path, null, true);
+ StringBuilder dent = new StringBuilder();
+ for (int i = 0; i < indent; i++) {
+ dent.append(" ");
+ }
+ string.append(dent + path + " (" + children.size() + ")" + NEWL);
+ if (data != null) {
+ String dataString = new String(data, StandardCharsets.UTF_8);
+ if ((!path.endsWith(".txt") && !path.endsWith(".xml")) || path.endsWith(ZkStateReader.CLUSTER_STATE)) {
+ if (path.endsWith(".xml")) {
+ // this is the cluster state in xml format - lets pretty print
+ dataString = prettyPrint(dataString);
+ }
+
+ string.append(dent + "DATA:\n" + dent + " "
+ + dataString.replaceAll("\n", "\n" + dent + " ") + NEWL);
+ } else {
+ string.append(dent + "DATA: ...supressed..." + NEWL);
+ }
+ }
+
+ for (String child : children) {
+ if (!child.equals("quota")) {
+ try {
+ printLayout(path + (path.equals("/") ? "" : "/") + child, indent + 1,
+ string);
+ } catch (NoNodeException e) {
+ // must have gone away
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Prints current ZooKeeper layout to stdout.
+ */
+ public void printLayoutToStdOut() throws KeeperException,
+ InterruptedException {
+ StringBuilder sb = new StringBuilder();
+ printLayout("/", 0, sb);
+ System.out.println(sb.toString());
+ }
+
+ public static String prettyPrint(String input, int indent) {
+ try {
+ Source xmlInput = new StreamSource(new StringReader(input));
+ StringWriter stringWriter = new StringWriter();
+ StreamResult xmlOutput = new StreamResult(stringWriter);
+ TransformerFactory transformerFactory = TransformerFactory.newInstance();
+ transformerFactory.setAttribute("indent-number", indent);
+ Transformer transformer = transformerFactory.newTransformer();
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ transformer.transform(xmlInput, xmlOutput);
+ return xmlOutput.getWriter().toString();
+ } catch (Exception e) {
+ throw new RuntimeException("Problem pretty printing XML", e);
+ }
+ }
+
+ private static String prettyPrint(String input) {
+ return prettyPrint(input, 2);
+ }
+
+ public void close() {
+ if (isClosed) return; // it's okay if we over close - same as solrcore
+ isClosed = true;
+ try {
+ closeKeeper(keeper);
+ } finally {
+ connManager.close();
+ closeCallbackExecutor();
+ }
+ assert ObjectReleaseTracker.release(this);
+ }
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ /**
+ * Allows package private classes to update volatile ZooKeeper.
+ */
+ void updateKeeper(SolrZooKeeper keeper) throws InterruptedException {
+ SolrZooKeeper oldKeeper = this.keeper;
+ this.keeper = keeper;
+ if (oldKeeper != null) {
+ oldKeeper.close();
+ }
+ // we might have been closed already
+ if (isClosed) this.keeper.close();
+ }
+
+ public SolrZooKeeper getSolrZooKeeper() {
+ return keeper;
+ }
+
+ private void closeKeeper(SolrZooKeeper keeper) {
+ if (keeper != null) {
+ try {
+ keeper.close();
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+ e);
+ }
+ }
+ }
+
+ private void closeCallbackExecutor() {
+ try {
+ ExecutorUtil.shutdownAndAwaitTermination(zkCallbackExecutor);
+ } catch (Exception e) {
+ SolrException.log(log, e);
+ }
+ }
+
+ // yeah, it's recursive :(
+ public void clean(String path) throws InterruptedException, KeeperException {
+ List<String> children;
+ try {
+ children = getChildren(path, null, true);
+ } catch (NoNodeException r) {
+ return;
+ }
+ for (String string : children) {
+ // we can't clean the built-in zookeeper node
+ if (path.equals("/") && string.equals("zookeeper")) continue;
+ if (path.equals("/")) {
+ clean(path + string);
+ } else {
+ clean(path + "/" + string);
+ }
+ }
+ try {
+ if (!path.equals("/")) {
+ try {
+ delete(path, -1, true);
+ } catch (NotEmptyException e) {
+ clean(path);
+ }
+ }
+ } catch (NoNodeException r) {
+ return;
+ }
+ }
+
+ /**
+ * Validates if zkHost contains a chroot. See http://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#ch_zkSessions
+ */
+ public static boolean containsChroot(String zkHost) {
+ return zkHost.contains("/");
+ }
+
+ /**
+ * Check to see if a Throwable is an InterruptedException, and if it is, set the current thread's interrupt flag
+ * @param e the Throwable
+ * @return the same Throwable that was passed in
+ */
+ public static Throwable checkInterrupted(Throwable e) {
+ if (e instanceof InterruptedException)
+ Thread.currentThread().interrupt(); // Thread.interrupted() would test-and-CLEAR the flag instead of setting it
+ return e;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java
new file mode 100644
index 0000000..35ad8bf
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/SolrZooKeeper.java
@@ -0,0 +1,103 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.SocketAddress;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.zookeeper.ClientCnxn;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+// we use this class to expose nasty stuff for tests
+public class SolrZooKeeper extends ZooKeeper {
+ final Set<Thread> spawnedThreads = new CopyOnWriteArraySet<>();
+
+ // for test debug
+ //static Map<SolrZooKeeper,Exception> clients = new ConcurrentHashMap<SolrZooKeeper,Exception>();
+
+ public SolrZooKeeper(String connectString, int sessionTimeout,
+ Watcher watcher) throws IOException {
+ super(connectString, sessionTimeout, watcher);
+ //clients.put(this, new RuntimeException());
+ }
+
+ public ClientCnxn getConnection() {
+ return cnxn;
+ }
+
+ public SocketAddress getSocketAddress() {
+ return testableLocalSocketAddress();
+ }
+
+ public void closeCnxn() {
+ final Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ final ClientCnxn cnxn = getConnection();
+ synchronized (cnxn) {
+ try {
+ final Field sendThreadFld = cnxn.getClass().getDeclaredField("sendThread");
+ sendThreadFld.setAccessible(true);
+ Object sendThread = sendThreadFld.get(cnxn);
+ if (sendThread != null) {
+ Method method = sendThread.getClass().getDeclaredMethod("testableCloseSocket");
+ method.setAccessible(true);
+ try {
+ method.invoke(sendThread);
+ } catch (InvocationTargetException e) {
+ // is fine
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Closing Zookeeper send channel failed.", e);
+ }
+ }
+ } finally {
+ spawnedThreads.remove(this);
+ }
+ }
+ };
+ spawnedThreads.add(t);
+ t.start();
+ }
+
+ @Override
+ public synchronized void close() throws InterruptedException {
+ for (Thread t : spawnedThreads) {
+ if (t.isAlive()) t.interrupt();
+ }
+ super.close();
+ }
+
+// public static void assertCloses() {
+// if (clients.size() > 0) {
+// Iterator<Exception> stacktraces = clients.values().iterator();
+// Exception cause = null;
+// cause = stacktraces.next();
+// throw new RuntimeException("Found a bad one!", cause);
+// }
+// }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java
new file mode 100644
index 0000000..0b9ae1d
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsAllAndReadonlyDigestZkACLProvider.java
@@ -0,0 +1,89 @@
+package org.apache.solr.common.cloud;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.common.StringUtils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class VMParamsAllAndReadonlyDigestZkACLProvider extends DefaultZkACLProvider {
+
+ public static final String DEFAULT_DIGEST_READONLY_USERNAME_VM_PARAM_NAME = "zkDigestReadonlyUsername";
+ public static final String DEFAULT_DIGEST_READONLY_PASSWORD_VM_PARAM_NAME = "zkDigestReadonlyPassword";
+
+ final String zkDigestAllUsernameVMParamName;
+ final String zkDigestAllPasswordVMParamName;
+ final String zkDigestReadonlyUsernameVMParamName;
+ final String zkDigestReadonlyPasswordVMParamName;
+
+ public VMParamsAllAndReadonlyDigestZkACLProvider() {
+ this(
+ VMParamsSingleSetCredentialsDigestZkCredentialsProvider.DEFAULT_DIGEST_USERNAME_VM_PARAM_NAME,
+ VMParamsSingleSetCredentialsDigestZkCredentialsProvider.DEFAULT_DIGEST_PASSWORD_VM_PARAM_NAME,
+ DEFAULT_DIGEST_READONLY_USERNAME_VM_PARAM_NAME,
+ DEFAULT_DIGEST_READONLY_PASSWORD_VM_PARAM_NAME
+ );
+ }
+
+ public VMParamsAllAndReadonlyDigestZkACLProvider(String zkDigestAllUsernameVMParamName, String zkDigestAllPasswordVMParamName,
+ String zkDigestReadonlyUsernameVMParamName, String zkDigestReadonlyPasswordVMParamName) {
+ this.zkDigestAllUsernameVMParamName = zkDigestAllUsernameVMParamName;
+ this.zkDigestAllPasswordVMParamName = zkDigestAllPasswordVMParamName;
+ this.zkDigestReadonlyUsernameVMParamName = zkDigestReadonlyUsernameVMParamName;
+ this.zkDigestReadonlyPasswordVMParamName = zkDigestReadonlyPasswordVMParamName;
+ }
+
+
+ @Override
+ protected List<ACL> createGlobalACLsToAdd() {
+ try {
+ List<ACL> result = new ArrayList<ACL>();
+
+ // Not to have to provide too much credentials and ACL information to the process it is assumed that you want "ALL"-acls
+ // added to the user you are using to connect to ZK (if you are using VMParamsSingleSetCredentialsDigestZkCredentialsProvider)
+ String digestAllUsername = System.getProperty(zkDigestAllUsernameVMParamName);
+ String digestAllPassword = System.getProperty(zkDigestAllPasswordVMParamName);
+ if (!StringUtils.isEmpty(digestAllUsername) && !StringUtils.isEmpty(digestAllPassword)) {
+ result.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(digestAllUsername + ":" + digestAllPassword))));
+ }
+
+ // Besides that support for adding additional "READONLY"-acls for another user
+ String digestReadonlyUsername = System.getProperty(zkDigestReadonlyUsernameVMParamName);
+ String digestReadonlyPassword = System.getProperty(zkDigestReadonlyPasswordVMParamName);
+ if (!StringUtils.isEmpty(digestReadonlyUsername) && !StringUtils.isEmpty(digestReadonlyPassword)) {
+ result.add(new ACL(ZooDefs.Perms.READ, new Id("digest", DigestAuthenticationProvider.generateDigest(digestReadonlyUsername + ":" + digestReadonlyPassword))));
+ }
+
+ if (result.isEmpty()) {
+ result = super.createGlobalACLsToAdd();
+ }
+
+ return result;
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java
new file mode 100644
index 0000000..1e575fd
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/VMParamsSingleSetCredentialsDigestZkCredentialsProvider.java
@@ -0,0 +1,60 @@
+package org.apache.solr.common.cloud;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.solr.common.StringUtils;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class VMParamsSingleSetCredentialsDigestZkCredentialsProvider extends DefaultZkCredentialsProvider {
+
+ public static final String DEFAULT_DIGEST_USERNAME_VM_PARAM_NAME = "zkDigestUsername";
+ public static final String DEFAULT_DIGEST_PASSWORD_VM_PARAM_NAME = "zkDigestPassword";
+
+ final String zkDigestUsernameVMParamName;
+ final String zkDigestPasswordVMParamName;
+
+ public VMParamsSingleSetCredentialsDigestZkCredentialsProvider() {
+ this(DEFAULT_DIGEST_USERNAME_VM_PARAM_NAME, DEFAULT_DIGEST_PASSWORD_VM_PARAM_NAME);
+ }
+
+ public VMParamsSingleSetCredentialsDigestZkCredentialsProvider(String zkDigestUsernameVMParamName, String zkDigestPasswordVMParamName) {
+ this.zkDigestUsernameVMParamName = zkDigestUsernameVMParamName;
+ this.zkDigestPasswordVMParamName = zkDigestPasswordVMParamName;
+ }
+
+ @Override
+ protected Collection<ZkCredentials> createCredentials() {
+ List<ZkCredentials> result = new ArrayList<ZkCredentials>();
+ String digestUsername = System.getProperty(zkDigestUsernameVMParamName);
+ String digestPassword = System.getProperty(zkDigestPasswordVMParamName);
+ if (!StringUtils.isEmpty(digestUsername) && !StringUtils.isEmpty(digestPassword)) {
+ try {
+ result.add(new ZkCredentials("digest", (digestUsername + ":" + digestPassword).getBytes("UTF-8")));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return result;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java
new file mode 100644
index 0000000..03149b3
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkACLProvider.java
@@ -0,0 +1,28 @@
+package org.apache.solr.common.cloud;
+
+import java.util.List;
+
+import org.apache.zookeeper.data.ACL;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /**
+ * Supplies the ZooKeeper {@link ACL} entries that should be added for a path.
+ * Implementations are not visible from this file.
+ */
+ public interface ZkACLProvider {
+
+ /**
+ * Returns the ACL entries to add.
+ *
+ * @param zNodePath the path the ACLs are requested for (judging by the
+ * parameter name, the path of a znode - confirm against implementations)
+ * @return the list of ACLs to add
+ */
+ List<ACL> getACLsToAdd(String zNodePath);
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
new file mode 100644
index 0000000..5f4baa5
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
@@ -0,0 +1,113 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkCredentialsProvider.ZkCredentials;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public abstract class ZkClientConnectionStrategy {
+ private static Logger log = LoggerFactory.getLogger(ZkClientConnectionStrategy.class);
+
+ private volatile ZkCredentialsProvider zkCredentialsToAddAutomatically;
+ private volatile boolean zkCredentialsToAddAutomaticallyUsed;
+
+ private List<DisconnectedListener> disconnectedListeners = new ArrayList<>();
+ private List<ConnectedListener> connectedListeners = new ArrayList<>();
+
+ public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+ public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+
+ public ZkClientConnectionStrategy() {
+ zkCredentialsToAddAutomaticallyUsed = false;
+ }
+
+ public synchronized void disconnected() {
+ for (DisconnectedListener listener : disconnectedListeners) {
+ try {
+ listener.disconnected();
+ } catch (Exception e) {
+ SolrException.log(log, "", e);
+ }
+ }
+ }
+
+ public synchronized void connected() {
+ for (ConnectedListener listener : connectedListeners) {
+ try {
+ listener.connected();
+ } catch (Exception e) {
+ SolrException.log(log, "", e);
+ }
+ }
+ }
+
+ public interface DisconnectedListener {
+ public void disconnected();
+ };
+
+ public interface ConnectedListener {
+ public void connected();
+ };
+
+
+ public synchronized void addDisconnectedListener(DisconnectedListener listener) {
+ disconnectedListeners.add(listener);
+ }
+
+ public synchronized void addConnectedListener(ConnectedListener listener) {
+ connectedListeners.add(listener);
+ }
+
+ public static abstract class ZkUpdate {
+ public abstract void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException;
+ }
+
+ public void setZkCredentialsToAddAutomatically(ZkCredentialsProvider zkCredentialsToAddAutomatically) {
+ if (zkCredentialsToAddAutomaticallyUsed || (zkCredentialsToAddAutomatically == null))
+ throw new RuntimeException("Cannot change zkCredentialsToAddAutomatically after it has been (connect or reconnect was called) used or to null");
+ this.zkCredentialsToAddAutomatically = zkCredentialsToAddAutomatically;
+ }
+
+ public boolean hasZkCredentialsToAddAutomatically() {
+ return zkCredentialsToAddAutomatically != null;
+ }
+
+ protected SolrZooKeeper createSolrZooKeeper(final String serverAddress, final int zkClientTimeout,
+ final Watcher watcher) throws IOException {
+ SolrZooKeeper result = new SolrZooKeeper(serverAddress, zkClientTimeout, watcher);
+
+ zkCredentialsToAddAutomaticallyUsed = true;
+ for (ZkCredentials zkCredentials : zkCredentialsToAddAutomatically.getCredentials()) {
+ result.addAuthInfo(zkCredentials.getScheme(), zkCredentials.getAuth());
+ }
+
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
new file mode 100644
index 0000000..d77ad06
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
@@ -0,0 +1,111 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+
+ /**
+  * Runs ZooKeeper operations with retries on connection loss and offers helpers
+  * for making sure a path exists.
+  */
+ public class ZkCmdExecutor {
+   private long retryDelay = 1500L; // 1 second would match timeout, so 500 ms over for padding
+   private int retryCount;
+   private double timeouts;
+
+   /**
+    * TODO: At this point, this should probably take a SolrZkClient in
+    * its constructor.
+    *
+    * <p>The retry count is derived from the timeout in seconds {@code t}:
+    * {@code 0.5 * (sqrt(8t + 1) - 1)} is the positive solution {@code n} of
+    * {@code n(n + 1) / 2 = t}; the rounded value plus one is used.
+    *
+    * @param timeoutms
+    *          the client timeout for the ZooKeeper clients that will be used
+    *          with this class.
+    */
+   public ZkCmdExecutor(int timeoutms) {
+     timeouts = timeoutms / 1000.0;
+     this.retryCount = Math.round(0.5f * ((float)Math.sqrt(8.0f * timeouts + 1.0f) - 1.0f)) + 1;
+   }
+
+   /** Returns the base delay in milliseconds used between retries. */
+   public long getRetryDelay() {
+     return retryDelay;
+   }
+
+   /** Sets the base delay in milliseconds used between retries. */
+   public void setRetryDelay(long retryDelay) {
+     this.retryDelay = retryDelay;
+   }
+
+
+   /**
+    * Perform the given operation, retrying if the connection fails.
+    *
+    * <p>Only {@link KeeperException.ConnectionLossException} triggers a retry;
+    * any other exception thrown by the operation propagates immediately.
+    *
+    * @param operation the operation to execute
+    * @return the value returned by the operation, cast to the caller's type
+    * @throws KeeperException the first connection-loss exception seen, once all
+    *         attempts are used up or the current thread is a closed
+    *         {@code ClosableThread}
+    * @throws InterruptedException if the current thread is interrupted when a
+    *         connection loss is handled, or while sleeping between attempts
+    */
+   @SuppressWarnings("unchecked")
+   public <T> T retryOperation(ZkOperation operation)
+       throws KeeperException, InterruptedException {
+     KeeperException exception = null;
+     for (int i = 0; i < retryCount; i++) {
+       try {
+         return (T) operation.execute();
+       } catch (KeeperException.ConnectionLossException e) {
+         // Remember only the first failure; that is what gets reported.
+         if (exception == null) {
+           exception = e;
+         }
+         if (Thread.currentThread().isInterrupted()) {
+           Thread.currentThread().interrupt();
+           throw new InterruptedException();
+         }
+         if (Thread.currentThread() instanceof ClosableThread) {
+           if (((ClosableThread) Thread.currentThread()).isClosed()) {
+             throw exception;
+           }
+         }
+         // No point in sleeping after the last attempt.
+         if (i != retryCount - 1) {
+           retryDelay(i);
+         }
+       }
+     }
+     throw exception;
+   }
+
+   /**
+    * Makes sure the given path exists, creating it without data as a
+    * persistent node if necessary.
+    */
+   public void ensureExists(String path, final SolrZkClient zkClient) throws KeeperException, InterruptedException {
+     ensureExists(path, null, CreateMode.PERSISTENT, zkClient);
+   }
+
+   /**
+    * Makes sure the given path exists, creating it if necessary.
+    *
+    * @param path       the path to check and, if missing, create
+    * @param data       the data to store when the path is created
+    * @param createMode the create mode to use when the path is created
+    * @param zkClient   the client used for the existence check and the creation
+    */
+   public void ensureExists(final String path, final byte[] data,
+       CreateMode createMode, final SolrZkClient zkClient) throws KeeperException, InterruptedException {
+
+     if (zkClient.exists(path, true)) {
+       return;
+     }
+     try {
+       // Pass the requested create mode through; it used to be dropped here.
+       zkClient.makePath(path, data, createMode, true);
+     } catch (NodeExistsException e) {
+       // it's okay if another beats us creating the node
+     }
+
+   }
+
+   /**
+    * Sleeps before the next attempt; the pause grows linearly with the number
+    * of attempts: {@code (attemptCount + 1) * retryDelay} milliseconds.
+    *
+    * @param attemptCount
+    *          the zero-based index of the attempt that just failed
+    */
+   protected void retryDelay(int attemptCount) throws InterruptedException {
+     Thread.sleep((attemptCount + 1) * retryDelay);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java
new file mode 100644
index 0000000..a3a8060
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkConfigManager.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Class that manages named configs in Zookeeper
+ */
+public class ZkConfigManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(ZkConfigManager.class);
+
+ /** ZkNode where named configs are stored */
+ public static final String CONFIGS_ZKNODE = "/configs";
+
+ private final SolrZkClient zkClient;
+
+ /**
+ * Creates a new ZkConfigManager
+ * @param zkClient the {@link SolrZkClient} to use
+ */
+ public ZkConfigManager(SolrZkClient zkClient) {
+ this.zkClient = zkClient;
+ }
+
+ private void uploadToZK(final Path rootPath, final String zkPath) throws IOException {
+
+ if (!Files.exists(rootPath))
+ throw new IOException("Path " + rootPath + " does not exist");
+
+ Files.walkFileTree(rootPath, new SimpleFileVisitor<Path>(){
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ String filename = file.getFileName().toString();
+ if (filename.startsWith("."))
+ return FileVisitResult.CONTINUE;
+ String zkNode = createZkNodeName(zkPath, rootPath, file);
+ try {
+ zkClient.makePath(zkNode, file.toFile(), false, true);
+ } catch (KeeperException | InterruptedException e) {
+ throw new IOException("Error uploading file " + file.toString() + " to zookeeper path " + zkNode,
+ SolrZkClient.checkInterrupted(e));
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
+ return (dir.getFileName().toString().startsWith(".")) ? FileVisitResult.SKIP_SUBTREE : FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ private static String createZkNodeName(String zkRoot, Path root, Path file) {
+ String relativePath = root.relativize(file).toString();
+ // Windows shenanigans
+ String separator = root.getFileSystem().getSeparator();
+ if ("\\".equals(separator))
+ relativePath = relativePath.replaceAll("\\\\", "/");
+ return zkRoot + "/" + relativePath;
+ }
+
+ private void downloadFromZK(String zkPath, Path dir) throws IOException {
+ try {
+ List<String> files = zkClient.getChildren(zkPath, null, true);
+ Files.createDirectories(dir);
+ for (String file : files) {
+ List<String> children = zkClient.getChildren(zkPath + "/" + file, null, true);
+ if (children.size() == 0) {
+ byte[] data = zkClient.getData(zkPath + "/" + file, null, null, true);
+ Path filename = dir.resolve(file);
+ logger.info("Writing file {}", filename);
+ Files.write(filename, data);
+ } else {
+ downloadFromZK(zkPath + "/" + file, dir.resolve(file));
+ }
+ }
+ }
+ catch (KeeperException | InterruptedException e) {
+ throw new IOException("Error downloading files from zookeeper path " + zkPath + " to " + dir.toString(),
+ SolrZkClient.checkInterrupted(e));
+ }
+ }
+
+ /**
+ * Upload files from a given path to a config in Zookeeper
+ * @param dir {@link java.nio.file.Path} to the files
+ * @param configName the name to give the config
+ * @throws IOException
+ * if an I/O error occurs or the path does not exist
+ */
+ public void uploadConfigDir(Path dir, String configName) throws IOException {
+ uploadToZK(dir, CONFIGS_ZKNODE + "/" + configName);
+ }
+
+ /**
+ * Download a config from Zookeeper and write it to the filesystem
+ * @param configName the config to download
+ * @param dir the {@link Path} to write files under
+ * @throws IOException
+ * if an I/O error occurs or the config does not exist
+ */
+ public void downloadConfigDir(String configName, Path dir) throws IOException {
+ downloadFromZK(CONFIGS_ZKNODE + "/" + configName, dir);
+ }
+
+ public List<String> listConfigs() throws IOException {
+ try {
+ return zkClient.getChildren(ZkConfigManager.CONFIGS_ZKNODE, null, true);
+ }
+ catch (KeeperException.NoNodeException e) {
+ return Collections.emptyList();
+ }
+ catch (KeeperException | InterruptedException e) {
+ throw new IOException("Error listing configs", SolrZkClient.checkInterrupted(e));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java
new file mode 100644
index 0000000..131d330
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCoreNodeProps.java
@@ -0,0 +1,74 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+ /**
+  * Thin wrapper around a {@link ZkNodeProps} that exposes the core-related
+  * properties (base URL, core name, node name, state, leader flag) through
+  * named accessors.
+  */
+ public class ZkCoreNodeProps {
+   private ZkNodeProps nodeProps;
+
+   public ZkCoreNodeProps(ZkNodeProps nodeProps) {
+     this.nodeProps = nodeProps;
+   }
+
+   /** Returns the wrapped properties. */
+   public ZkNodeProps getNodeProps() {
+     return nodeProps;
+   }
+
+   /** Returns the value stored under {@code ZkStateReader.BASE_URL_PROP}. */
+   public String getBaseUrl() {
+     return nodeProps.getStr(ZkStateReader.BASE_URL_PROP);
+   }
+
+   /** Returns the value stored under {@code ZkStateReader.CORE_NAME_PROP}. */
+   public String getCoreName() {
+     return nodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
+   }
+
+   /** Returns the value stored under {@code ZkStateReader.NODE_NAME_PROP}. */
+   public String getNodeName() {
+     return nodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
+   }
+
+   /** Returns the value stored under {@code ZkStateReader.STATE_PROP}. */
+   public String getState() {
+     return nodeProps.getStr(ZkStateReader.STATE_PROP);
+   }
+
+   /** Returns true if the properties contain {@code ZkStateReader.LEADER_PROP}. */
+   public boolean isLeader() {
+     return nodeProps.containsKey(ZkStateReader.LEADER_PROP);
+   }
+
+   /** Returns the core URL built from the wrapped properties. */
+   public String getCoreUrl() {
+     return getCoreUrl(nodeProps);
+   }
+
+   /** Returns the core URL built from the base URL and core name of the given properties. */
+   public static String getCoreUrl(ZkNodeProps nodeProps) {
+     final String baseUrl = nodeProps.getStr(ZkStateReader.BASE_URL_PROP);
+     final String coreName = nodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
+     return getCoreUrl(baseUrl, coreName);
+   }
+
+   /**
+    * Joins base URL and core name with a single "/" and makes sure the result
+    * ends with "/".
+    *
+    * @param baseUrl  the base URL; must not be null
+    * @param coreName the core name appended after the base URL
+    * @return the combined URL, always ending in "/"
+    */
+   public static String getCoreUrl(String baseUrl, String coreName) {
+     final StringBuilder url = new StringBuilder();
+     url.append(baseUrl);
+     if (!baseUrl.endsWith("/")) {
+       url.append("/");
+     }
+     url.append(coreName);
+     if (url.charAt(url.length() - 1) != '/') {
+       url.append("/");
+     }
+     return url.toString();
+   }
+
+   @Override
+   public String toString() {
+     return nodeProps.toString();
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java
new file mode 100644
index 0000000..b4ab6d8
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkCredentialsProvider.java
@@ -0,0 +1,45 @@
+package org.apache.solr.common.cloud;
+
+import java.util.Collection;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /**
+  * Source of the credentials that are handed to ZooKeeper as auth info.
+  */
+ public interface ZkCredentialsProvider {
+
+   /** One credential: an authentication scheme name plus its auth bytes. */
+   public class ZkCredentials {
+     String scheme;
+     byte[] auth;
+
+     /**
+      * @param scheme the authentication scheme name
+      * @param auth   the auth data for that scheme
+      */
+     public ZkCredentials(String scheme, byte[] auth) {
+       this.scheme = scheme;
+       this.auth = auth;
+     }
+
+     /** Returns the scheme given at construction. */
+     String getScheme() {
+       return this.scheme;
+     }
+
+     /** Returns the auth bytes given at construction (same array, not a copy). */
+     byte[] getAuth() {
+       return this.auth;
+     }
+   }
+
+   /** Returns the credentials supplied by this provider. */
+   Collection<ZkCredentials> getCredentials();
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java
new file mode 100644
index 0000000..5ddfa24
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkNodeProps.java
@@ -0,0 +1,154 @@
+package org.apache.solr.common.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.noggit.JSONUtil;
+import org.noggit.JSONWriter;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * ZkNodeProps contains generic immutable properties.
+ */
+public class ZkNodeProps implements JSONWriter.Writable {
+
+ protected final Map<String,Object> propMap;
+
+ /**
+ * Construct ZKNodeProps from map.
+ */
+ public ZkNodeProps(Map<String,Object> propMap) {
+ this.propMap = propMap;
+ // TODO: store an unmodifiable map, but in a way that guarantees not to wrap more than once.
+ // Always wrapping introduces a memory leak.
+ }
+
+
+ /**
+ * Constructor that populates the from array of Strings in form key1, value1,
+ * key2, value2, ..., keyN, valueN
+ */
+ public ZkNodeProps(String... keyVals) {
+ this( makeMap((Object[])keyVals) );
+ }
+
+ public static ZkNodeProps fromKeyVals(Object... keyVals) {
+ return new ZkNodeProps( makeMap(keyVals) );
+ }
+
+ public static Map<String,Object> makeMap(Object... keyVals) {
+ if ((keyVals.length & 0x01) != 0) {
+ throw new IllegalArgumentException("arguments should be key,value");
+ }
+ Map<String,Object> propMap = new LinkedHashMap<>(keyVals.length>>1);
+ for (int i = 0; i < keyVals.length; i+=2) {
+ propMap.put(keyVals[i].toString(), keyVals[i+1]);
+ }
+ return propMap;
+ }
+
+
+ /**
+ * Get property keys.
+ */
+ public Set<String> keySet() {
+ return propMap.keySet();
+ }
+
+ /**
+ * Get all properties as map.
+ */
+ public Map<String, Object> getProperties() {
+ return propMap;
+ }
+
+ /** Returns a shallow writable copy of the properties */
+ public Map<String,Object> shallowCopy() {
+ return new LinkedHashMap<>(propMap);
+ }
+
+ /**
+ * Create Replica from json string that is typically stored in zookeeper.
+ */
+ public static ZkNodeProps load(byte[] bytes) {
+ Map<String, Object> props = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
+ return new ZkNodeProps(props);
+ }
+
+ @Override
+ public void write(JSONWriter jsonWriter) {
+ jsonWriter.write(propMap);
+ }
+
+ /**
+ * Get a string property value.
+ */
+ public String getStr(String key) {
+ Object o = propMap.get(key);
+ return o == null ? null : o.toString();
+ }
+
+ /**
+ * Get a string property value.
+ */
+ public Integer getInt(String key, Integer def) {
+ Object o = propMap.get(key);
+ return o == null ? def : Integer.valueOf(o.toString());
+ }
+
+ /**
+ * Get a string property value.
+ */
+ public String getStr(String key,String def) {
+ Object o = propMap.get(key);
+ return o == null ? def : o.toString();
+ }
+
+ public Object get(String key) {
+ return propMap.get(key);
+ }
+
+ @Override
+ public String toString() {
+ return JSONUtil.toJSON(this);
+ /***
+ StringBuilder sb = new StringBuilder();
+ Set<Entry<String,Object>> entries = propMap.entrySet();
+ for(Entry<String,Object> entry : entries) {
+ sb.append(entry.getKey() + "=" + entry.getValue() + "\n");
+ }
+ return sb.toString();
+ ***/
+ }
+
+ /**
+ * Check if property key exists.
+ */
+ public boolean containsKey(String key) {
+ return propMap.containsKey(key);
+ }
+
+ public boolean getBool(String key, boolean b) {
+ Object o = propMap.get(key);
+ if(o==null) return b;
+ return Boolean.parseBoolean(o.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/40aa090d/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java
----------------------------------------------------------------------
diff --git a/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java
new file mode 100644
index 0000000..b4da540
--- /dev/null
+++ b/ranger_solrj/src/main/java/org/apache/solr/common/cloud/ZkOperation.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A callback object which can be used for implementing retry-able operations.
+ *
+ */
+public abstract class ZkOperation {
+
+ /**
+ * Performs the operation - which may be involved multiple times if the connection
+ * to ZooKeeper closes during this operation
+ *
+ * @return the result of the operation or null
+ */
+ public abstract Object execute() throws KeeperException, InterruptedException;
+}