You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gr...@apache.org on 2015/03/24 20:12:09 UTC

phoenix git commit: PHOENIX-1653 Support separate clusters for MR jobs

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 c92de2749 -> af0d65a0a


PHOENIX-1653 Support separate clusters for MR jobs

Add support for the input and output formats of a Phoenix MapReduce job to
point to separate clusters using override configuration settings. Defaults to
existing behavior (HConstants.ZOOKEEPER_QUORUM)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/af0d65a0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/af0d65a0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/af0d65a0

Branch: refs/heads/4.x-HBase-0.98
Commit: af0d65a0abdbb8dc73d766a15e3ffa0e6d854d13
Parents: c92de27
Author: gjacoby <gj...@salesforce.com>
Authored: Fri Feb 27 16:49:14 2015 -0800
Committer: Gabriel Reid <ga...@ngdata.com>
Committed: Tue Mar 24 19:40:21 2015 +0100

----------------------------------------------------------------------
 .../phoenix/mapreduce/PhoenixInputFormat.java   | 15 ++--
 .../phoenix/mapreduce/PhoenixRecordWriter.java  |  2 +-
 .../phoenix/mapreduce/index/IndexTool.java      |  2 +-
 .../index/PhoenixIndexImportMapper.java         |  2 +-
 .../phoenix/mapreduce/util/ConnectionUtil.java  | 88 ++++++++++++++------
 .../util/PhoenixConfigurationUtil.java          | 72 ++++++++++++++--
 .../mapreduce/util/PhoenixMapReduceUtil.java    | 22 ++++-
 .../util/PhoenixConfigurationUtilTest.java      | 60 ++++++++++++-
 .../pig/util/QuerySchemaParserFunction.java     |  2 +-
 .../pig/util/SqlQueryToColumnInfoFunction.java  |  2 +-
 10 files changed, 219 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index a83b9ae..31759b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -98,15 +98,16 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
      * @throws IOException
      * @throws SQLException
      */
-    private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException {
+    private QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
+            throws IOException {
         Preconditions.checkNotNull(context);
-        try{
+        try {
             final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
             final Properties overridingProps = new Properties();
             if(currentScnValue != null) {
                 overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
             }
-            final Connection connection = ConnectionUtil.getConnection(configuration,overridingProps);
+            final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
             final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
             Preconditions.checkNotNull(selectStatement);
             final Statement statement = connection.createStatement();
@@ -116,9 +117,11 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
             // Initialize the query plan so it sets up the parallel scans
             queryPlan.iterator();
             return queryPlan;
-        } catch(Exception exception) {
-            LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
+        } catch (Exception exception) {
+            LOG.error(String.format("Failed to get the query plan with error [%s]",
+                exception.getMessage()));
             throw new RuntimeException(exception);
         }
-   }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 4d26bf4..5843076 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -46,7 +46,7 @@ public class PhoenixRecordWriter<T extends DBWritable>  extends RecordWriter<Nul
     private long numRecords = 0;
     
     public PhoenixRecordWriter(final Configuration configuration) throws SQLException {
-        this.conn = ConnectionUtil.getConnection(configuration);
+        this.conn = ConnectionUtil.getOutputConnection(configuration);
         this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
         final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
         this.statement = this.conn.prepareStatement(upsertQuery);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index d93ef9c..300f575 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -163,7 +163,7 @@ public class IndexTool extends Configured implements Tool {
             final String qDataTable = SchemaUtil.getTableName(schemaName, dataTable);
             final String qIndexTable = SchemaUtil.getTableName(schemaName, indexTable);
          
-            connection = ConnectionUtil.getConnection(configuration);
+            connection = ConnectionUtil.getInputConnection(configuration);
             if(!isValidIndexTable(connection, dataTable, indexTable)) {
                 throw new IllegalArgumentException(String.format(" %s is not an index table for %s ",qIndexTable,qDataTable));
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 7bf4bfc..30f6dc0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -73,7 +73,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
             indexTableName = PhoenixConfigurationUtil.getOutputTableName(configuration);
             final Properties overrideProps = new Properties ();
             overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
-            connection = ConnectionUtil.getConnection(configuration,overrideProps);
+            connection = ConnectionUtil.getOutputConnection(configuration,overrideProps);
             connection.setAutoCommit(false);
             final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
             this.pStatement = connection.prepareStatement(upsertQuery);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index 3234967..e677104 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -24,49 +24,89 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.util.QueryUtil;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Utility class to return a {@link Connection} .
  */
 public class ConnectionUtil {
+
+
+    /**
+     * Retrieve the configured input Connection.
+     *
+     * @param conf configuration containing connection information
+     * @return the configured input connection
+     */
+    public static Connection getInputConnection(final Configuration conf) throws SQLException {
+        return getInputConnection(conf, new Properties());
+    }
     
     /**
-     * Returns the {@link Connection} from Configuration
-     * @param configuration
-     * @return
-     * @throws SQLException
+     * Retrieve the configured input Connection.
+     *
+     * @param conf configuration containing connection information
+     * @param props custom connection properties
+     * @return the configured input connection
+     */
+    public static Connection getInputConnection(final Configuration conf , final Properties props) throws SQLException {
+        Preconditions.checkNotNull(conf);
+        return getConnection(PhoenixConfigurationUtil.getInputCluster(conf),
+                extractProperties(props, conf));
+    }
+
+    /**
+     * Create the configured output Connection.
+     *
+     * @param conf configuration containing the connection information
+     * @return the configured output connection
      */
-    public static Connection getConnection(final Configuration configuration) throws SQLException {
-        return getConnection(configuration, null);
+    public static Connection getOutputConnection(final Configuration conf) throws SQLException {
+        return getOutputConnection(conf, new Properties());
     }
     
     /**
-     * Used primarily in cases where we need to pass few additional/overriding properties 
-     * @param configuration
-     * @param properties
-     * @return
-     * @throws SQLException
+     * Create the configured output Connection.
+     *
+     * @param conf configuration containing the connection information
+     * @param props custom connection properties
+     * @return the configured output connection
+     */
+    public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException {
+        Preconditions.checkNotNull(conf);
+        return getConnection(PhoenixConfigurationUtil.getOutputCluster(conf),
+                extractProperties(props, conf));
+    }
+
+    /**
+     * Returns the {@link Connection} from a ZooKeeper cluster string.
+     *
+     * @param quorum a ZooKeeper quorum connection string
+     * @return a Phoenix connection to the given connection string
      */
-    public static Connection getConnection(final Configuration configuration , final Properties properties) throws SQLException {
-        Preconditions.checkNotNull(configuration);
-        final Properties props = new Properties();
-        Iterator<Map.Entry<String, String>> iterator = configuration.iterator();
+    private static Connection getConnection(final String quorum, Properties props) throws SQLException {
+        Preconditions.checkNotNull(quorum);
+        return DriverManager.getConnection(QueryUtil.getUrl(quorum), props);
+    }
+
+    /**
+     * Add properties from the given Configuration to the provided Properties.
+     *
+     * @param props properties to which connection information from the Configuration will be added
+     * @param conf configuration containing connection information
+     * @return the input Properties value, with additional connection information from the
+     * given Configuration
+     */
+    private static Properties extractProperties(Properties props, final Configuration conf) {
+        Iterator<Map.Entry<String, String>> iterator = conf.iterator();
         if(iterator != null) {
             while (iterator.hasNext()) {
                 Map.Entry<String, String> entry = iterator.next();
                 props.setProperty(entry.getKey(), entry.getValue());
             }
         }
-        if(properties != null && !properties.isEmpty()) {
-            props.putAll(properties);
-        }
-        final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props);
-        return conn;
+        return props;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index b8b64b2..6e0e5e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -17,18 +17,21 @@
  */
 package org.apache.phoenix.mapreduce.util;
 
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
@@ -42,10 +45,7 @@ import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
 
 /**
  * A utility class to set properties on the {#link Configuration} instance.
@@ -90,7 +90,11 @@ public final class PhoenixConfigurationUtil {
     
     /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
     public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
+
+    public static final String MAPREDUCE_INPUT_CLUSTER_QUORUM = "phoenix.mapreduce.input.cluster.quorum";
     
+    public static final String MAPREDUCE_OUTPUT_CLUSTER_QUORUM = "phoenix.mapreduce.output.cluster.quorum";
+
     public enum SchemaType {
         TABLE,
         QUERY;
@@ -165,6 +169,28 @@ public final class PhoenixConfigurationUtil {
         configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
     }
     
+    /**
+     * Sets which HBase cluster a Phoenix MapReduce job should read from
+     * @param configuration
+     * @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will read from
+     */
+    public static void setInputCluster(final Configuration configuration,
+            final String quorum) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(MAPREDUCE_INPUT_CLUSTER_QUORUM, quorum);
+    }
+
+    /**
+     * Sets which HBase cluster a Phoenix MapReduce job should write to
+     * @param configuration
+     * @param quorum ZooKeeper quorum string for HBase cluster the MapReduce job will write to
+     */
+    public static void setOutputCluster(final Configuration configuration,
+            final String quorum) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(MAPREDUCE_OUTPUT_CLUSTER_QUORUM, quorum);
+    }
+        
     public static Class<?> getInputClass(final Configuration configuration) {
         return configuration.getClass(INPUT_CLASS, NullDBWritable.class);
     }
@@ -182,7 +208,7 @@ public final class PhoenixConfigurationUtil {
         if(isNotEmpty(columnInfoStr)) {
             return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
         }
-        final Connection connection = ConnectionUtil.getConnection(configuration);
+        final Connection connection = ConnectionUtil.getOutputConnection(configuration);
         String upsertColumns = configuration.get(UPSERT_COLUMNS);
         List<String> upsertColumnList = null;
         if(isNotEmpty(upsertColumns)) {
@@ -232,7 +258,7 @@ public final class PhoenixConfigurationUtil {
         }
         final String tableName = getInputTableName(configuration);
         Preconditions.checkNotNull(tableName);
-        final Connection connection = ConnectionUtil.getConnection(configuration);
+        final Connection connection = ConnectionUtil.getInputConnection(configuration);
         final List<String> selectColumnList = getSelectColumnList(configuration);
         final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
         final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
@@ -276,7 +302,7 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(configuration);
         long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
         if(batchSize <= 0) {
-           Connection conn = ConnectionUtil.getConnection(configuration);
+           Connection conn = ConnectionUtil.getOutputConnection(configuration);
            batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
            conn.close();
         }
@@ -309,6 +335,34 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(configuration);
         return configuration.get(OUTPUT_TABLE_NAME);
     }
+    
+    /**
+     * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will read from
+     * @param configuration
+     * @return ZooKeeper quorum string
+     */
+    public static String getInputCluster(final Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        String quorum = configuration.get(MAPREDUCE_INPUT_CLUSTER_QUORUM);
+        if (quorum == null) {
+            quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM);
+        }
+        return quorum;
+    }
+
+    /**
+     * Returns the ZooKeeper quorum string for the HBase cluster a Phoenix MapReduce job will write to
+     * @param configuration
+     * @return ZooKeeper quorum string
+     */
+    public static String getOutputCluster(final Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        String quorum = configuration.get(MAPREDUCE_OUTPUT_CLUSTER_QUORUM);
+        if (quorum == null) {
+            quorum = configuration.get(HConstants.ZOOKEEPER_QUORUM);
+        }
+        return quorum;
+    }
 
     public static void loadHBaseConfiguration(Job job) throws IOException {
         // load hbase-site.xml

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index f1a7f5a..74d39bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -49,7 +49,7 @@ public final class PhoenixMapReduceUtil {
           PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
           PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
     }
-    
+       
     /**
      * 
      * @param job         
@@ -64,9 +64,19 @@ public final class PhoenixMapReduceUtil {
           PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
           PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
           PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+          
      }
     
     /**
+     * A method to override which HBase cluster for {@link PhoenixInputFormat} to read from
+     * @param job MapReduce Job
+     * @param quorum an HBase cluster's ZooKeeper quorum
+     */
+    public static void setInputCluster(final Job job, final String quorum) {
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setInputCluster(configuration, quorum);
+    }
+    /**
      * 
      * @param job
      * @param outputClass  
@@ -94,6 +104,16 @@ public final class PhoenixMapReduceUtil {
           PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
           PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames);
     }
+    
+    /**
+     * A method to override which HBase cluster for {@link PhoenixOutputFormat} to write to
+     * @param job MapReduce Job
+     * @param quorum an HBase cluster's ZooKeeper quorum
+     */
+    public static void setOutputCluster(final Job job, final String quorum) {
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setOutputCluster(configuration, quorum);
+    }
 
     
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index 33c7531..f8f2a63 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -23,13 +23,12 @@ import static org.junit.Assert.assertEquals;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
-import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -39,7 +38,8 @@ import org.junit.Test;
  * Test for {@link PhoenixConfigurationUtil}
  */
 public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
-    
+    private static final String ORIGINAL_CLUSTER_QUORUM = "myzookeeperhost";
+    private static final String OVERRIDE_CLUSTER_QUORUM = "myoverridezookeeperhost";
     @Test
     public void testUpsertStatement() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
@@ -121,4 +121,58 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
             conn.close();
         }
     }
+    
+    @Test
+    public void testInputClusterOverride() throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.set(HConstants.ZOOKEEPER_QUORUM, ORIGINAL_CLUSTER_QUORUM);
+        String zkQuorum = PhoenixConfigurationUtil.getInputCluster(configuration);
+        assertEquals(zkQuorum, ORIGINAL_CLUSTER_QUORUM);
+
+        configuration.set(PhoenixConfigurationUtil.MAPREDUCE_INPUT_CLUSTER_QUORUM,
+            OVERRIDE_CLUSTER_QUORUM);
+        String zkQuorumOverride = PhoenixConfigurationUtil.getInputCluster(configuration);
+        assertEquals(zkQuorumOverride, OVERRIDE_CLUSTER_QUORUM);
+
+        final Configuration configuration2 = new Configuration();
+        PhoenixConfigurationUtil.setInputCluster(configuration2, OVERRIDE_CLUSTER_QUORUM);
+        String zkQuorumOverride2 =
+                PhoenixConfigurationUtil.getInputCluster(configuration2);
+        assertEquals(zkQuorumOverride2, OVERRIDE_CLUSTER_QUORUM);
+
+        final Job job = Job.getInstance();
+        PhoenixMapReduceUtil.setInputCluster(job, OVERRIDE_CLUSTER_QUORUM);
+        Configuration configuration3 = job.getConfiguration();
+        String zkQuorumOverride3 =
+                PhoenixConfigurationUtil.getInputCluster(configuration3);
+        assertEquals(zkQuorumOverride3, OVERRIDE_CLUSTER_QUORUM);
+
+    }
+
+    @Test
+    public void testOutputClusterOverride() throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.set(HConstants.ZOOKEEPER_QUORUM, ORIGINAL_CLUSTER_QUORUM);
+        String zkQuorum = PhoenixConfigurationUtil.getOutputCluster(configuration);
+        assertEquals(zkQuorum, ORIGINAL_CLUSTER_QUORUM);
+
+        configuration.set(PhoenixConfigurationUtil.MAPREDUCE_OUTPUT_CLUSTER_QUORUM,
+            OVERRIDE_CLUSTER_QUORUM);
+        String zkQuorumOverride = PhoenixConfigurationUtil.getOutputCluster(configuration);
+        assertEquals(zkQuorumOverride, OVERRIDE_CLUSTER_QUORUM);
+
+        final Configuration configuration2 = new Configuration();
+        PhoenixConfigurationUtil.setOutputCluster(configuration2, OVERRIDE_CLUSTER_QUORUM);
+        String zkQuorumOverride2 =
+                PhoenixConfigurationUtil.getOutputCluster(configuration2);
+        assertEquals(zkQuorumOverride2, OVERRIDE_CLUSTER_QUORUM);
+
+        final Job job = Job.getInstance();
+        PhoenixMapReduceUtil.setOutputCluster(job, OVERRIDE_CLUSTER_QUORUM);
+        Configuration configuration3 = job.getConfiguration();
+        String zkQuorumOverride3 =
+                PhoenixConfigurationUtil.getOutputCluster(configuration3);
+        assertEquals(zkQuorumOverride3, OVERRIDE_CLUSTER_QUORUM);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
index f0148a6..4f43811 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
@@ -59,7 +59,7 @@ public class QuerySchemaParserFunction implements Function<String,Pair<String,St
         Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!");
         Connection connection = null;
         try {
-            connection = ConnectionUtil.getConnection(this.configuration);
+            connection = ConnectionUtil.getInputConnection(this.configuration);
             final Statement  statement = connection.createStatement();
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/af0d65a0/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
index 3ed35bb..2ea2c06 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
@@ -52,7 +52,7 @@ public final class SqlQueryToColumnInfoFunction implements Function<String,List<
         Connection connection = null;
         List<ColumnInfo> columnInfos = null;
         try {
-            connection = ConnectionUtil.getConnection(this.configuration);
+            connection = ConnectionUtil.getInputConnection(this.configuration);
             final Statement  statement = connection.createStatement();
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             final QueryPlan queryPlan = pstmt.compileQuery(sqlQuery);