You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2015/06/24 22:24:18 UTC

[22/49] phoenix git commit: PHOENIX-2005 Connection utilities omit zk client port, parent znode

PHOENIX-2005 Connection utilities omit zk client port, parent znode


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/afb0120e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/afb0120e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/afb0120e

Branch: refs/heads/json
Commit: afb0120e079502d926c5f37de4e28d3865e29089
Parents: a28c1d3
Author: Nick Dimiduk <nd...@apache.org>
Authored: Tue May 26 11:11:48 2015 -0700
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Tue May 26 11:12:28 2015 -0700

----------------------------------------------------------------------
 .../phoenix/jdbc/PhoenixEmbeddedDriver.java     | 28 ++++--
 .../phoenix/mapreduce/CsvBulkLoadTool.java      | 93 ++++++++++----------
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  | 26 +-----
 .../query/ConnectionQueryServicesImpl.java      |  4 +-
 .../java/org/apache/phoenix/util/QueryUtil.java | 45 ++++++++--
 .../phoenix/jdbc/PhoenixEmbeddedDriverTest.java | 14 ++-
 .../phoenix/mapreduce/CsvBulkLoadToolTest.java  | 11 ---
 .../mapreduce/CsvToKeyValueMapperTest.java      | 15 ----
 .../org/apache/phoenix/util/QueryUtilTest.java  | 33 ++++---
 9 files changed, 139 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/afb0120e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index 9e95667..2451603 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -31,6 +31,7 @@ import java.util.logging.Logger;
 
 import javax.annotation.concurrent.Immutable;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -174,10 +175,10 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni
     }
     
     /**
-     * 
+     *
      * Class to encapsulate connection info for HBase
      *
-     * 
+     *
      * @since 0.1.1
      */
     public static class ConnectionInfo {
@@ -204,12 +205,18 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni
             return false;
         }
         
-        protected static ConnectionInfo create(String url) throws SQLException {
-            StringTokenizer tokenizer = new StringTokenizer(url == null ? "" : url.substring(PhoenixRuntime.JDBC_PROTOCOL.length()),DELIMITERS, true);
+        public static ConnectionInfo create(String url) throws SQLException {
+            url = url == null ? "" : url;
+            url = url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)
+                    ? url.substring(PhoenixRuntime.JDBC_PROTOCOL.length())
+                    : url;
+            StringTokenizer tokenizer = new StringTokenizer(url, DELIMITERS, true);
             int nTokens = 0;
             String[] tokens = new String[5];
             String token = null;
-            while (tokenizer.hasMoreTokens() && !(token=tokenizer.nextToken()).equals(TERMINATOR) && tokenizer.hasMoreTokens() && nTokens < tokens.length) {
+            while (tokenizer.hasMoreTokens() &&
+                    !(token=tokenizer.nextToken()).equals(TERMINATOR) &&
+                    tokenizer.hasMoreTokens() && nTokens < tokens.length) {
                 token = tokenizer.nextToken();
                 // This would mean we have an empty string for a token which is illegal
                 if (DELIMITERS.contains(token)) {
@@ -316,8 +323,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni
         private final String principal;
         private final String keytab;
         
-        // used for testing
-        ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode, String principal, String keytab) {
+        public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode, String principal, String keytab) {
             this.zookeeperQuorum = zookeeperQuorum;
             this.port = port;
             this.rootNode = rootNode;
@@ -326,8 +332,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni
             this.keytab = keytab;
         }
         
-        // used for testing
-        ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode) {
+        public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode) {
         	this(zookeeperQuorum, port, rootNode, null, null);
         }
 
@@ -417,6 +422,11 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni
 					+ (principal == null ? "" : ":" + principal)
 					+ (keytab == null ? "" : ":" + keytab);
 		}
+
+        public String toUrl() {
+            return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+                    + toString();
+        }
     }
 
     public static boolean isTestUrl(String url) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afb0120e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index a5a8aa1..9e27bac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -18,11 +18,11 @@
 package org.apache.phoenix.mapreduce;
 
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -56,8 +55,8 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.job.JobManager;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -67,6 +66,7 @@ import org.apache.phoenix.util.CSVCommonsLoader;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
@@ -84,7 +84,7 @@ public class CsvBulkLoadTool extends Configured implements Tool {
 
     private static final Logger LOG = LoggerFactory.getLogger(CsvBulkLoadTool.class);
 
-    static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Zookeeper quorum to connect to (optional)");
+    static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)");
     static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input CSV path (mandatory)");
     static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
     static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
@@ -184,35 +184,48 @@ public class CsvBulkLoadTool extends Configured implements Tool {
         } catch (IllegalStateException e) {
             printHelpAndExit(e.getMessage(), getOptions());
         }
-        Class.forName(DriverManager.class.getName());
-        Connection conn = DriverManager.getConnection(
-                getJdbcUrl(cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt())));
-        
-        return loadData(conf, cmdLine, conn);
+        return loadData(conf, cmdLine);
     }
 
-	private int loadData(Configuration conf, CommandLine cmdLine,
-			Connection conn) throws SQLException, InterruptedException,
-			ExecutionException {
-		    String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
+	private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException,
+            InterruptedException, ExecutionException, ClassNotFoundException {
+        String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
         String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
         String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
-        String qualifedIndexTableName = null;
-        if(indexTableName != null){
-        	qualifedIndexTableName = getQualifiedTableName(schemaName, indexTableName);
+        String qualifiedIndexTableName = null;
+        if (indexTableName != null){
+        	qualifiedIndexTableName = getQualifiedTableName(schemaName, indexTableName);
+        }
+
+        if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
+            // ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job.
+            String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
+            PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum);
+            LOG.info("Configuring HBase connection to {}", info);
+            for (Map.Entry<String,String> entry : info.asProps()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Setting {} = {}", entry.getKey(), entry.getValue());
+                }
+                conf.set(entry.getKey(), entry.getValue());
+            }
+        }
+
+        final Connection conn = QueryUtil.getConnection(conf);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(),
+                    qualifiedTableName);
         }
         List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
         configureOptions(cmdLine, importColumns, conf);
-
         try {
             validateTable(conn, schemaName, tableName);
         } finally {
             conn.close();
         }
 
-        Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()));
-        Path outputPath = null;
+        final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()));
+        final Path outputPath;
         if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) {
             outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt()));
         } else {
@@ -221,20 +234,21 @@ public class CsvBulkLoadTool extends Configured implements Tool {
         
         List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
         tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName));
+        // using conn after it's been closed... o.O
         tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
         
         // When loading a single index table, check index table name is correct
-        if(qualifedIndexTableName != null){
+        if (qualifiedIndexTableName != null){
             TargetTableRef targetIndexRef = null;
         	for (TargetTableRef tmpTable : tablesToBeLoaded){
-        		if(tmpTable.getLogicalName().compareToIgnoreCase(qualifedIndexTableName) == 0) {
+        		if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) {
                     targetIndexRef = tmpTable;
         			break;
         		}
         	}
-        	if(targetIndexRef == null){
+        	if (targetIndexRef == null){
                 throw new IllegalStateException("CSV Bulk Loader error: index table " +
-                    qualifedIndexTableName + " doesn't exist");
+                    qualifiedIndexTableName + " doesn't exist");
         	}
         	tablesToBeLoaded.clear();
         	tablesToBeLoaded.add(targetIndexRef);
@@ -247,13 +261,14 @@ public class CsvBulkLoadTool extends Configured implements Tool {
                 .getProps()
                 .getBoolean(QueryServices.METRICS_ENABLED,
                         QueryServicesOptions.DEFAULT_IS_METRICS_ENABLED);
-        ExecutorService executor =  JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool);
+        ExecutorService executor =
+                JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool);
         try{
 	        for (TargetTableRef table : tablesToBeLoaded) {
 	        	Path tablePath = new Path(outputPath, table.getPhysicalName());
 	        	Configuration jobConf = new Configuration(conf);
 	        	jobConf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName);
-	        	if(qualifiedTableName.compareToIgnoreCase(table.getLogicalName()) != 0) {
+	        	if (qualifiedTableName.compareToIgnoreCase(table.getLogicalName()) != 0) {
                     jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table.getPhysicalName());
 	        	}
 	        	TableLoader tableLoader = new TableLoader(
@@ -274,14 +289,6 @@ public class CsvBulkLoadTool extends Configured implements Tool {
 		return retCode;
 	}
 
-    String getJdbcUrl(String zkQuorum) {
-        if (zkQuorum == null) {
-            LOG.warn("Defaulting to localhost for ZooKeeper quorum");
-            zkQuorum = "localhost:2181";
-        }
-        return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
-    }
-
     /**
      * Build up the list of columns to be imported. The list is taken from the command line if
      * present, otherwise it is taken from the table description.
@@ -327,9 +334,11 @@ public class CsvBulkLoadTool extends Configured implements Tool {
      * @param importColumns descriptors of columns to be imported
      * @param conf job configuration
      */
-    @VisibleForTesting
-    static void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
-            Configuration conf) {
+    private static void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
+            Configuration conf) throws SQLException {
+
+        // we don't parse ZK_QUORUM_OPT here because we need it in order to
+        // create the connection we need to build importColumns.
 
         char delimiterChar = ',';
         if (cmdLine.hasOption(DELIMITER_OPT.getOpt())) {
@@ -358,12 +367,6 @@ public class CsvBulkLoadTool extends Configured implements Tool {
             escapeChar = escapeString.charAt(0);
         }
 
-        if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
-            String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
-            LOG.info("Configuring ZK quorum to {}", zkQuorum);
-            conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
-        }
-
         CsvBulkImportUtil.initCsvImportJob(
                 conf,
                 getQualifiedTableName(
@@ -493,7 +496,7 @@ public class CsvBulkLoadTool extends Configured implements Tool {
 	            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 	            job.setMapOutputValueClass(KeyValue.class);
 
-	            // initialize credentials to possibily run in a secure env
+	            // initialize credentials to possibly run in a secure env
 	            TableMapReduceUtil.initCredentials(job);
 
                 HTable htable = new HTable(conf, tableName);
@@ -522,8 +525,8 @@ public class CsvBulkLoadTool extends Configured implements Tool {
 	            }
 	            
 	            return true;
-            } catch(Exception ex) {
-            	LOG.error("Import job on table=" + tableName + " failed due to exception:" + ex);
+            } catch (Exception ex) {
+            	LOG.error("Import job on table=" + tableName + " failed due to exception.", ex);
             	return false;
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afb0120e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 90cb854..c0328bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.mapreduce;
 
 import java.io.IOException;
 import java.io.StringReader;
-import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
@@ -32,7 +31,6 @@ import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -41,11 +39,11 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.util.CSVCommonsLoader;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.csv.CsvUpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,7 +106,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
     protected void setup(Context context) throws IOException, InterruptedException {
 
         Configuration conf = context.getConfiguration();
-        String jdbcUrl = getJdbcUrl(conf);
 
         // pass client configuration into driver
         Properties clientInfos = new Properties();
@@ -118,12 +115,9 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
             clientInfos.setProperty(entry.getKey(), entry.getValue());
         }
         
-        // This statement also ensures that the driver class is loaded
-        LOG.info("Connection with driver {} with url {}", PhoenixDriver.class.getName(), jdbcUrl);
-
         try {
-            conn = (PhoenixConnection) DriverManager.getConnection(jdbcUrl, clientInfos);
-        } catch (SQLException e) {
+            conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+        } catch (SQLException | ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
 
@@ -189,20 +183,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
         }
     }
 
-    /**
-     * Build up the JDBC URL for connecting to Phoenix.
-     *
-     * @return the full JDBC URL for a Phoenix connection
-     */
-    @VisibleForTesting
-    static String getJdbcUrl(Configuration conf) {
-        String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
-        if (zkQuorum == null) {
-            throw new IllegalStateException(HConstants.ZOOKEEPER_QUORUM + " is not configured");
-        }
-        return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
-    }
-
     @VisibleForTesting
     CsvUpsertExecutor buildUpsertExecutor(Configuration conf) {
         String tableName = conf.get(TABLE_NAME_CONFKEY);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afb0120e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index c86ea48..d6d5df9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -807,7 +807,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         boolean isMetaTable = SchemaUtil.isMetaTable(tableName);
         boolean tableExist = true;
         try {
-            logger.info("Found quorum: " + ZKConfig.getZKQuorumServersString(config));
+            final String quorum = ZKConfig.getZKQuorumServersString(config);
+            final String znode = config.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+            logger.debug("Found quorum: " + quorum + ":" + znode);
             admin = new HBaseAdmin(config);
             try {
                 existingDesc = admin.getTableDescriptor(tableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afb0120e/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index d63a68f..bd38983 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -34,11 +34,13 @@ import javax.annotation.Nullable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.WildcardParseNode;
@@ -49,8 +51,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import java.util.Iterator;
-import java.util.Map;
 
 public final class QueryUtil {
 
@@ -129,7 +129,7 @@ public final class QueryUtil {
      *
      * @param tableName name of the table for which the upsert statement is to be created
      * @param columns list of columns to be included in the upsert statement
-     * @param Hint hint to be added to the UPSERT statement.
+     * @param hint hint to be added to the UPSERT statement.
      * @return the created {@code UPSERT} statement
      */
     public static String constructUpsertStatement(String tableName, List<String> columns, Hint hint) {
@@ -222,13 +222,36 @@ public final class QueryUtil {
         return query.toString();
     }
 
-    public static String getUrl(String server) {
-        return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + server;
+    /**
+     * Create the Phoenix JDBC connection URL from the provided cluster connection details.
+     */
+    public static String getUrl(String zkQuorum) {
+        return getUrlInternal(zkQuorum, null, null);
+    }
+
+    /**
+     * Create the Phoenix JDBC connection URL from the provided cluster connection details.
+     */
+    public static String getUrl(String zkQuorum, int clientPort) {
+        return getUrlInternal(zkQuorum, clientPort, null);
+    }
+
+    /**
+     * Create the Phoenix JDBC connection URL from the provided cluster connection details.
+     */
+    public static String getUrl(String zkQuorum, String znodeParent) {
+        return getUrlInternal(zkQuorum, null, znodeParent);
+    }
+
+    /**
+     * Create the Phoenix JDBC connection URL from the provided cluster connection details.
+     */
+    public static String getUrl(String zkQuorum, int port, String znodeParent) {
+        return getUrlInternal(zkQuorum, port, znodeParent);
     }
 
-    public static String getUrl(String server, long port) {
-        String serverUrl = getUrl(server);
-        return serverUrl + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + port
+    private static String getUrlInternal(String zkQuorum, Integer port, String znodeParent) {
+        return new PhoenixEmbeddedDriver.ConnectionInfo(zkQuorum, port, znodeParent).toUrl()
                 + PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
     }
 
@@ -274,6 +297,7 @@ public final class QueryUtil {
 
     public static String getConnectionUrl(Properties props, Configuration conf)
             throws ClassNotFoundException, SQLException {
+        // TODO: props is ignored!
         // make sure we load the phoenix driver
         Class.forName(PhoenixDriver.class.getName());
 
@@ -304,12 +328,15 @@ public final class QueryUtil {
         if (port == -1) {
             port = conf.getInt(QueryServices.ZOOKEEPER_PORT_ATTRIB, -1);
             if (port == -1) {
+                // TODO: fall back to the default in HConstants#DEFAULT_ZOOKEPER_CLIENT_PORT
                 throw new RuntimeException("Client zk port was not set!");
             }
         }
         server = Joiner.on(',').join(servers);
+        String znodeParent = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
 
-        return getUrl(server, port);
+        return getUrl(server, port, znodeParent);
     }
     
     public static String getViewStatement(String schemaName, String tableName, String where) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afb0120e/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
index 79f9ec6..083b205 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
@@ -40,9 +40,11 @@ public class PhoenixEmbeddedDriverTest {
             "jdbc:phoenix:localhost:123",
             "jdbc:phoenix:localhost:123;foo=bar",
             "jdbc:phoenix:localhost:123:/hbase",
-            "jdbc:phoenix:localhost:123:/hbase;foo=bas",
+            "jdbc:phoenix:localhost:123:/foo-bar",
+            "jdbc:phoenix:localhost:123:/foo-bar;foo=bas",
             "jdbc:phoenix:localhost:/hbase",
-            "jdbc:phoenix:localhost:/hbase;test=true",
+            "jdbc:phoenix:localhost:/foo-bar",
+            "jdbc:phoenix:localhost:/foo-bar;test=true",
             "jdbc:phoenix:v1,v2,v3",
             "jdbc:phoenix:v1,v2,v3;",
             "jdbc:phoenix:v1,v2,v3;test=true",
@@ -51,6 +53,7 @@ public class PhoenixEmbeddedDriverTest {
             "jdbc:phoenix:v1,v2,v3:123:/hbase",
             "jdbc:phoenix:v1,v2,v3:123:/hbase;test=false",
             "jdbc:phoenix:v1,v2,v3:123:/hbase:user/principal:/user.keytab;test=false",
+            "jdbc:phoenix:v1,v2,v3:123:/foo-bar:user/principal:/user.keytab;test=false",
             "jdbc:phoenix:v1,v2,v3:123:user/principal:/user.keytab;test=false",
             "jdbc:phoenix:v1,v2,v3:user/principal:/user.keytab;test=false",
             "jdbc:phoenix:v1,v2,v3:/hbase:user/principal:/user.keytab;test=false",
@@ -64,9 +67,11 @@ public class PhoenixEmbeddedDriverTest {
             new ConnectionInfo("localhost",123,null),
             new ConnectionInfo("localhost",123,null),
             new ConnectionInfo("localhost",123,"/hbase"),
-            new ConnectionInfo("localhost",123,"/hbase"),
-            new ConnectionInfo("localhost",null,"/hbase"),
+            new ConnectionInfo("localhost",123,"/foo-bar"),
+            new ConnectionInfo("localhost",123,"/foo-bar"),
             new ConnectionInfo("localhost",null,"/hbase"),
+            new ConnectionInfo("localhost",null,"/foo-bar"),
+            new ConnectionInfo("localhost",null,"/foo-bar"),
             new ConnectionInfo("v1,v2,v3",null,null),
             new ConnectionInfo("v1,v2,v3",null,null),
             new ConnectionInfo("v1,v2,v3",null,null),
@@ -75,6 +80,7 @@ public class PhoenixEmbeddedDriverTest {
             new ConnectionInfo("v1,v2,v3",123,"/hbase"),
             new ConnectionInfo("v1,v2,v3",123,"/hbase"),
             new ConnectionInfo("v1,v2,v3",123,"/hbase","user/principal", "/user.keytab" ),
+            new ConnectionInfo("v1,v2,v3",123,"/foo-bar","user/principal", "/user.keytab" ),
             new ConnectionInfo("v1,v2,v3",123, null,"user/principal", "/user.keytab" ),
             new ConnectionInfo("v1,v2,v3", null, null,"user/principal", "/user.keytab" ),
             new ConnectionInfo("v1,v2,v3",null,"/hbase","user/principal", "/user.keytab" ),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afb0120e/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
index 31fc71c..33bb976 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
@@ -66,15 +66,4 @@ public class CsvBulkLoadToolTest {
     public void testGetQualifiedTableName_NullSchema() {
         assertEquals("MYTABLE", CsvBulkLoadTool.getQualifiedTableName(null, "myTable"));
     }
-
-    @Test
-    public void testGetJdbcUrl_WithQuorumSupplied() {
-        assertEquals("jdbc:phoenix:myzkhost:2181", bulkLoadTool.getJdbcUrl("myzkhost:2181"));
-    }
-
-    @Test
-    public void testGetJdbcUrl_NoQuorumSupplied() {
-        assertEquals("jdbc:phoenix:localhost:2181", bulkLoadTool.getJdbcUrl(null));
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afb0120e/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
index 4033a65..dc6f497 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
@@ -95,21 +95,6 @@ public class CsvToKeyValueMapperTest {
     }
 
     @Test
-    public void testGetJdbcUrl() {
-        Configuration conf = new Configuration();
-        conf.set(HConstants.ZOOKEEPER_QUORUM, "myzkclient:2181");
-        String jdbcUrl = CsvToKeyValueMapper.getJdbcUrl(conf);
-
-        assertEquals("jdbc:phoenix:myzkclient:2181", jdbcUrl);
-    }
-
-    @Test(expected=IllegalStateException.class)
-    public void testGetJdbcUrl_NotConfigured() {
-        Configuration conf = new Configuration();
-        CsvToKeyValueMapper.getJdbcUrl(conf);
-    }
-
-    @Test
     public void testLoadPreUpdateProcessor() {
         Configuration conf = new Configuration();
         conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/afb0120e/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index beabaf1..8446e9e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -17,10 +17,6 @@
  */
 package org.apache.phoenix.util;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import java.sql.Types;
 import java.util.Properties;
 
@@ -30,6 +26,8 @@ import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
 
+import static org.junit.Assert.*;
+
 public class QueryUtilTest {
 
     private static final ColumnInfo ID_COLUMN = new ColumnInfo("ID", Types.BIGINT);
@@ -96,19 +94,28 @@ public class QueryUtilTest {
     }
 
     private void validateUrl(String url) {
-        String prefix = QueryUtil.getUrl("");
+        String prefix = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
         assertTrue("JDBC URL missing jdbc protocol prefix", url.startsWith(prefix));
-        //remove the prefix, should only be left with server,server...:port
-        url = url.substring(prefix.length()+1);
-        // make sure only a single ':'
-        assertEquals("More than a single ':' in url: "+url, url.indexOf(PhoenixRuntime
-                .JDBC_PROTOCOL_SEPARATOR),
-                url.lastIndexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR));
+        assertTrue("JDBC URL missing jdbc terminator suffix", url.endsWith(";"));
+        // remove the prefix, should only be left with server[,server...]:port:/znode
+        url = url.substring(prefix.length());
+        String[] splits = url.split(":");
+        assertTrue("zk details should contain at least server component", splits.length >= 1);
         // make sure that each server is comma separated
-        url = url.substring(0, url.indexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR));
-        String[] servers = url.split(",");
+        String[] servers = splits[0].split(",");
         for(String server: servers){
             assertFalse("Found whitespace in server names for url: " + url, server.contains(" "));
         }
+        if (splits.length >= 2) {
+            // second bit is a port number, should not through
+            try {
+                Integer.parseInt(splits[1]);
+            } catch (NumberFormatException e) {
+                fail(e.getMessage());
+            }
+        }
+        if (splits.length >= 3) {
+            assertTrue("znode parent is not an absolute path", splits[2].startsWith("/"));
+        }
     }
 }
\ No newline at end of file