You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gr...@apache.org on 2015/03/24 20:12:09 UTC
phoenix git commit: PHOENIX-1653 Support separate clusters for MR jobs
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 c92de2749 -> af0d65a0a
PHOENIX-1653 Support separate clusters for MR jobs
Add support for the input and output formats of a Phoenix MapReduce job to
point to separate clusters using override configuration settings. Defaults to
existing behavior (HConstants.ZOOKEEPER_QUORUM)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/af0d65a0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/af0d65a0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/af0d65a0
Branch: refs/heads/4.x-HBase-0.98
Commit: af0d65a0abdbb8dc73d766a15e3ffa0e6d854d13
Parents: c92de27
Author: gjacoby <gj...@salesforce.com>
Authored: Fri Feb 27 16:49:14 2015 -0800
Committer: Gabriel Reid <ga...@ngdata.com>
Committed: Tue Mar 24 19:40:21 2015 +0100
----------------------------------------------------------------------
.../phoenix/mapreduce/PhoenixInputFormat.java | 15 ++--
.../phoenix/mapreduce/PhoenixRecordWriter.java | 2 +-
.../phoenix/mapreduce/index/IndexTool.java | 2 +-
.../index/PhoenixIndexImportMapper.java | 2 +-
.../phoenix/mapreduce/util/ConnectionUtil.java | 88 ++++++++++++++------
.../util/PhoenixConfigurationUtil.java | 72 ++++++++++++++--
.../mapreduce/util/PhoenixMapReduceUtil.java | 22 ++++-
.../util/PhoenixConfigurationUtilTest.java | 60 ++++++++++++-
.../pig/util/QuerySchemaParserFunction.java | 2 +-
.../pig/util/SqlQueryToColumnInfoFunction.java | 2 +-
10 files changed, 219 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index a83b9ae..31759b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -98,15 +98,16 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
* @throws IOException
* @throws SQLException
*/
- private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException {
+ private QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
+ throws IOException {
Preconditions.checkNotNull(context);
- try{
+ try {
final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
final Properties overridingProps = new Properties();
if(currentScnValue != null) {
overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
}
- final Connection connection = ConnectionUtil.getConnection(configuration,overridingProps);
+ final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
Preconditions.checkNotNull(selectStatement);
final Statement statement = connection.createStatement();
@@ -116,9 +117,11 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
// Initialize the query plan so it sets up the parallel scans
queryPlan.iterator();
return queryPlan;
- } catch(Exception exception) {
- LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
+ } catch (Exception exception) {
+ LOG.error(String.format("Failed to get the query plan with error [%s]",
+ exception.getMessage()));
throw new RuntimeException(exception);
}
- }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 4d26bf4..5843076 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -46,7 +46,7 @@ public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<Nul
private long numRecords = 0;
public PhoenixRecordWriter(final Configuration configuration) throws SQLException {
- this.conn = ConnectionUtil.getConnection(configuration);
+ this.conn = ConnectionUtil.getOutputConnection(configuration);
this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
this.statement = this.conn.prepareStatement(upsertQuery);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index d93ef9c..300f575 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -163,7 +163,7 @@ public class IndexTool extends Configured implements Tool {
final String qDataTable = SchemaUtil.getTableName(schemaName, dataTable);
final String qIndexTable = SchemaUtil.getTableName(schemaName, indexTable);
- connection = ConnectionUtil.getConnection(configuration);
+ connection = ConnectionUtil.getInputConnection(configuration);
if(!isValidIndexTable(connection, dataTable, indexTable)) {
throw new IllegalArgumentException(String.format(" %s is not an index table for %s ",qIndexTable,qDataTable));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 7bf4bfc..30f6dc0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -73,7 +73,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
indexTableName = PhoenixConfigurationUtil.getOutputTableName(configuration);
final Properties overrideProps = new Properties ();
overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
- connection = ConnectionUtil.getConnection(configuration,overrideProps);
+ connection = ConnectionUtil.getOutputConnection(configuration,overrideProps);
connection.setAutoCommit(false);
final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
this.pStatement = connection.prepareStatement(upsertQuery);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index 3234967..e677104 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -24,49 +24,89 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.util.QueryUtil;
-import com.google.common.base.Preconditions;
-
/**
* Utility class to return a {@link Connection} .
*/
public class ConnectionUtil {
+
+
+ /**
+ * Retrieve the configured input Connection.
+ *
+ * @param conf configuration containing connection information
+ * @return the configured input connection
+ */
+ public static Connection getInputConnection(final Configuration conf) throws SQLException {
+ return getInputConnection(conf, new Properties());
+ }
/**
- * Returns the {@link Connection} from Configuration
- * @param configuration
- * @return
- * @throws SQLException
+ * Retrieve the configured input Connection.
+ *
+ * @param conf configuration containing connection information
+ * @param props custom connection properties
+ * @return the configured input connection
+ */
+ public static Connection getInputConnection(final Configuration conf, final Properties props) throws SQLException {
+ Preconditions.checkNotNull(conf);
+ return getConnection(PhoenixConfigurationUtil.getInputCluster(conf),
+ extractProperties(props, conf));
+ }
+
+ /**
+ * Create the configured output Connection.
+ *
+ * @param conf configuration containing the connection information
+ * @return the configured output connection
*/
- public static Connection getConnection(final Configuration configuration) throws SQLException {
- return getConnection(configuration, null);
+ public static Connection getOutputConnection(final Configuration conf) throws SQLException {
+ return getOutputConnection(conf, new Properties());
}
/**
- * Used primarily in cases where we need to pass few additional/overriding properties
- * @param configuration
- * @param properties
- * @return
- * @throws SQLException
+ * Create the configured output Connection.
+ *
+ * @param conf configuration containing the connection information
+ * @param props custom connection properties
+ * @return the configured output connection
+ */
+ public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException {
+ Preconditions.checkNotNull(conf);
+ return getConnection(PhoenixConfigurationUtil.getOutputCluster(conf),
+ extractProperties(props, conf));
+ }
+
+ /**
+ * Returns the {@link Connection} from a ZooKeeper cluster string.
+ *
+ * @param quorum a ZooKeeper quorum connection string
+ * @return a Phoenix connection to the given connection string
*/
- public static Connection getConnection(final Configuration configuration , final Properties properties) throws SQLException {
- Preconditions.checkNotNull(configuration);
- final Properties props = new Properties();
- Iterator<Map.Entry<String, String>> iterator = configuration.iterator();
+ private static Connection getConnection(final String quorum, Properties props) throws SQLException {
+ Preconditions.checkNotNull(quorum);
+ return DriverManager.getConnection(QueryUtil.getUrl(quorum), props);
+ }
+
+ /**
+ * Add properties from the given Configuration to the provided Properties.
+ *
+ * @param props properties to which connection information from the Configuration will be added
+ * @param conf configuration containing connection information
+ * @return the input Properties value, with additional connection information from the
+ * given Configuration
+ */
+ private static Properties extractProperties(Properties props, final Configuration conf) {
+ Iterator<Map.Entry<String, String>> iterator = conf.iterator();
if(iterator != null) {
while (iterator.hasNext()) {
Map.Entry<String, String> entry = iterator.next();
props.setProperty(entry.getKey(), entry.getValue());
}
}
- if(properties != null && !properties.isEmpty()) {
- props.putAll(properties);
- }
- final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props);
- return conn;
+ return props;
}
-
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index b8b64b2..6e0e5e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -17,18 +17,21 @@
*/
package org.apache.phoenix.mapreduce.util;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
@@ -42,10 +45,7 @@ import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
/**
* A utility class to set properties on the {@link Configuration} instance.
@@ -90,7 +90,11 @@ public final class PhoenixConfigurationUtil {
/** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
+
+ public static final String MAPREDUCE_INPUT_CLUSTER_QUORUM = "phoenix.mapreduce.input.cluster.quorum";
+ public static final String MAPREDUCE_OUTPUT_CLUSTER_QUORUM = "phoenix.mapreduce.output.cluster.quorum";
+
public enum SchemaType {
TABLE,
QUERY;
@@ -165,6 +169,28 @@ public final class PhoenixConfigurationUtil {
configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
}
+ /**
+ * Sets which HBase cluster a Phoenix MapReduce job should read from
+ * @param configuration
+ * @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will read from
+ */
+ public static void setInputCluster(final Configuration configuration,
+ final String quorum) {
+ Preconditions.checkNotNull(configuration);
+ configuration.set(MAPREDUCE_INPUT_CLUSTER_QUORUM, quorum);
+ }
+
+ /**
+ * Sets which HBase cluster a Phoenix MapReduce job should write to
+ * @param configuration
+ * @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will write to
+ */
+ public static void setOutputCluster(final Configuration configuration,
+ final String quorum) {
+ Preconditions.checkNotNull(configuration);
+ configuration.set(MAPREDUCE_OUTPUT_CLUSTER_QUORUM, quorum);
+ }
+
public static Class<?> getInputClass(final Configuration configuration) {
return configuration.getClass(INPUT_CLASS, NullDBWritable.class);
}
@@ -182,7 +208,7 @@ public final class PhoenixConfigurationUtil {
if(isNotEmpty(columnInfoStr)) {
return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
}
- final Connection connection = ConnectionUtil.getConnection(configuration);
+ final Connection connection = ConnectionUtil.getOutputConnection(configuration);
String upsertColumns = configuration.get(UPSERT_COLUMNS);
List<String> upsertColumnList = null;
if(isNotEmpty(upsertColumns)) {
@@ -232,7 +258,7 @@ public final class PhoenixConfigurationUtil {
}
final String tableName = getInputTableName(configuration);
Preconditions.checkNotNull(tableName);
- final Connection connection = ConnectionUtil.getConnection(configuration);
+ final Connection connection = ConnectionUtil.getInputConnection(configuration);
final List<String> selectColumnList = getSelectColumnList(configuration);
final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
@@ -276,7 +302,7 @@ public final class PhoenixConfigurationUtil {
Preconditions.checkNotNull(configuration);
long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
if(batchSize <= 0) {
- Connection conn = ConnectionUtil.getConnection(configuration);
+ Connection conn = ConnectionUtil.getOutputConnection(configuration);
batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
conn.close();
}
@@ -309,6 +335,34 @@ public final class PhoenixConfigurationUtil {
Preconditions.checkNotNull(configuration);
return configuration.get(OUTPUT_TABLE_NAME);
}
+
+ /**
+ * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will read from
+ * @param configuration
+ * @return ZooKeeper quorum string
+ */
+ public static String getInputCluster(final Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ String quorum = configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
+ if (quorum == null) {
+ quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM);
+ }
+ return quorum;
+ }
+
+ /**
+ * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will write to
+ * @param configuration
+ * @return ZooKeeper quorum string
+ */
+ public static String getOutputCluster(final Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
+ if (quorum == null) {
+ quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM);
+ }
+ return quorum;
+ }
public static void loadHBaseConfiguration(Job job) throws IOException {
// load hbase-site.xml
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index f1a7f5a..74d39bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -49,7 +49,7 @@ public final class PhoenixMapReduceUtil {
PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
}
-
+
/**
*
* @param job
@@ -64,9 +64,19 @@ public final class PhoenixMapReduceUtil {
PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+
}
/**
+ * A method to override which HBase cluster for {@link PhoenixInputFormat} to read from
+ * @param job MapReduce Job
+ * @param quorum an HBase cluster's ZooKeeper quorum
+ */
+ public static void setInputCluster(final Job job, final String quorum) {
+ final Configuration configuration = job.getConfiguration();
+ PhoenixConfigurationUtil.setInputCluster(configuration, quorum);
+ }
+ /**
*
* @param job
* @param outputClass
@@ -94,6 +104,16 @@ public final class PhoenixMapReduceUtil {
PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames);
}
+
+ /**
+ * A method to override which HBase cluster for {@link PhoenixOutputFormat} to write to
+ * @param job MapReduce Job
+ * @param quorum an HBase cluster's ZooKeeper quorum
+ */
+ public static void setOutputCluster(final Job job, final String quorum) {
+ final Configuration configuration = job.getConfiguration();
+ PhoenixConfigurationUtil.setOutputCluster(configuration, quorum);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index 33c7531..f8f2a63 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -23,13 +23,12 @@ import static org.junit.Assert.assertEquals;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
-import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
@@ -39,7 +38,8 @@ import org.junit.Test;
* Test for {@link PhoenixConfigurationUtil}
*/
public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
-
+ private static final String ORIGINAL_CLUSTER_QUORUM = "myzookeeperhost";
+ private static final String OVERRIDE_CLUSTER_QUORUM = "myoverridezookeeperhost";
@Test
public void testUpsertStatement() throws Exception {
Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
@@ -121,4 +121,58 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
conn.close();
}
}
+
+ @Test
+ public void testInputClusterOverride() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, ORIGINAL_CLUSTER_QUORUM);
+ String zkQuorum = PhoenixConfigurationUtil.getInputCluster(configuration);
+ assertEquals(zkQuorum, ORIGINAL_CLUSTER_QUORUM);
+
+ configuration.set(PhoenixConfigurationUtil.MAPREDUCE_INPUT_CLUSTER_QUORUM,
+ OVERRIDE_CLUSTER_QUORUM);
+ String zkQuorumOverride = PhoenixConfigurationUtil.getInputCluster(configuration);
+ assertEquals(zkQuorumOverride, OVERRIDE_CLUSTER_QUORUM);
+
+ final Configuration configuration2 = new Configuration();
+ PhoenixConfigurationUtil.setInputCluster(configuration2, OVERRIDE_CLUSTER_QUORUM);
+ String zkQuorumOverride2 =
+ PhoenixConfigurationUtil.getInputCluster(configuration2);
+ assertEquals(zkQuorumOverride2, OVERRIDE_CLUSTER_QUORUM);
+
+ final Job job = Job.getInstance();
+ PhoenixMapReduceUtil.setInputCluster(job, OVERRIDE_CLUSTER_QUORUM);
+ Configuration configuration3 = job.getConfiguration();
+ String zkQuorumOverride3 =
+ PhoenixConfigurationUtil.getInputCluster(configuration3);
+ assertEquals(zkQuorumOverride3, OVERRIDE_CLUSTER_QUORUM);
+
+ }
+
+ @Test
+ public void testOutputClusterOverride() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, ORIGINAL_CLUSTER_QUORUM);
+ String zkQuorum = PhoenixConfigurationUtil.getOutputCluster(configuration);
+ assertEquals(zkQuorum, ORIGINAL_CLUSTER_QUORUM);
+
+ configuration.set(PhoenixConfigurationUtil.MAPREDUCE_OUTPUT_CLUSTER_QUORUM,
+ OVERRIDE_CLUSTER_QUORUM);
+ String zkQuorumOverride = PhoenixConfigurationUtil.getOutputCluster(configuration);
+ assertEquals(zkQuorumOverride, OVERRIDE_CLUSTER_QUORUM);
+
+ final Configuration configuration2 = new Configuration();
+ PhoenixConfigurationUtil.setOutputCluster(configuration2, OVERRIDE_CLUSTER_QUORUM);
+ String zkQuorumOverride2 =
+ PhoenixConfigurationUtil.getOutputCluster(configuration2);
+ assertEquals(zkQuorumOverride2, OVERRIDE_CLUSTER_QUORUM);
+
+ final Job job = Job.getInstance();
+ PhoenixMapReduceUtil.setOutputCluster(job, OVERRIDE_CLUSTER_QUORUM);
+ Configuration configuration3 = job.getConfiguration();
+ String zkQuorumOverride3 =
+ PhoenixConfigurationUtil.getOutputCluster(configuration3);
+ assertEquals(zkQuorumOverride3, OVERRIDE_CLUSTER_QUORUM);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
index f0148a6..4f43811 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
@@ -59,7 +59,7 @@ public class QuerySchemaParserFunction implements Function<String,Pair<String,St
Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!");
Connection connection = null;
try {
- connection = ConnectionUtil.getConnection(this.configuration);
+ connection = ConnectionUtil.getInputConnection(this.configuration);
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
index 3ed35bb..2ea2c06 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
@@ -52,7 +52,7 @@ public final class SqlQueryToColumnInfoFunction implements Function<String,List<
Connection connection = null;
List<ColumnInfo> columnInfos = null;
try {
- connection = ConnectionUtil.getConnection(this.configuration);
+ connection = ConnectionUtil.getInputConnection(this.configuration);
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
final QueryPlan queryPlan = pstmt.compileQuery(sqlQuery);