Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/12/10 01:51:49 UTC

[3/3] phoenix git commit: PHOENIX-1454 Map-Reduce-over-Phoenix-Tables

PHOENIX-1454 Map-Reduce-over-Phoenix-Tables


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cabb16f7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cabb16f7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cabb16f7

Branch: refs/heads/3.0
Commit: cabb16f7d08a4d9f5e0ae5027b1f9810edd69c1d
Parents: d6f70d2
Author: ravimagham <ra...@apache.org>
Authored: Tue Dec 9 16:51:22 2014 -0800
Committer: ravimagham <ra...@apache.org>
Committed: Tue Dec 9 16:51:22 2014 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/PhoenixInputFormat.java   | 117 +++++++
 .../phoenix/mapreduce/PhoenixInputSplit.java    | 122 +++++++
 .../mapreduce/PhoenixOutputCommitter.java       |  54 +++
 .../phoenix/mapreduce/PhoenixOutputFormat.java  |  62 ++++
 .../phoenix/mapreduce/PhoenixRecordReader.java  | 140 ++++++++
 .../phoenix/mapreduce/PhoenixRecordWriter.java  |  91 +++++
 .../util/ColumnInfoToStringEncoderDecoder.java  |  65 ++++
 .../phoenix/mapreduce/util/ConnectionUtil.java  |  49 +++
 .../util/PhoenixConfigurationUtil.java          | 299 ++++++++++++++++
 .../mapreduce/util/PhoenixMapReduceUtil.java    |  99 ++++++
 .../org/apache/phoenix/util/PhoenixRuntime.java |   4 +-
 .../java/org/apache/phoenix/util/QueryUtil.java |  26 +-
 .../ColumnInfoToStringEncoderDecoderTest.java   |  61 ++++
 .../util/PhoenixConfigurationUtilTest.java      | 124 +++++++
 .../org/apache/phoenix/util/QueryUtilTest.java  |   2 +-
 .../phoenix/pig/PhoenixPigConfigurationIT.java  | 109 ------
 .../apache/phoenix/pig/PhoenixHBaseLoader.java  |  45 +--
 .../apache/phoenix/pig/PhoenixHBaseStorage.java | 218 ++++++------
 .../phoenix/pig/PhoenixPigConfiguration.java    | 340 -------------------
 .../phoenix/pig/hadoop/PhoenixInputFormat.java  | 142 --------
 .../phoenix/pig/hadoop/PhoenixInputSplit.java   | 127 -------
 .../pig/hadoop/PhoenixOutputCommitter.java      | 111 ------
 .../phoenix/pig/hadoop/PhoenixOutputFormat.java |  94 -----
 .../phoenix/pig/hadoop/PhoenixRecord.java       | 112 ------
 .../phoenix/pig/hadoop/PhoenixRecordReader.java | 142 --------
 .../phoenix/pig/hadoop/PhoenixRecordWriter.java |  83 -----
 .../util/ColumnInfoToStringEncoderDecoder.java  |  69 ----
 .../phoenix/pig/util/PhoenixPigSchemaUtil.java  |  29 +-
 .../pig/util/QuerySchemaParserFunction.java     |  21 +-
 .../pig/util/SqlQueryToColumnInfoFunction.java  |  49 ++-
 .../org/apache/phoenix/pig/util/TypeUtil.java   | 248 +++++++-------
 .../pig/writable/PhoenixPigDBWritable.java      | 121 +++++++
 .../pig/PhoenixPigConfigurationTest.java        |  49 ---
 .../ColumnInfoToStringEncoderDecoderTest.java   |  61 ----
 .../pig/util/PhoenixPigSchemaUtilTest.java      |  17 +-
 .../pig/util/QuerySchemaParserFunctionTest.java |  22 +-
 .../util/SqlQueryToColumnInfoFunctionTest.java  |  21 +-
 37 files changed, 1759 insertions(+), 1786 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
new file mode 100644
index 0000000..7c67c2c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * {@link InputFormat} implementation for Phoenix.
+ * 
+ */
+public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable,T> {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
+       
+    /**
+     * Instantiated by the framework.
+     */
+    public PhoenixInputFormat() {
+    }
+
+    @Override
+    public RecordReader<NullWritable,T> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        
+        final Configuration configuration = context.getConfiguration();
+        final QueryPlan queryPlan = getQueryPlan(context,configuration);
+        @SuppressWarnings("unchecked")
+        final Class<T> inputClass = (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration);
+        return new PhoenixRecordReader<T>(inputClass , configuration, queryPlan);
+    }
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {  
+        final Configuration configuration = context.getConfiguration();
+        final QueryPlan queryPlan = getQueryPlan(context,configuration);
+        final List<KeyRange> allSplits = queryPlan.getSplits();
+        final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
+        return splits;
+    }
+
+    private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
+        Preconditions.checkNotNull(qplan);
+        Preconditions.checkNotNull(splits);
+        final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
+        for (List<Scan> scans : qplan.getScans()) {
+            psplits.add(new PhoenixInputSplit(scans));
+        }
+        return psplits;
+    }
+    
+    /**
+     * Returns the query plan associated with the configured select query.
+     * @param context the job context
+     * @param configuration the job configuration
+     * @return the optimized {@link QueryPlan}
+     * @throws IOException if the query plan cannot be built
+     */
+    private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException {
+        Preconditions.checkNotNull(context);
+        try{
+            final Connection connection = ConnectionUtil.getConnection(configuration);
+            final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+            Preconditions.checkNotNull(selectStatement);
+            final Statement statement = connection.createStatement();
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+            // Initialize the query plan so it sets up the parallel scans
+            queryPlan.iterator();
+            return queryPlan;
+        } catch(Exception exception) {
+            LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
+            throw new RuntimeException(exception);
+        }
+    }
+}
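
A minimal sketch of a mapper that consumes rows from this input format: the key is always NullWritable and the value is the configured DBWritable. StockWritable and its getters are hypothetical stand-ins for a user-supplied record class (a sketch of one follows the PhoenixRecordWriter diff below).

    import java.io.IOException;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {

        private final Text stock = new Text();
        private final DoubleWritable price = new DoubleWritable();

        @Override
        protected void map(NullWritable key, StockWritable record, Context context)
                throws IOException, InterruptedException {
            // record has been populated from the Phoenix ResultSet
            // via DBWritable.readFields(ResultSet).
            stock.set(record.getStockName());
            price.set(record.getPrice());
            context.write(stock, price);
        }
    }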

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
new file mode 100644
index 0000000..592cb7d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.query.KeyRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Input split class to hold the lower and upper bound {@link KeyRange} of the scans.
+ */
+public class PhoenixInputSplit extends InputSplit implements Writable {
+
+    private List<Scan> scans;
+    private KeyRange keyRange;
+   
+    /**
+     * No Arg constructor
+     */
+    public PhoenixInputSplit() {
+    }
+    
+    /**
+     * @param scans the scans that make up this split; must be non-empty
+     */
+    public PhoenixInputSplit(final List<Scan> scans) {
+        Preconditions.checkNotNull(scans);
+        Preconditions.checkState(!scans.isEmpty());
+        this.scans = scans;
+        init();
+    }
+    
+    public List<Scan> getScans() {
+        return scans;
+    }
+    
+    public KeyRange getKeyRange() {
+        return keyRange;
+    }
+    
+    private void init() {
+        this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
+    }
+    
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int count = WritableUtils.readVInt(input);
+        scans = Lists.newArrayListWithExpectedSize(count);
+        for (int i = 0; i < count; i++) {
+            Scan scan = new Scan();
+            scan.readFields(input);
+            scans.add(scan);
+        }
+        init();
+    }
+    
+    @Override
+    public void write(DataOutput output) throws IOException {
+        Preconditions.checkNotNull(scans);
+        WritableUtils.writeVInt(output, scans.size());
+        for (Scan scan : scans) {
+            scan.write(output);
+        }
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+         return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+        return new String[]{};
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((keyRange == null) ? 0 : keyRange.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) { return true; }
+        if (obj == null) { return false; }
+        if (!(obj instanceof PhoenixInputSplit)) { return false; }
+        PhoenixInputSplit other = (PhoenixInputSplit)obj;
+        if (keyRange == null) {
+            if (other.keyRange != null) { return false; }
+        } else if (!keyRange.equals(other.keyRange)) { return false; }
+        return true;
+    }
+
+}
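
Since the split ships its scans through the Writable interface, a round trip through Hadoop's buffer classes is an easy way to sanity-check the serialization. A sketch, assuming the HBase 0.94-era Scan(startRow, stopRow) constructor and Scan's Writable support used on this branch:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.phoenix.mapreduce.PhoenixInputSplit;

    import com.google.common.collect.Lists;

    public class InputSplitRoundTrip {
        public static void main(String[] args) throws IOException {
            Scan scan = new Scan(Bytes.toBytes("a"), Bytes.toBytes("m"));
            PhoenixInputSplit split = new PhoenixInputSplit(Lists.newArrayList(scan));

            // Serialize the split the same way the framework would.
            DataOutputBuffer out = new DataOutputBuffer();
            split.write(out);

            // Deserialize into a fresh instance and compare the derived key range.
            DataInputBuffer in = new DataInputBuffer();
            in.reset(out.getData(), out.getLength());
            PhoenixInputSplit copy = new PhoenixInputSplit();
            copy.readFields(in);

            System.out.println(copy.getKeyRange().equals(split.getKeyRange())); // true
        }
    }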

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
new file mode 100644
index 0000000..ffee5c7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A no-op {@link OutputCommitter}
+ */
+public class PhoenixOutputCommitter extends OutputCommitter {
+    
+    public PhoenixOutputCommitter() {}
+
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+        return true;
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {        
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext context) throws IOException {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
new file mode 100644
index 0000000..e55b977
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * {@link OutputFormat} implementation for Phoenix.
+ *
+ */
+public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> {
+    private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+    
+    @Override
+    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {      
+    }
+    
+    /** Returns a no-op {@link OutputCommitter}. */
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return new PhoenixOutputCommitter();
+    }
+
+    @Override
+    public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        try {
+            return new PhoenixRecordWriter<T>(context.getConfiguration());
+        } catch (SQLException e) {
+            LOG.error("Error calling PhoenixRecordWriter "  + e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
new file mode 100644
index 0000000..2c206ab
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * {@link RecordReader} implementation that iterates over the records.
+ */
+public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<NullWritable,T> {
+    
+    private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
+    private final Configuration  configuration;
+    private final QueryPlan queryPlan;
+    private NullWritable key =  NullWritable.get();
+    private T value = null;
+    private Class<T> inputClass;
+    private ResultIterator resultIterator = null;
+    private PhoenixResultSet resultSet;
+    
+    public PhoenixRecordReader(Class<T> inputClass,final Configuration configuration,final QueryPlan queryPlan) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(queryPlan);
+        this.inputClass = inputClass;
+        this.configuration = configuration;
+        this.queryPlan = queryPlan;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (resultIterator != null) {
+            try {
+                resultIterator.close();
+            } catch (SQLException e) {
+                LOG.error("Error closing the result set.");
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return key;
+    }
+
+    @Override
+    public T getCurrentValue() throws IOException, InterruptedException {
+        return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return 0;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+        final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
+        final List<Scan> scans = pSplit.getScans();
+        try {
+            List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+            for (Scan scan : scans) {
+                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+                PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+                iterators.add(peekingResultIterator);
+            }
+            ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
+            if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
+                iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
+            }
+            this.resultIterator = iterator;
+            this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
+        } catch (SQLException e) {
+            LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
+            Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (key == null) {
+            key = NullWritable.get();
+        }
+        if (value == null) {
+            value =  ReflectionUtils.newInstance(inputClass, this.configuration);
+        }
+        Preconditions.checkNotNull(this.resultSet);
+        try {
+            if(!resultSet.next()) {
+                return false;
+            }
+            value.readFields(resultSet);
+            return true;
+        } catch (SQLException e) {
+            LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
new file mode 100644
index 0000000..4d26bf4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+
+/**
+ * Default {@link RecordWriter} implementation for Phoenix.
+ *
+ */
+public class PhoenixRecordWriter<T extends DBWritable>  extends RecordWriter<NullWritable, T> {
+    
+    private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
+    
+    private final Connection conn;
+    private final PreparedStatement statement;
+    private final long batchSize;
+    private long numRecords = 0;
+    
+    public PhoenixRecordWriter(final Configuration configuration) throws SQLException {
+        this.conn = ConnectionUtil.getConnection(configuration);
+        this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
+        final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+        this.statement = this.conn.prepareStatement(upsertQuery);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+        try {
+            statement.executeBatch();
+            conn.commit();
+        } catch (SQLException e) {
+            LOG.error("SQLException while performing the commit for the task.");
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                statement.close();
+                conn.close();
+            } catch (SQLException ex) {
+                LOG.error("SQLException while closing the connection for the task.");
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+
+    @Override
+    public void write(NullWritable n, T record) throws IOException, InterruptedException {      
+        try {
+            record.write(statement);
+            numRecords++;
+            statement.addBatch();
+            if (numRecords % batchSize == 0) {
+                LOG.debug("commit called on a batch of size : " + batchSize);
+                statement.executeBatch();
+                conn.commit();
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Exception while committing to database.", e);
+        }
+    }
+
+}
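
Both the reader and the writer delegate to the user's DBWritable: readFields(ResultSet) is called by PhoenixRecordReader per row, and write(PreparedStatement) binds the UPSERT parameters for PhoenixRecordWriter. A hypothetical record class (the STOCK_NAME/PRICE columns are illustrative, not part of this commit):

    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    public class StockWritable implements DBWritable {

        private String stockName;
        private double price;

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            // Invoked by PhoenixRecordReader for every row of the SELECT.
            stockName = resultSet.getString("STOCK_NAME");
            price = resultSet.getDouble("PRICE");
        }

        @Override
        public void write(PreparedStatement statement) throws SQLException {
            // Invoked by PhoenixRecordWriter; parameter positions follow
            // the column order of the generated UPSERT statement.
            statement.setString(1, stockName);
            statement.setDouble(2, price);
        }

        public String getStockName() { return stockName; }
        public double getPrice() { return price; }

        public void setStockName(String stockName) { this.stockName = stockName; }
        public void setPrice(double price) { this.price = price; }
    }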

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
new file mode 100644
index 0000000..ec52fba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import java.util.List;
+
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * A codec to transform a {@link ColumnInfo} to a {@link String} and decode back.
+ */
+public class ColumnInfoToStringEncoderDecoder {
+
+    private static final String COLUMN_INFO_DELIMITER = "|";
+    
+    private ColumnInfoToStringEncoderDecoder() {
+        
+    }
+    
+    public static String encode(List<ColumnInfo> columnInfos) {
+        Preconditions.checkNotNull(columnInfos);
+        return Joiner.on(COLUMN_INFO_DELIMITER)
+                     .skipNulls()
+                     .join(columnInfos);
+    }
+    
+    public static List<ColumnInfo> decode(final String columnInfoStr) {
+        Preconditions.checkNotNull(columnInfoStr);
+        List<ColumnInfo> columnInfos = Lists.newArrayList(
+                                Iterables.transform(
+                                        Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr),
+                                        new Function<String, ColumnInfo>() {
+                                            @Override
+                                            public ColumnInfo apply(String colInfo) {
+                                               return ColumnInfo.fromString(colInfo);
+                                            }
+                                        }));
+        return columnInfos;
+        
+    }
+
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
new file mode 100644
index 0000000..0864cba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.util.QueryUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class to return a {@link Connection} built from a {@link Configuration}.
+ */
+public class ConnectionUtil {
+    
+    /**
+     * Returns a {@link Connection} to the cluster whose ZooKeeper quorum
+     * is set in the given configuration.
+     * @param configuration the configuration holding the ZooKeeper quorum
+     * @return a Phoenix JDBC connection
+     * @throws SQLException if the connection cannot be established
+     */
+    public static Connection getConnection(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final Properties props = new Properties();
+        final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props);
+        return conn;
+    }
+
+}
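
The only setting this helper reads is the ZooKeeper quorum, so callers must set HConstants.ZOOKEEPER_QUORUM beforehand. A minimal sketch (the quorum value is a placeholder):

    import java.sql.Connection;
    import java.sql.SQLException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.phoenix.mapreduce.util.ConnectionUtil;

    public class ConnectionUtilExample {
        public static void main(String[] args) throws SQLException {
            Configuration configuration = new Configuration();
            configuration.set(HConstants.ZOOKEEPER_QUORUM, "localhost");  // placeholder quorum
            Connection connection = ConnectionUtil.getConnection(configuration);
            try {
                // issue Phoenix queries/upserts here
            } finally {
                connection.close();
            }
        }
    }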

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
new file mode 100644
index 0000000..83a606b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+/**
+ * A utility class to set properties on the {@link Configuration} instance.
+ * Used as part of MapReduce job configuration.
+ */
+public final class PhoenixConfigurationUtil {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixConfigurationUtil.class);
+    
+    public static final String UPSERT_COLUMNS = "phoenix.upsert.columns";
+    
+    public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
+    
+    public static final String UPSERT_COLUMN_INFO_KEY  = "phoenix.upsert.columninfos.list";
+    
+    public static final String SELECT_STATEMENT = "phoenix.select.stmt";
+    
+    public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
+    
+    public static final String SELECT_COLUMNS = "phoenix.select.query.columns";
+    
+    public static final String SELECT_COLUMN_INFO_KEY  = "phoenix.select.columninfos.list";
+    
+    public static final String SCHEMA_TYPE = "phoenix.select.schema.type";
+    
+    public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
+    
+    public static final String INPUT_TABLE_NAME = "phoenix.input.table.name" ;
+    
+    public static final String INPUT_TABLE_CONDITIONS = "phoenix.input.table.conditions" ;
+    
+    public static final String OUTPUT_TABLE_NAME = "phoenix.output.table.name" ;
+    
+    public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
+    
+    public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
+
+    public static final String INPUT_CLASS = "phoenix.input.class";
+    
+    public enum SchemaType {
+        TABLE,
+        QUERY;
+    }
+
+    private PhoenixConfigurationUtil(){
+        
+    }
+
+    /**
+     * Sets the name of the input table.
+     * @param configuration the job configuration
+     * @param tableName the input table name
+     */
+    public static void setInputTableName(final Configuration configuration, final String tableName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(tableName);
+        configuration.set(INPUT_TABLE_NAME, tableName);
+    }
+    
+    public static void setInputTableConditions(final Configuration configuration, final String conditions) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(conditions);
+        configuration.set(INPUT_TABLE_CONDITIONS, conditions);
+    }
+    
+    public static void setSelectColumnNames(final Configuration configuration,final String[] columns) {
+        Preconditions.checkNotNull(configuration);
+        final String selectColumnNames = Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(columns);
+        configuration.set(SELECT_COLUMNS, selectColumnNames);
+    }
+    
+    public static void setSelectColumnNames(final Configuration configuration,final String columns) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(SELECT_COLUMNS, columns);
+    }
+    
+    public static void setInputClass(final Configuration configuration, Class<? extends DBWritable> inputClass) {
+        Preconditions.checkNotNull(configuration);
+        configuration.setClass(INPUT_CLASS ,inputClass,DBWritable.class);
+    }
+    
+    public static void setInputQuery(final Configuration configuration, final String inputQuery) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(inputQuery);
+        configuration.set(SELECT_STATEMENT, inputQuery);
+    }
+    
+    public static void setSchemaType(Configuration configuration, final SchemaType schemaType) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(SCHEMA_TYPE, schemaType.name());
+    }
+    
+    public static void setOutputTableName(final Configuration configuration, final String tableName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(tableName);
+        configuration.set(OUTPUT_TABLE_NAME, tableName);
+    }
+    
+    public static void setUpsertColumnNames(final Configuration configuration,final String[] columns) {
+        Preconditions.checkNotNull(configuration);
+        final String upsertColumnNames = Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(columns);
+        configuration.set(UPSERT_COLUMNS, upsertColumnNames);
+    }
+    
+    public static void setUpsertColumnNames(final Configuration configuration,final String columns) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(UPSERT_COLUMNS, columns);
+    }
+    
+   
+    public static void setBatchSize(final Configuration configuration, final Long batchSize) {
+        Preconditions.checkNotNull(configuration);
+        configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
+    }
+    
+    public static Class<?> getInputClass(final Configuration configuration) {
+        return configuration.getClass(INPUT_CLASS, NullDBWritable.class);
+    }
+    public static SchemaType getSchemaType(final Configuration configuration) {
+        final String schemaTp = configuration.get(SCHEMA_TYPE);
+        Preconditions.checkNotNull(schemaTp);
+        return SchemaType.valueOf(schemaTp);
+    }
+    
+    public static List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String tableName = getOutputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
+        if(isNotEmpty(columnInfoStr)) {
+            return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        }
+        final Connection connection = ConnectionUtil.getConnection(configuration);
+        String upsertColumns = configuration.get(UPSERT_COLUMNS);
+        List<String> upsertColumnList = null;
+        if(isNotEmpty(upsertColumns)) {
+            final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+            upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
+            LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
+                    ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
+                    ));
+        } 
+       List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
+       final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+       // We put the encoded column infos in the Configuration for reuse.
+       configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
+       connection.close();
+       return columnMetadataList;
+    }
+    
+     public static String getUpsertStatement(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String tableName = getOutputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        String upsertStmt = configuration.get(UPSERT_STATEMENT);
+        if(isNotEmpty(upsertStmt)) {
+            return upsertStmt;
+        }
+        final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
+        final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration);
+        if (useUpsertColumns) {
+            // Generating UPSERT statement with column name information.
+            upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
+            LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt);
+        } else {
+            // Generating UPSERT statement without column name information.
+            upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
+            LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
+        }
+        configuration.set(UPSERT_STATEMENT, upsertStmt);
+        return upsertStmt;
+        
+    }
+    
+    public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
+        if(isNotEmpty(columnInfoStr)) {
+            return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        }
+        final String tableName = getInputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        final Connection connection = ConnectionUtil.getConnection(configuration);
+        final List<String> selectColumnList = getSelectColumnList(configuration);
+        final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+        final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+        // We put the encoded column infos in the Configuration for reuse.
+        configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
+        connection.close();
+        return columnMetadataList;
+    }
+
+    private static List<String> getSelectColumnList(
+            final Configuration configuration) {
+        String selectColumns = configuration.get(SELECT_COLUMNS);
+        List<String> selectColumnList = null;
+        if(isNotEmpty(selectColumns)) {
+            final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+            selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
+            LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
+                    ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
+                    ));
+        }
+        return selectColumnList;
+    }
+    
+    public static String getSelectStatement(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        String selectStmt = configuration.get(SELECT_STATEMENT);
+        if(isNotEmpty(selectStmt)) {
+            return selectStmt;
+        }
+        final String tableName = getInputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration);
+        final String conditions = configuration.get(INPUT_TABLE_CONDITIONS);
+        selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList, conditions);
+        LOG.info("Select Statement: "+ selectStmt);
+        configuration.set(SELECT_STATEMENT, selectStmt);
+        return selectStmt;
+    }
+    
+    public static long getBatchSize(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
+        if(batchSize <= 0) {
+           Connection conn = ConnectionUtil.getConnection(configuration);
+           batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+           conn.close();
+        }
+        configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
+        return batchSize;
+    }
+    
+    public static int getSelectColumnsCount(Configuration configuration,
+            String tableName) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String schemaTp = configuration.get(SCHEMA_TYPE);
+        final SchemaType schemaType = SchemaType.valueOf(schemaTp);
+        int count = 0;
+        if(SchemaType.QUERY.equals(schemaType)) {
+            List<String> selectedColumnList = getSelectColumnList(configuration);
+            count = selectedColumnList == null ? 0 : selectedColumnList.size();
+        } else {
+            List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration);
+            count = columnInfos == null ? 0 : columnInfos.size();
+        }
+        return count;
+    }
+
+    public static String getInputTableName(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(INPUT_TABLE_NAME);
+    }
+
+    public static String getOutputTableName(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(OUTPUT_TABLE_NAME);
+    }
+}
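
When a full select statement is configured via setInputQuery, getSelectStatement returns it as-is; only the table-name path needs a live connection to derive column metadata. A sketch of the offline path, with hypothetical table and column names:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;

    public class ConfigurationSketch {
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            PhoenixConfigurationUtil.setInputTableName(configuration, "STOCK");
            PhoenixConfigurationUtil.setInputQuery(configuration,
                    "SELECT STOCK_NAME, PRICE FROM STOCK WHERE PRICE > 0");
            PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);

            // Returns the configured query verbatim; had only the table name and
            // columns been set, the statement would be built via QueryUtil and a
            // connection would be opened to fetch the column metadata.
            String select = PhoenixConfigurationUtil.getSelectStatement(configuration);
            System.out.println(select);
        }
    }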

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
new file mode 100644
index 0000000..f1a7f5a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
+
+/**
+ * Utility class for setting Configuration parameters for the MapReduce job.
+ */
+public final class PhoenixMapReduceUtil {
+
+    private PhoenixMapReduceUtil() {
+        
+    }
+    
+    /**
+     * 
+     * @param job
+     * @param inputClass DBWritable class
+     * @param tableName  Input table name
+     * @param conditions Condition clause to be added to the WHERE clause.
+     * @param fieldNames fields being projected for the SELECT query.
+     */
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName , final String conditions, final String... fieldNames) {
+          job.setInputFormatClass(PhoenixInputFormat.class);
+          final Configuration configuration = job.getConfiguration();
+          PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+          PhoenixConfigurationUtil.setSelectColumnNames(configuration,fieldNames);
+          PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+          PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
+    }
+    
+    /**
+     * 
+     * @param job         
+     * @param inputClass  DBWritable class  
+     * @param tableName   Input table name
+     * @param inputQuery  Select query.
+     */
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String inputQuery) {
+          job.setInputFormatClass(PhoenixInputFormat.class);
+          final Configuration configuration = job.getConfiguration();
+          PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+          PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
+          PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+          PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+     }
+    
+    /**
+     * 
+     * @param job
+     * @param tableName  Output table name
+     * @param columns    Comma-separated list of columns to upsert
+     */
+    public static void setOutput(final Job job, final String tableName,final String columns) {
+        job.setOutputFormatClass(PhoenixOutputFormat.class);
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+        PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns);
+    }
+    
+    
+    /**
+     * 
+     * @param job
+     * @param tableName  Output table name
+     * @param fieldNames Names of the columns to upsert
+     */
+    public static void setOutput(final Job job, final String tableName , final String... fieldNames) {
+          job.setOutputFormatClass(PhoenixOutputFormat.class);
+          final Configuration configuration = job.getConfiguration();
+          PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+          PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames);
+    }
+
+    
+}
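
Putting the pieces together, a driver might wire a job roughly as follows. The table names, columns, and the StockMapper/StockWritable classes refer to the hypothetical examples sketched earlier; the reducer is omitted for brevity.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

    public class StockJobDriver {
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            configuration.set(HConstants.ZOOKEEPER_QUORUM, "localhost");  // placeholder quorum
            Job job = new Job(configuration, "phoenix-mr-sketch");

            // SELECT STOCK_NAME, PRICE FROM STOCK WHERE (PRICE > 0)
            PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCK",
                    "PRICE > 0", "STOCK_NAME", "PRICE");

            // UPSERT results into a hypothetical STOCK_STATS(STOCK_NAME, MAX_PRICE) table.
            PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_PRICE");

            job.setMapperClass(StockMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            // A reducer (omitted here) would aggregate and emit
            // (NullWritable, StockWritable) pairs for the output format.
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(StockWritable.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }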

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 870ae62..c0c2783 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -317,7 +317,7 @@ public class PhoenixRuntime {
         if (columns == null) {
             // use all columns in the table
             for(PColumn pColumn : table.getColumns()) {
-               int sqlType = pColumn.getDataType().getResultSetSqlType();
+               int sqlType = pColumn.getDataType().getSqlType();
                columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType));
             }
         } else {
@@ -397,7 +397,7 @@ public class PhoenixRuntime {
         if (pColumn==null) {
             throw new SQLException("pColumn must not be null.");
         }
-        int sqlType = pColumn.getDataType().getResultSetSqlType();
+        int sqlType = pColumn.getDataType().getSqlType();
         ColumnInfo columnInfo = new ColumnInfo(pColumn.toString(),sqlType);
         return columnInfo;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 2bea37d..2317e8c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.phoenix.util;
 
+import static org.apache.phoenix.util.SchemaUtil.getEscapedFullColumnName;
+
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -130,29 +132,33 @@ public final class QueryUtil {
      * 
      * @param fullTableName name of the table for which the select statement needs to be created.
      * @param columnInfos  list of columns to be projected in the select statement.
+     * @param conditions   The conditions to be added to the WHERE clause
      * @return Select Query 
      */
-    public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos) {
+    public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos,final String conditions) {
         Preconditions.checkNotNull(fullTableName,"Table name cannot be null");
         if(columnInfos == null || columnInfos.isEmpty()) {
              throw new IllegalArgumentException("At least one column must be provided");
         }
         // escape the table name to ensure it is case sensitive.
         final String escapedFullTableName = SchemaUtil.getEscapedFullTableName(fullTableName);
-        StringBuilder sb = new StringBuilder();
-        sb.append("SELECT ");
+        StringBuilder query = new StringBuilder();
+        query.append("SELECT ");
         for (ColumnInfo cinfo : columnInfos) {
             if (cinfo != null) {
-                String fullColumnName = SchemaUtil.getEscapedFullColumnName(cinfo.getColumnName());
-                sb.append(fullColumnName);
-                sb.append(",");
+                String fullColumnName = getEscapedFullColumnName(cinfo.getColumnName());
+                query.append(fullColumnName);
+                query.append(",");
              }
          }
         // Remove the trailing comma
-        sb.setLength(sb.length() - 1);
-        sb.append(" FROM ");
-        sb.append(escapedFullTableName);
-        return sb.toString();
+        query.setLength(query.length() - 1);
+        query.append(" FROM ");
+        query.append(escapedFullTableName);
+        if(conditions != null && conditions.length() > 0) {
+            query.append(" WHERE (").append(conditions).append(")");
+        }
+        return query.toString();
     }
 
     public static String getUrl(String server) {
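
A quick illustration of the extended constructSelectStatement, grounded in the
tests further below; passing null for conditions preserves the old behaviour:

    import java.sql.Types;
    import java.util.List;
    import org.apache.phoenix.util.ColumnInfo;
    import org.apache.phoenix.util.QueryUtil;
    import com.google.common.collect.ImmutableList;

    public class SelectStatementExample {
        public static void main(String[] args) {
            final List<ColumnInfo> columns = ImmutableList.of(
                    new ColumnInfo("ID", Types.BIGINT),
                    new ColumnInfo("NAME", Types.VARCHAR));
            // no conditions: identical to the previous behaviour
            System.out.println(QueryUtil.constructSelectStatement("MYTAB", columns, null));
            // -> SELECT "ID","NAME" FROM "MYTAB"
            // conditions are appended verbatim inside a parenthesised WHERE
            System.out.println(QueryUtil.constructSelectStatement("MYTAB", columns, "ID > 100"));
            // -> SELECT "ID","NAME" FROM "MYTAB" WHERE (ID > 100)
        }
    }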

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
new file mode 100644
index 0000000..1004981
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests methods on {@link ColumnInfoToStringEncoderDecoder}
+ */
+public class ColumnInfoToStringEncoderDecoderTest {
+
+    @Test
+    public void testEncode() {
+        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+        assertEquals(columnInfo.toString(),encodedColumnInfo);
+    }
+    
+    @Test
+    public void testDecode() {
+        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+        final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(encodedColumnInfo);
+        assertEquals(columnInfo.toString(), decodedColumnInfo.get(0).toString());
+    }
+    
+    @Test
+    public void testEncodeDecodeWithNulls() {
+        final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final ColumnInfo columnInfo2 = null;
+        final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1,columnInfo2));
+        final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        assertEquals(1,decodedColumnInfo.size()); 
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
new file mode 100644
index 0000000..33c7531
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+/**
+ * Test for {@link PhoenixConfigurationUtil}
+ */
+public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
+    
+    @Test
+    public void testUpsertStatement() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+            conn.createStatement().execute(ddl);
+            final Configuration configuration = new Configuration();
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+            final String upsertStatement = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+            final String expectedUpsertStatement = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)";
+            assertEquals(expectedUpsertStatement, upsertStatement);
+        } finally {
+            conn.close();
+        }
+     }
+
+    @Test
+    public void testSelectStatement() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+            conn.createStatement().execute(ddl);
+            final Configuration configuration = new Configuration();
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+            final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+            final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(tableName);
+            assertEquals(expectedSelectStatement, selectStatement);
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSelectStatementForSpecificColumns() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+            conn.createStatement().execute(ddl);
+            final Configuration configuration = new Configuration();
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+            PhoenixConfigurationUtil.setSelectColumnNames(configuration, "A_BINARY");
+            final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+            final String expectedSelectStatement = "SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(tableName);
+            assertEquals(expectedSelectStatement, selectStatement);
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSelectStatementForArrayTypes() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])\n";
+            conn.createStatement().execute(ddl);
+            final Configuration configuration = new Configuration();
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            PhoenixConfigurationUtil.setSelectColumnNames(configuration, "ID,VCARRAY");
+            PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+            PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+            final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+            final String expectedSelectStatement = "SELECT \"ID\",\"0\".\"VCARRAY\" FROM " + SchemaUtil.getEscapedArgument(tableName);
+            assertEquals(expectedSelectStatement, selectStatement);
+        } finally {
+            conn.close();
+        }
+    }
+}
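
Taken together, the tests above translate into the following read-side
configuration flow. This is a sketch only: it assumes a reachable cluster on
which TEST_TABLE already exists, since getSelectStatement resolves column
metadata through a Phoenix connection:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;

    public class ReadSideConfigExample {
        public static void main(String[] args) throws Exception {
            final Configuration conf = new Configuration();
            conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181");
            PhoenixConfigurationUtil.setInputTableName(conf, "TEST_TABLE");
            PhoenixConfigurationUtil.setSelectColumnNames(conf, "ID,VCARRAY");
            PhoenixConfigurationUtil.setSchemaType(conf, SchemaType.QUERY);
            // PhoenixInputFormat later derives the per-split SELECT from this
            System.out.println(PhoenixConfigurationUtil.getSelectStatement(conf));
            // -> SELECT "ID","0"."VCARRAY" FROM "TEST_TABLE"
        }
    }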

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 0ac2bbc..5a36817 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -59,6 +59,6 @@ public class QueryUtilTest {
     public void testConstructSelectStatement() {
         assertEquals(
                 "SELECT \"ID\",\"NAME\" FROM \"MYTAB\"",
-                QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN)));
+                QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
deleted file mode 100644
index efbfbf8..0000000
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig;
-
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.Test;
-
-
-public class PhoenixPigConfigurationIT extends BaseHBaseManagedTimeIT {
-    private static final String zkQuorum = TestUtil.LOCALHOST + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-    
-    @Test
-    public void testUpsertStatement() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        final String tableName = "TEST_TABLE";
-        try {
-            String ddl = "CREATE TABLE "+ tableName + 
-                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
-            createTestTable(getUrl(), ddl);
-            final PhoenixPigConfiguration configuration = newConfiguration (tableName);
-            final String upserStatement = configuration.getUpsertStatement();
-            final String expectedUpsertStatement = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)"; 
-            assertEquals(expectedUpsertStatement, upserStatement);
-        } finally {
-            conn.close();
-        }
-    }
-    
-    @Test
-    public void testSelectStatement() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        final String tableName = "TEST_TABLE";
-        try {
-            String ddl = "CREATE TABLE "+ tableName + 
-                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
-            createTestTable(getUrl(), ddl);
-            final PhoenixPigConfiguration configuration = newConfiguration (tableName);
-            final String selectStatement = configuration.getSelectStatement();
-            final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(tableName) ; 
-            assertEquals(expectedSelectStatement, selectStatement);
-        } finally {
-            conn.close();
-        }
-    }
-    
-    @Test
-    public void testSelectStatementForSpecificColumns() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        final String tableName = "TEST_TABLE";
-        try {
-            String ddl = "CREATE TABLE "+ tableName + 
-                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
-            createTestTable(getUrl(), ddl);
-            final PhoenixPigConfiguration configuration = newConfiguration (tableName);
-            configuration.setSelectColumns("A_BINARY");
-            final String selectStatement = configuration.getSelectStatement();
-            final String expectedSelectStatement = "SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(tableName) ; 
-            assertEquals(expectedSelectStatement, selectStatement);
-        } finally {
-            conn.close();
-        }
-    }
-
-    private PhoenixPigConfiguration newConfiguration(String tableName) {
-        final Configuration configuration = new Configuration();
-        final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(configuration);
-        phoenixConfiguration.configure(zkQuorum, tableName.toUpperCase(), 100);
-        return phoenixConfiguration;
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
index d8bedf6..1218e82 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
@@ -28,18 +28,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType;
-import org.apache.phoenix.pig.hadoop.PhoenixInputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
 import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
 import org.apache.phoenix.pig.util.TableSchemaParserFunction;
 import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
@@ -83,12 +86,12 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
     private static final String PHOENIX_QUERY_SCHEME      = "hbase://query/";
     private static final String RESOURCE_SCHEMA_SIGNATURE = "phoenix.pig.schema";
    
-    private PhoenixPigConfiguration config;
+    private Configuration config;
     private String tableName;
     private String selectQuery;
     private String zkQuorum ;
-    private PhoenixInputFormat inputFormat;
-    private RecordReader<NullWritable, PhoenixRecord> reader;
+    private PhoenixInputFormat<PhoenixPigDBWritable> inputFormat;
+    private RecordReader<NullWritable,PhoenixPigDBWritable> reader;
     private String contextSignature;
     private ResourceSchema schema;
        
@@ -107,6 +110,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
         final Configuration configuration = job.getConfiguration();
         //explicitly turning off combining splits. 
         configuration.setBoolean("pig.noSplitCombination", true);
+        // obtain HBase credentials so Phoenix works on a secured cluster
+        TableMapReduceUtil.initCredentials(job);
         this.initializePhoenixPigConfiguration(location, configuration);
     }
 
@@ -120,21 +125,22 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
         if(this.config != null) {
             return;
         }
-        this.config = new PhoenixPigConfiguration(configuration);
-        this.config.setServerName(this.zkQuorum);
+        this.config = configuration;
+        this.config.set(HConstants.ZOOKEEPER_QUORUM,this.zkQuorum);
+        PhoenixConfigurationUtil.setInputClass(this.config, PhoenixPigDBWritable.class);
         Pair<String,String> pair = null;
         try {
             if (location.startsWith(PHOENIX_TABLE_NAME_SCHEME)) {
                 String tableSchema = location.substring(PHOENIX_TABLE_NAME_SCHEME.length());
                 final TableSchemaParserFunction parseFunction = new TableSchemaParserFunction();
                 pair =  parseFunction.apply(tableSchema);
-				this.config.setSchemaType(SchemaType.TABLE);
+                PhoenixConfigurationUtil.setSchemaType(this.config, SchemaType.TABLE);
              } else if (location.startsWith(PHOENIX_QUERY_SCHEME)) {
                 this.selectQuery = location.substring(PHOENIX_QUERY_SCHEME.length());
                 final QuerySchemaParserFunction queryParseFunction = new QuerySchemaParserFunction(this.config);
                 pair = queryParseFunction.apply(this.selectQuery);
-                config.setSelectStatement(this.selectQuery);
-				this.config.setSchemaType(SchemaType.QUERY);
+                PhoenixConfigurationUtil.setInputQuery(this.config, this.selectQuery);
+                PhoenixConfigurationUtil.setSchemaType(this.config, SchemaType.QUERY);
             }
             this.tableName = pair.getFirst();
             final String selectedColumns = pair.getSecond();
@@ -142,9 +148,9 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
             if(isEmpty(this.tableName) && isEmpty(this.selectQuery)) {
                 printUsage(location);
             }
-            this.config.setTableName(this.tableName);
+            PhoenixConfigurationUtil.setInputTableName(this.config, this.tableName);
             if(!isEmpty(selectedColumns)) {
-                this.config.setSelectColumns(selectedColumns);    
+                PhoenixConfigurationUtil.setSelectColumnNames(this.config, selectedColumns);   
             }
         } catch(IllegalArgumentException iae) {
             printUsage(location);
@@ -160,7 +166,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
     @Override
     public InputFormat getInputFormat() throws IOException {
         if(inputFormat == null) {
-            inputFormat = new PhoenixInputFormat();
+            inputFormat = new PhoenixInputFormat<PhoenixPigDBWritable>();
+            PhoenixConfigurationUtil.setInputClass(this.config,PhoenixPigDBWritable.class);
         }
         return inputFormat;
     }
@@ -188,13 +195,13 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
     public Tuple getNext() throws IOException {
         try {
             if(!reader.nextKeyValue()) {
-               return null; 
-            }
-            final PhoenixRecord phoenixRecord = reader.getCurrentValue();
-            if(phoenixRecord == null) {
+                return null;
+            }
+            final PhoenixPigDBWritable record = reader.getCurrentValue();
+            if(record == null) {
                 return null;
             }
-            final Tuple tuple = TypeUtil.transformToTuple(phoenixRecord,schema.getFields());
+            final Tuple tuple = TypeUtil.transformToTuple(record,schema.getFields());
             return tuple;
        } catch (InterruptedException e) {
             int errCode = 6018;