You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by nd...@apache.org on 2015/06/24 23:00:39 UTC
[07/31] phoenix git commit: PHOENIX-2005 Connection utilities omit zk
client port, parent znode
PHOENIX-2005 Connection utilities omit zk client port, parent znode
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c6b37b97
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c6b37b97
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c6b37b97
Branch: refs/heads/4.x-HBase-1.1
Commit: c6b37b979da1b514bcb9257c7e095e39b0c2c215
Parents: 3cdc323
Author: Nick Dimiduk <nd...@apache.org>
Authored: Tue May 26 11:11:48 2015 -0700
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Tue May 26 13:27:03 2015 -0700
----------------------------------------------------------------------
.../phoenix/jdbc/PhoenixEmbeddedDriver.java | 28 ++++--
.../phoenix/mapreduce/CsvBulkLoadTool.java | 93 ++++++++++----------
.../phoenix/mapreduce/CsvToKeyValueMapper.java | 26 +-----
.../query/ConnectionQueryServicesImpl.java | 4 +-
.../java/org/apache/phoenix/util/QueryUtil.java | 45 ++++++++--
.../phoenix/jdbc/PhoenixEmbeddedDriverTest.java | 14 ++-
.../phoenix/mapreduce/CsvBulkLoadToolTest.java | 11 ---
.../mapreduce/CsvToKeyValueMapperTest.java | 15 ----
.../org/apache/phoenix/util/QueryUtilTest.java | 33 ++++---
9 files changed, 139 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index 9e95667..2451603 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -31,6 +31,7 @@ import java.util.logging.Logger;
import javax.annotation.concurrent.Immutable;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -174,10 +175,10 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni
}
/**
- *
+ *
* Class to encapsulate connection info for HBase
*
- *
+ *
* @since 0.1.1
*/
public static class ConnectionInfo {
@@ -204,12 +205,18 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni
return false;
}
- protected static ConnectionInfo create(String url) throws SQLException {
- StringTokenizer tokenizer = new StringTokenizer(url == null ? "" : url.substring(PhoenixRuntime.JDBC_PROTOCOL.length()),DELIMITERS, true);
+ public static ConnectionInfo create(String url) throws SQLException {
+ url = url == null ? "" : url;
+ url = url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)
+ ? url.substring(PhoenixRuntime.JDBC_PROTOCOL.length())
+ : url;
+ StringTokenizer tokenizer = new StringTokenizer(url, DELIMITERS, true);
int nTokens = 0;
String[] tokens = new String[5];
String token = null;
- while (tokenizer.hasMoreTokens() && !(token=tokenizer.nextToken()).equals(TERMINATOR) && tokenizer.hasMoreTokens() && nTokens < tokens.length) {
+ while (tokenizer.hasMoreTokens() &&
+ !(token=tokenizer.nextToken()).equals(TERMINATOR) &&
+ tokenizer.hasMoreTokens() && nTokens < tokens.length) {
token = tokenizer.nextToken();
// This would mean we have an empty string for a token which is illegal
if (DELIMITERS.contains(token)) {
@@ -316,8 +323,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni
private final String principal;
private final String keytab;
- // used for testing
- ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode, String principal, String keytab) {
+ public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode, String principal, String keytab) {
this.zookeeperQuorum = zookeeperQuorum;
this.port = port;
this.rootNode = rootNode;
@@ -326,8 +332,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni
this.keytab = keytab;
}
- // used for testing
- ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode) {
+ public ConnectionInfo(String zookeeperQuorum, Integer port, String rootNode) {
this(zookeeperQuorum, port, rootNode, null, null);
}
@@ -417,6 +422,11 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni
+ (principal == null ? "" : ":" + principal)
+ (keytab == null ? "" : ":" + keytab);
}
+
+ public String toUrl() {
+ return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR
+ + toString();
+ }
}
public static boolean isTestUrl(String url) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 31f8b42..7afde98 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -18,11 +18,11 @@
package org.apache.phoenix.mapreduce;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -56,8 +55,8 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.job.JobManager;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
@@ -67,6 +66,7 @@ import org.apache.phoenix.util.CSVCommonsLoader;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.slf4j.Logger;
@@ -84,7 +84,7 @@ public class CsvBulkLoadTool extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(CsvBulkLoadTool.class);
- static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Zookeeper quorum to connect to (optional)");
+ static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)");
static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input CSV path (mandatory)");
static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
@@ -184,35 +184,48 @@ public class CsvBulkLoadTool extends Configured implements Tool {
} catch (IllegalStateException e) {
printHelpAndExit(e.getMessage(), getOptions());
}
- Class.forName(DriverManager.class.getName());
- Connection conn = DriverManager.getConnection(
- getJdbcUrl(cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt())));
-
- return loadData(conf, cmdLine, conn);
+ return loadData(conf, cmdLine);
}
- private int loadData(Configuration conf, CommandLine cmdLine,
- Connection conn) throws SQLException, InterruptedException,
- ExecutionException {
- String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
+ private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException,
+ InterruptedException, ExecutionException, ClassNotFoundException {
+ String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
- String qualifedIndexTableName = null;
- if(indexTableName != null){
- qualifedIndexTableName = getQualifiedTableName(schemaName, indexTableName);
+ String qualifiedIndexTableName = null;
+ if (indexTableName != null){
+ qualifiedIndexTableName = getQualifiedTableName(schemaName, indexTableName);
+ }
+
+ if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
+ // ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job.
+ String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
+ PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum);
+ LOG.info("Configuring HBase connection to {}", info);
+ for (Map.Entry<String,String> entry : info.asProps()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting {} = {}", entry.getKey(), entry.getValue());
+ }
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ final Connection conn = QueryUtil.getConnection(conf);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(),
+ qualifiedTableName);
}
List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
configureOptions(cmdLine, importColumns, conf);
-
try {
validateTable(conn, schemaName, tableName);
} finally {
conn.close();
}
- Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()));
- Path outputPath = null;
+ final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()));
+ final Path outputPath;
if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) {
outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt()));
} else {
@@ -221,20 +234,21 @@ public class CsvBulkLoadTool extends Configured implements Tool {
List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName));
+ // using conn after it's been closed... o.O
tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
// When loading a single index table, check index table name is correct
- if(qualifedIndexTableName != null){
+ if (qualifiedIndexTableName != null){
TargetTableRef targetIndexRef = null;
for (TargetTableRef tmpTable : tablesToBeLoaded){
- if(tmpTable.getLogicalName().compareToIgnoreCase(qualifedIndexTableName) == 0) {
+ if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) {
targetIndexRef = tmpTable;
break;
}
}
- if(targetIndexRef == null){
+ if (targetIndexRef == null){
throw new IllegalStateException("CSV Bulk Loader error: index table " +
- qualifedIndexTableName + " doesn't exist");
+ qualifiedIndexTableName + " doesn't exist");
}
tablesToBeLoaded.clear();
tablesToBeLoaded.add(targetIndexRef);
@@ -247,13 +261,14 @@ public class CsvBulkLoadTool extends Configured implements Tool {
.getProps()
.getBoolean(QueryServices.METRICS_ENABLED,
QueryServicesOptions.DEFAULT_IS_METRICS_ENABLED);
- ExecutorService executor = JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool);
+ ExecutorService executor =
+ JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool);
try{
for (TargetTableRef table : tablesToBeLoaded) {
Path tablePath = new Path(outputPath, table.getPhysicalName());
Configuration jobConf = new Configuration(conf);
jobConf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName);
- if(qualifiedTableName.compareToIgnoreCase(table.getLogicalName()) != 0) {
+ if (qualifiedTableName.compareToIgnoreCase(table.getLogicalName()) != 0) {
jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table.getPhysicalName());
}
TableLoader tableLoader = new TableLoader(
@@ -274,14 +289,6 @@ public class CsvBulkLoadTool extends Configured implements Tool {
return retCode;
}
- String getJdbcUrl(String zkQuorum) {
- if (zkQuorum == null) {
- LOG.warn("Defaulting to localhost for ZooKeeper quorum");
- zkQuorum = "localhost:2181";
- }
- return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
- }
-
/**
* Build up the list of columns to be imported. The list is taken from the command line if
* present, otherwise it is taken from the table description.
@@ -327,9 +334,11 @@ public class CsvBulkLoadTool extends Configured implements Tool {
* @param importColumns descriptors of columns to be imported
* @param conf job configuration
*/
- @VisibleForTesting
- static void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
- Configuration conf) {
+ private static void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
+ Configuration conf) throws SQLException {
+
+ // we don't parse ZK_QUORUM_OPT here because we need it in order to
+ // create the connection we need to build importColumns.
char delimiterChar = ',';
if (cmdLine.hasOption(DELIMITER_OPT.getOpt())) {
@@ -358,12 +367,6 @@ public class CsvBulkLoadTool extends Configured implements Tool {
escapeChar = escapeString.charAt(0);
}
- if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
- String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
- LOG.info("Configuring ZK quorum to {}", zkQuorum);
- conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
- }
-
CsvBulkImportUtil.initCsvImportJob(
conf,
getQualifiedTableName(
@@ -493,7 +496,7 @@ public class CsvBulkLoadTool extends Configured implements Tool {
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
- // initialize credentials to possibily run in a secure env
+ // initialize credentials to possibly run in a secure env
TableMapReduceUtil.initCredentials(job);
HTable htable = new HTable(conf, tableName);
@@ -522,8 +525,8 @@ public class CsvBulkLoadTool extends Configured implements Tool {
}
return true;
- } catch(Exception ex) {
- LOG.error("Import job on table=" + tableName + " failed due to exception:" + ex);
+ } catch (Exception ex) {
+ LOG.error("Import job on table=" + tableName + " failed due to exception.", ex);
return false;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 90cb854..c0328bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.mapreduce;
import java.io.IOException;
import java.io.StringReader;
-import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
@@ -32,7 +31,6 @@ import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -41,11 +39,11 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.util.CSVCommonsLoader;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.csv.CsvUpsertExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,7 +106,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
- String jdbcUrl = getJdbcUrl(conf);
// pass client configuration into driver
Properties clientInfos = new Properties();
@@ -118,12 +115,9 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
clientInfos.setProperty(entry.getKey(), entry.getValue());
}
- // This statement also ensures that the driver class is loaded
- LOG.info("Connection with driver {} with url {}", PhoenixDriver.class.getName(), jdbcUrl);
-
try {
- conn = (PhoenixConnection) DriverManager.getConnection(jdbcUrl, clientInfos);
- } catch (SQLException e) {
+ conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+ } catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
@@ -189,20 +183,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
}
}
- /**
- * Build up the JDBC URL for connecting to Phoenix.
- *
- * @return the full JDBC URL for a Phoenix connection
- */
- @VisibleForTesting
- static String getJdbcUrl(Configuration conf) {
- String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
- if (zkQuorum == null) {
- throw new IllegalStateException(HConstants.ZOOKEEPER_QUORUM + " is not configured");
- }
- return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
- }
-
@VisibleForTesting
CsvUpsertExecutor buildUpsertExecutor(Configuration conf) {
String tableName = conf.get(TABLE_NAME_CONFKEY);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 30b43d5..b071bc5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -807,7 +807,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
boolean isMetaTable = SchemaUtil.isMetaTable(tableName);
boolean tableExist = true;
try {
- logger.info("Found quorum: " + ZKConfig.getZKQuorumServersString(config));
+ final String quorum = ZKConfig.getZKQuorumServersString(config);
+ final String znode = config.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+ logger.debug("Found quorum: " + quorum + ":" + znode);
admin = new HBaseAdmin(config);
try {
existingDesc = admin.getTableDescriptor(tableName);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index d63a68f..bd38983 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -34,11 +34,13 @@ import javax.annotation.Nullable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.WildcardParseNode;
@@ -49,8 +51,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import java.util.Iterator;
-import java.util.Map;
public final class QueryUtil {
@@ -129,7 +129,7 @@ public final class QueryUtil {
*
* @param tableName name of the table for which the upsert statement is to be created
* @param columns list of columns to be included in the upsert statement
- * @param Hint hint to be added to the UPSERT statement.
+ * @param hint hint to be added to the UPSERT statement.
* @return the created {@code UPSERT} statement
*/
public static String constructUpsertStatement(String tableName, List<String> columns, Hint hint) {
@@ -222,13 +222,36 @@ public final class QueryUtil {
return query.toString();
}
- public static String getUrl(String server) {
- return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + server;
+ /**
+ * Create the Phoenix JDBC connection URL from the provided cluster connection details.
+ */
+ public static String getUrl(String zkQuorum) {
+ return getUrlInternal(zkQuorum, null, null);
+ }
+
+ /**
+ * Create the Phoenix JDBC connection URL from the provided cluster connection details.
+ */
+ public static String getUrl(String zkQuorum, int clientPort) {
+ return getUrlInternal(zkQuorum, clientPort, null);
+ }
+
+ /**
+ * Create the Phoenix JDBC connection URL from the provided cluster connection details.
+ */
+ public static String getUrl(String zkQuorum, String znodeParent) {
+ return getUrlInternal(zkQuorum, null, znodeParent);
+ }
+
+ /**
+ * Create the Phoenix JDBC connection URL from the provided cluster connection details.
+ */
+ public static String getUrl(String zkQuorum, int port, String znodeParent) {
+ return getUrlInternal(zkQuorum, port, znodeParent);
}
- public static String getUrl(String server, long port) {
- String serverUrl = getUrl(server);
- return serverUrl + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + port
+ private static String getUrlInternal(String zkQuorum, Integer port, String znodeParent) {
+ return new PhoenixEmbeddedDriver.ConnectionInfo(zkQuorum, port, znodeParent).toUrl()
+ PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
}
@@ -274,6 +297,7 @@ public final class QueryUtil {
public static String getConnectionUrl(Properties props, Configuration conf)
throws ClassNotFoundException, SQLException {
+ // TODO: props is ignored!
// make sure we load the phoenix driver
Class.forName(PhoenixDriver.class.getName());
@@ -304,12 +328,15 @@ public final class QueryUtil {
if (port == -1) {
port = conf.getInt(QueryServices.ZOOKEEPER_PORT_ATTRIB, -1);
if (port == -1) {
+ // TODO: fall back to the default in HConstants#DEFAULT_ZOOKEPER_CLIENT_PORT
throw new RuntimeException("Client zk port was not set!");
}
}
server = Joiner.on(',').join(servers);
+ String znodeParent = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+ HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
- return getUrl(server, port);
+ return getUrl(server, port, znodeParent);
}
public static String getViewStatement(String schemaName, String tableName, String where) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
index 79f9ec6..083b205 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriverTest.java
@@ -40,9 +40,11 @@ public class PhoenixEmbeddedDriverTest {
"jdbc:phoenix:localhost:123",
"jdbc:phoenix:localhost:123;foo=bar",
"jdbc:phoenix:localhost:123:/hbase",
- "jdbc:phoenix:localhost:123:/hbase;foo=bas",
+ "jdbc:phoenix:localhost:123:/foo-bar",
+ "jdbc:phoenix:localhost:123:/foo-bar;foo=bas",
"jdbc:phoenix:localhost:/hbase",
- "jdbc:phoenix:localhost:/hbase;test=true",
+ "jdbc:phoenix:localhost:/foo-bar",
+ "jdbc:phoenix:localhost:/foo-bar;test=true",
"jdbc:phoenix:v1,v2,v3",
"jdbc:phoenix:v1,v2,v3;",
"jdbc:phoenix:v1,v2,v3;test=true",
@@ -51,6 +53,7 @@ public class PhoenixEmbeddedDriverTest {
"jdbc:phoenix:v1,v2,v3:123:/hbase",
"jdbc:phoenix:v1,v2,v3:123:/hbase;test=false",
"jdbc:phoenix:v1,v2,v3:123:/hbase:user/principal:/user.keytab;test=false",
+ "jdbc:phoenix:v1,v2,v3:123:/foo-bar:user/principal:/user.keytab;test=false",
"jdbc:phoenix:v1,v2,v3:123:user/principal:/user.keytab;test=false",
"jdbc:phoenix:v1,v2,v3:user/principal:/user.keytab;test=false",
"jdbc:phoenix:v1,v2,v3:/hbase:user/principal:/user.keytab;test=false",
@@ -64,9 +67,11 @@ public class PhoenixEmbeddedDriverTest {
new ConnectionInfo("localhost",123,null),
new ConnectionInfo("localhost",123,null),
new ConnectionInfo("localhost",123,"/hbase"),
- new ConnectionInfo("localhost",123,"/hbase"),
- new ConnectionInfo("localhost",null,"/hbase"),
+ new ConnectionInfo("localhost",123,"/foo-bar"),
+ new ConnectionInfo("localhost",123,"/foo-bar"),
new ConnectionInfo("localhost",null,"/hbase"),
+ new ConnectionInfo("localhost",null,"/foo-bar"),
+ new ConnectionInfo("localhost",null,"/foo-bar"),
new ConnectionInfo("v1,v2,v3",null,null),
new ConnectionInfo("v1,v2,v3",null,null),
new ConnectionInfo("v1,v2,v3",null,null),
@@ -75,6 +80,7 @@ public class PhoenixEmbeddedDriverTest {
new ConnectionInfo("v1,v2,v3",123,"/hbase"),
new ConnectionInfo("v1,v2,v3",123,"/hbase"),
new ConnectionInfo("v1,v2,v3",123,"/hbase","user/principal", "/user.keytab" ),
+ new ConnectionInfo("v1,v2,v3",123,"/foo-bar","user/principal", "/user.keytab" ),
new ConnectionInfo("v1,v2,v3",123, null,"user/principal", "/user.keytab" ),
new ConnectionInfo("v1,v2,v3", null, null,"user/principal", "/user.keytab" ),
new ConnectionInfo("v1,v2,v3",null,"/hbase","user/principal", "/user.keytab" ),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
index 31fc71c..33bb976 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
@@ -66,15 +66,4 @@ public class CsvBulkLoadToolTest {
public void testGetQualifiedTableName_NullSchema() {
assertEquals("MYTABLE", CsvBulkLoadTool.getQualifiedTableName(null, "myTable"));
}
-
- @Test
- public void testGetJdbcUrl_WithQuorumSupplied() {
- assertEquals("jdbc:phoenix:myzkhost:2181", bulkLoadTool.getJdbcUrl("myzkhost:2181"));
- }
-
- @Test
- public void testGetJdbcUrl_NoQuorumSupplied() {
- assertEquals("jdbc:phoenix:localhost:2181", bulkLoadTool.getJdbcUrl(null));
- }
-
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
index 4033a65..dc6f497 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
@@ -95,21 +95,6 @@ public class CsvToKeyValueMapperTest {
}
@Test
- public void testGetJdbcUrl() {
- Configuration conf = new Configuration();
- conf.set(HConstants.ZOOKEEPER_QUORUM, "myzkclient:2181");
- String jdbcUrl = CsvToKeyValueMapper.getJdbcUrl(conf);
-
- assertEquals("jdbc:phoenix:myzkclient:2181", jdbcUrl);
- }
-
- @Test(expected=IllegalStateException.class)
- public void testGetJdbcUrl_NotConfigured() {
- Configuration conf = new Configuration();
- CsvToKeyValueMapper.getJdbcUrl(conf);
- }
-
- @Test
public void testLoadPreUpdateProcessor() {
Configuration conf = new Configuration();
conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c6b37b97/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index beabaf1..8446e9e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -17,10 +17,6 @@
*/
package org.apache.phoenix.util;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
import java.sql.Types;
import java.util.Properties;
@@ -30,6 +26,8 @@ import org.junit.Test;
import com.google.common.collect.ImmutableList;
+import static org.junit.Assert.*;
+
public class QueryUtilTest {
private static final ColumnInfo ID_COLUMN = new ColumnInfo("ID", Types.BIGINT);
@@ -96,19 +94,28 @@ public class QueryUtilTest {
}
private void validateUrl(String url) {
- String prefix = QueryUtil.getUrl("");
+ String prefix = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
assertTrue("JDBC URL missing jdbc protocol prefix", url.startsWith(prefix));
- //remove the prefix, should only be left with server,server...:port
- url = url.substring(prefix.length()+1);
- // make sure only a single ':'
- assertEquals("More than a single ':' in url: "+url, url.indexOf(PhoenixRuntime
- .JDBC_PROTOCOL_SEPARATOR),
- url.lastIndexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR));
+ assertTrue("JDBC URL missing jdbc terminator suffix", url.endsWith(";"));
+ // remove the prefix, should only be left with server[,server...]:port:/znode
+ url = url.substring(prefix.length());
+ String[] splits = url.split(":");
+ assertTrue("zk details should contain at least server component", splits.length >= 1);
// make sure that each server is comma separated
- url = url.substring(0, url.indexOf(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR));
- String[] servers = url.split(",");
+ String[] servers = splits[0].split(",");
for(String server: servers){
assertFalse("Found whitespace in server names for url: " + url, server.contains(" "));
}
+ if (splits.length >= 2) {
+ // second bit is a port number, should not throw
+ try {
+ Integer.parseInt(splits[1]);
+ } catch (NumberFormatException e) {
+ fail(e.getMessage());
+ }
+ }
+ if (splits.length >= 3) {
+ assertTrue("znode parent is not an absolute path", splits[2].startsWith("/"));
+ }
}
}
\ No newline at end of file