Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/12/09 03:03:12 UTC

[1/3] phoenix git commit: PHOENIX-1454 Map Reduce over Phoenix tables

Repository: phoenix
Updated Branches:
  refs/heads/4.0 0de7863bc -> b70f3abd2


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
index 3daf4e1..27fd879 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
@@ -22,15 +22,12 @@ package org.apache.phoenix.pig.util;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.SQLException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -44,16 +41,14 @@ import com.google.common.base.Joiner;
  */
 public class QuerySchemaParserFunctionTest extends BaseConnectionlessQueryTest {
 
-    private PhoenixPigConfiguration phoenixConfiguration;
-    private Connection conn;
+    private Configuration configuration;
     private QuerySchemaParserFunction function;
     
     @Before
     public void setUp() throws SQLException {
-        phoenixConfiguration = Mockito.mock(PhoenixPigConfiguration.class);
-        conn = DriverManager.getConnection(getUrl());
-        Mockito.when(phoenixConfiguration.getConnection()).thenReturn(conn);
-        function = new QuerySchemaParserFunction(phoenixConfiguration);
+        configuration = Mockito.mock(Configuration.class);
+        Mockito.when(configuration.get(HConstants.ZOOKEEPER_QUORUM)).thenReturn(getUrl());
+        function = new QuerySchemaParserFunction(configuration);
     }
     
     @Test(expected=RuntimeException.class)
@@ -101,9 +96,4 @@ public class QuerySchemaParserFunctionTest extends BaseConnectionlessQueryTest {
         function.apply(selectQuery);
         fail(" Function call successful despite passing an aggreagate query");
     }
-
-    @After
-    public void tearDown() throws SQLException {
-        conn.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunctionTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunctionTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunctionTest.java
index 1bd16e3..667d814 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunctionTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunctionTest.java
@@ -19,16 +19,14 @@
  */
 package org.apache.phoenix.pig.util;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.List;
 
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.util.ColumnInfo;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,16 +36,14 @@ import com.google.common.collect.ImmutableList;
 
 public class SqlQueryToColumnInfoFunctionTest  extends BaseConnectionlessQueryTest {
 
-    private PhoenixPigConfiguration phoenixConfiguration;
-    private Connection conn;
+    private Configuration configuration;
     private SqlQueryToColumnInfoFunction function;
     
     @Before
     public void setUp() throws SQLException {
-        phoenixConfiguration = Mockito.mock(PhoenixPigConfiguration.class);
-        conn = DriverManager.getConnection(getUrl());
-        Mockito.when(phoenixConfiguration.getConnection()).thenReturn(conn);
-        function = new SqlQueryToColumnInfoFunction(phoenixConfiguration);
+        configuration = Mockito.mock(Configuration.class);
+        Mockito.when(configuration.get(HConstants.ZOOKEEPER_QUORUM)).thenReturn(getUrl());
+        function = new SqlQueryToColumnInfoFunction(configuration);
     }
     
     @Test
@@ -66,10 +62,4 @@ public class SqlQueryToColumnInfoFunctionTest  extends BaseConnectionlessQueryTe
         Assert.assertEquals(expectedColumnInfos, actualColumnInfos);
         
     }
-    
-    @After
-    public void tearDown() throws SQLException {
-        conn.close();
-    }
-
 }


[3/3] phoenix git commit: PHOENIX-1454 Map Reduce over Phoenix tables

Posted by ra...@apache.org.
PHOENIX-1454 Map Reduce over Phoenix tables


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b70f3abd
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b70f3abd
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b70f3abd

Branch: refs/heads/4.0
Commit: b70f3abd2f7e0d4393a5235a46de73f445ea51d1
Parents: 0de7863
Author: ravimagham <ra...@apache.org>
Authored: Mon Dec 8 18:02:57 2014 -0800
Committer: ravimagham <ra...@apache.org>
Committed: Mon Dec 8 18:02:57 2014 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/PhoenixInputFormat.java   | 117 +++++++
 .../phoenix/mapreduce/PhoenixInputSplit.java    | 129 +++++++
 .../mapreduce/PhoenixOutputCommitter.java       |  54 +++
 .../phoenix/mapreduce/PhoenixOutputFormat.java  |  62 ++++
 .../phoenix/mapreduce/PhoenixRecordReader.java  | 140 ++++++++
 .../phoenix/mapreduce/PhoenixRecordWriter.java  |  91 +++++
 .../util/ColumnInfoToStringEncoderDecoder.java  |  65 ++++
 .../phoenix/mapreduce/util/ConnectionUtil.java  |  49 +++
 .../util/PhoenixConfigurationUtil.java          | 299 ++++++++++++++++
 .../mapreduce/util/PhoenixMapReduceUtil.java    |  99 ++++++
 .../org/apache/phoenix/util/PhoenixRuntime.java |   6 +-
 .../java/org/apache/phoenix/util/QueryUtil.java |  22 +-
 .../ColumnInfoToStringEncoderDecoderTest.java   |  61 ++++
 .../util/PhoenixConfigurationUtilTest.java      | 124 +++++++
 .../org/apache/phoenix/util/QueryUtilTest.java  |   2 +-
 .../phoenix/pig/PhoenixPigConfigurationIT.java  | 109 ------
 .../apache/phoenix/pig/PhoenixHBaseLoader.java  |  45 +--
 .../apache/phoenix/pig/PhoenixHBaseStorage.java | 218 ++++++------
 .../phoenix/pig/PhoenixPigConfiguration.java    | 340 -------------------
 .../phoenix/pig/hadoop/PhoenixInputFormat.java  | 142 --------
 .../phoenix/pig/hadoop/PhoenixInputSplit.java   | 134 --------
 .../pig/hadoop/PhoenixOutputCommitter.java      | 111 ------
 .../phoenix/pig/hadoop/PhoenixOutputFormat.java |  94 -----
 .../phoenix/pig/hadoop/PhoenixRecord.java       | 112 ------
 .../phoenix/pig/hadoop/PhoenixRecordReader.java | 142 --------
 .../phoenix/pig/hadoop/PhoenixRecordWriter.java |  83 -----
 .../util/ColumnInfoToStringEncoderDecoder.java  |  69 ----
 .../phoenix/pig/util/PhoenixPigSchemaUtil.java  |  27 +-
 .../pig/util/QuerySchemaParserFunction.java     |  21 +-
 .../pig/util/SqlQueryToColumnInfoFunction.java  |  51 ++-
 .../org/apache/phoenix/pig/util/TypeUtil.java   |   4 +-
 .../pig/writable/PhoenixPigDBWritable.java      | 121 +++++++
 .../pig/PhoenixPigConfigurationTest.java        |  49 ---
 .../ColumnInfoToStringEncoderDecoderTest.java   |  61 ----
 .../pig/util/PhoenixPigSchemaUtilTest.java      |  17 +-
 .../pig/util/QuerySchemaParserFunctionTest.java |  22 +-
 .../util/SqlQueryToColumnInfoFunctionTest.java  |  22 +-
 37 files changed, 1642 insertions(+), 1672 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
new file mode 100644
index 0000000..7c67c2c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * {@link InputFormat} implementation for Phoenix.
+ * 
+ */
+public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable,T> {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
+       
+    /**
+     * Instantiated by the framework.
+     */
+    public PhoenixInputFormat() {
+    }
+
+    @Override
+    public RecordReader<NullWritable,T> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        
+        final Configuration configuration = context.getConfiguration();
+        final QueryPlan queryPlan = getQueryPlan(context,configuration);
+        @SuppressWarnings("unchecked")
+        final Class<T> inputClass = (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration);
+        return new PhoenixRecordReader<T>(inputClass , configuration, queryPlan);
+    }
+    
+   
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {  
+        final Configuration configuration = context.getConfiguration();
+        final QueryPlan queryPlan = getQueryPlan(context,configuration);
+        final List<KeyRange> allSplits = queryPlan.getSplits();
+        final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
+        return splits;
+    }
+
+    private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
+        Preconditions.checkNotNull(qplan);
+        Preconditions.checkNotNull(splits);
+        final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
+        for (List<Scan> scans : qplan.getScans()) {
+            psplits.add(new PhoenixInputSplit(scans));
+        }
+        return psplits;
+    }
+    
+    /**
+     * Returns the query plan associated with the select query.
+     * @param context the job context
+     * @param configuration the configuration holding the select statement
+     * @return the optimized {@link QueryPlan}
+     * @throws IOException if the query plan cannot be built
+     */
+    private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException {
+        Preconditions.checkNotNull(context);
+        try{
+            final Connection connection = ConnectionUtil.getConnection(configuration);
+            final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+            Preconditions.checkNotNull(selectStatement);
+            final Statement statement = connection.createStatement();
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+            // Initialize the query plan so it sets up the parallel scans
+            queryPlan.iterator();
+            return queryPlan;
+        } catch(Exception exception) {
+            LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
+            throw new RuntimeException(exception);
+        }
+   }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
new file mode 100644
index 0000000..b222fc9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.query.KeyRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Input split holding the list of {@link Scan}s for one partition and the overall {@link KeyRange} they cover.
+ */
+public class PhoenixInputSplit extends InputSplit implements Writable {
+
+    private List<Scan> scans;
+    private KeyRange keyRange;
+   
+    /**
+     * No-arg constructor required for {@link Writable} deserialization.
+     */
+    public PhoenixInputSplit() {
+    }
+    
+   /**
+    * Creates a split over the given scans.
+    * @param scans the scans covering this split's key range
+    */
+    public PhoenixInputSplit(final List<Scan> scans) {
+        Preconditions.checkNotNull(scans);
+        Preconditions.checkState(!scans.isEmpty());
+        this.scans = scans;
+        init();
+    }
+    
+    public List<Scan> getScans() {
+        return scans;
+    }
+    
+    public KeyRange getKeyRange() {
+        return keyRange;
+    }
+    
+    private void init() {
+        this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
+    }
+    
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int count = WritableUtils.readVInt(input);
+        scans = Lists.newArrayListWithExpectedSize(count);
+        for (int i = 0; i < count; i++) {
+            byte[] protoScanBytes = new byte[WritableUtils.readVInt(input)];
+            input.readFully(protoScanBytes);
+            ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes);
+            Scan scan = ProtobufUtil.toScan(protoScan);
+            scans.add(scan);
+        }
+        init();
+    }
+    
+    @Override
+    public void write(DataOutput output) throws IOException {
+        Preconditions.checkNotNull(scans);
+        WritableUtils.writeVInt(output, scans.size());
+        for (Scan scan : scans) {
+            ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan);
+            byte[] protoScanBytes = protoScan.toByteArray();
+            WritableUtils.writeVInt(output, protoScanBytes.length);
+            output.write(protoScanBytes);
+        }
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+         return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+        return new String[]{};
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + keyRange.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) { return true; }
+        if (obj == null) { return false; }
+        if (!(obj instanceof PhoenixInputSplit)) { return false; }
+        PhoenixInputSplit other = (PhoenixInputSplit)obj;
+        if (keyRange == null) {
+            if (other.keyRange != null) { return false; }
+        } else if (!keyRange.equals(other.keyRange)) { return false; }
+        return true;
+    }
+
+}

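The split above ships its scans as protobuf-serialized bytes, so a split planned on the client can be rebuilt inside a task; init() then re-derives the key range from the first and last scan. A minimal round-trip sketch of that Writable contract (the single empty Scan is illustrative only):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.mapreduce.PhoenixInputSplit;
    import com.google.common.collect.Lists;

    public class PhoenixInputSplitRoundTrip {
        public static void main(String[] args) throws Exception {
            // Serialize the split the way the framework does when shipping it to a task.
            PhoenixInputSplit split = new PhoenixInputSplit(Lists.newArrayList(new Scan()));
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            split.write(new DataOutputStream(bytes));

            // Rebuild it on the "task" side through the no-arg constructor + readFields.
            PhoenixInputSplit copy = new PhoenixInputSplit();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(copy.getKeyRange().equals(split.getKeyRange()));  // prints true
        }
    }
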
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
new file mode 100644
index 0000000..ffee5c7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A no-op {@link OutputCommitter}
+ */
+public class PhoenixOutputCommitter extends OutputCommitter {
+    
+    public PhoenixOutputCommitter() {}
+
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+        return true;
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {        
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext context) throws IOException {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
new file mode 100644
index 0000000..e55b977
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * {@link OutputFormat} implementation for Phoenix.
+ *
+ */
+public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> {
+    private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+    
+    @Override
+    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {      
+    }
+    
+    /**
+     * Returns a no-op {@link PhoenixOutputCommitter}.
+     */
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+        return new PhoenixOutputCommitter();
+    }
+
+    @Override
+    public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        try {
+            return new PhoenixRecordWriter<T>(context.getConfiguration());
+        } catch (SQLException e) {
+            LOG.error("Error calling PhoenixRecordWriter "  + e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
new file mode 100644
index 0000000..2c206ab
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * {@link RecordReader} implementation that iterates over the records.
+ */
+public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<NullWritable,T> {
+    
+    private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
+    private final Configuration  configuration;
+    private final QueryPlan queryPlan;
+    private NullWritable key =  NullWritable.get();
+    private T value = null;
+    private Class<T> inputClass;
+    private ResultIterator resultIterator = null;
+    private PhoenixResultSet resultSet;
+    
+    public PhoenixRecordReader(Class<T> inputClass,final Configuration configuration,final QueryPlan queryPlan) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(queryPlan);
+        this.inputClass = inputClass;
+        this.configuration = configuration;
+        this.queryPlan = queryPlan;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (resultIterator != null) {
+            try {
+                resultIterator.close();
+            } catch (SQLException e) {
+                LOG.error("Error closing the result iterator.");
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return key;
+    }
+
+    @Override
+    public T getCurrentValue() throws IOException, InterruptedException {
+        return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return 0;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+        final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
+        final List<Scan> scans = pSplit.getScans();
+        try {
+            List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+            for (Scan scan : scans) {
+                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+                PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+                iterators.add(peekingResultIterator);
+            }
+            ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
+            if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
+                iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
+            }
+            this.resultIterator = iterator;
+            this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
+        } catch (SQLException e) {
+            LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
+            Throwables.propagate(e);
+        }
+   }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (key == null) {
+            key = NullWritable.get();
+        }
+        if (value == null) {
+            value =  ReflectionUtils.newInstance(inputClass, this.configuration);
+        }
+        Preconditions.checkNotNull(this.resultSet);
+        try {
+            if(!resultSet.next()) {
+                return false;
+            }
+            value.readFields(resultSet);
+            return true;
+        } catch (SQLException e) {
+            LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
new file mode 100644
index 0000000..4d26bf4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+
+/**
+ * Default {@link RecordWriter} implementation for Phoenix.
+ *
+ */
+public class PhoenixRecordWriter<T extends DBWritable>  extends RecordWriter<NullWritable, T> {
+    
+    private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
+    
+    private final Connection conn;
+    private final PreparedStatement statement;
+    private final long batchSize;
+    private long numRecords = 0;
+    
+    public PhoenixRecordWriter(final Configuration configuration) throws SQLException {
+        this.conn = ConnectionUtil.getConnection(configuration);
+        this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
+        final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+        this.statement = this.conn.prepareStatement(upsertQuery);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+        try {
+            statement.executeBatch();
+            conn.commit();
+        } catch (SQLException e) {
+            LOG.error("SQLException while performing the commit for the task.");
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                statement.close();
+                conn.close();
+            } catch (SQLException ex) {
+                LOG.error("SQLException while closing the connection for the task.");
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+
+    @Override
+    public void write(NullWritable n, T record) throws IOException, InterruptedException {      
+        try {
+            record.write(statement);
+            numRecords++;
+            statement.addBatch();
+            if (numRecords % batchSize == 0) {
+                LOG.debug("commit called on a batch of size : " + batchSize);
+                statement.executeBatch();
+                conn.commit();
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException("Exception while committing to database.", e);
+        }
+    }
+
+}

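Both sides of the pipeline delegate per-record (de)serialization to the record class through the DBWritable contract: PhoenixRecordReader calls readFields(ResultSet) to populate a value from the select query, and PhoenixRecordWriter calls write(PreparedStatement) to bind it to the generated UPSERT. A minimal sketch of such a record class, assuming a hypothetical STOCK table with a VARCHAR NAME and a DOUBLE PRICE column:

    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    // Hypothetical record for: CREATE TABLE STOCK (NAME VARCHAR PRIMARY KEY, PRICE DOUBLE)
    public class StockWritable implements DBWritable {
        private String name;
        private double price;

        @Override
        public void readFields(ResultSet rs) throws SQLException {
            // Invoked by PhoenixRecordReader for each row of the select query.
            name = rs.getString("NAME");
            price = rs.getDouble("PRICE");
        }

        @Override
        public void write(PreparedStatement stmt) throws SQLException {
            // Invoked by PhoenixRecordWriter; bind order must match the UPSERT columns.
            stmt.setString(1, name);
            stmt.setDouble(2, price);
        }
    }
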
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
new file mode 100644
index 0000000..ec52fba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import java.util.List;
+
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * A codec that encodes a list of {@link ColumnInfo}s to a delimited {@link String} and decodes it back.
+ */
+public class ColumnInfoToStringEncoderDecoder {
+
+    private static final String COLUMN_INFO_DELIMITER = "|";
+    
+    private ColumnInfoToStringEncoderDecoder() {
+        
+    }
+    
+    public static String encode(List<ColumnInfo> columnInfos) {
+        Preconditions.checkNotNull(columnInfos);
+        return Joiner.on(COLUMN_INFO_DELIMITER)
+                     .skipNulls()
+                     .join(columnInfos);
+    }
+    
+    public static List<ColumnInfo> decode(final String columnInfoStr) {
+        Preconditions.checkNotNull(columnInfoStr);
+        List<ColumnInfo> columnInfos = Lists.newArrayList(
+                                Iterables.transform(
+                                        Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr),
+                                        new Function<String, ColumnInfo>() {
+                                            @Override
+                                            public ColumnInfo apply(String colInfo) {
+                                               return ColumnInfo.fromString(colInfo);
+                                            }
+                                        }));
+        return columnInfos;
+        
+    }
+
+    
+}

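This codec exists so that column metadata, once resolved, can be cached as a plain string in the job Configuration (see PhoenixConfigurationUtil below) instead of being re-queried by every task. A round-trip sketch with illustrative columns:

    import java.sql.Types;
    import java.util.List;
    import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
    import org.apache.phoenix.util.ColumnInfo;
    import com.google.common.collect.ImmutableList;

    public class ColumnInfoCodecExample {
        public static void main(String[] args) {
            List<ColumnInfo> columns = ImmutableList.of(
                    new ColumnInfo("NAME", Types.VARCHAR),
                    new ColumnInfo("PRICE", Types.DOUBLE));
            // Each ColumnInfo is rendered via its toString() and joined on '|'.
            String encoded = ColumnInfoToStringEncoderDecoder.encode(columns);
            List<ColumnInfo> decoded = ColumnInfoToStringEncoderDecoder.decode(encoded);
            System.out.println(decoded.size());  // 2
        }
    }
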
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
new file mode 100644
index 0000000..0864cba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.util.QueryUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class for creating a Phoenix {@link Connection}.
+ */
+public class ConnectionUtil {
+    
+    /**
+     * Returns a {@link Connection} built from the ZooKeeper quorum set in the configuration.
+     * @param configuration the configuration holding the HBase ZooKeeper quorum
+     * @return a Phoenix JDBC connection
+     * @throws SQLException if the connection cannot be established
+     */
+    public static Connection getConnection(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final Properties props = new Properties();
+        final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props);
+        return conn;
+    }
+
+}

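ConnectionUtil builds the JDBC URL from the hbase.zookeeper.quorum entry of the configuration via QueryUtil.getUrl, so tasks need only the quorum to reach the cluster. A minimal sketch, with a placeholder quorum host:

    import java.sql.Connection;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.phoenix.mapreduce.util.ConnectionUtil;

    public class ConnectionUtilExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set(HConstants.ZOOKEEPER_QUORUM, "zk-host1");  // placeholder quorum
            // Resolves to a Phoenix JDBC URL of the form jdbc:phoenix:zk-host1.
            try (Connection conn = ConnectionUtil.getConnection(conf)) {
                System.out.println(conn.getMetaData().getURL());
            }
        }
    }
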
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
new file mode 100644
index 0000000..83a606b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+/**
+ * A utility class to set properties on the {@link Configuration} instance.
+ * Used as part of Map Reduce job configuration.
+ * 
+ */
+public final class PhoenixConfigurationUtil {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
+    
+    public static final String UPSERT_COLUMNS = "phoenix.upsert.columns";
+    
+    public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
+    
+    public static final String UPSERT_COLUMN_INFO_KEY  = "phoenix.upsert.columninfos.list";
+    
+    public static final String SELECT_STATEMENT = "phoenix.select.stmt";
+    
+    public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
+    
+    public static final String SELECT_COLUMNS = "phoenix.select.query.columns";
+    
+    public static final String SELECT_COLUMN_INFO_KEY  = "phoenix.select.columninfos.list";
+    
+    public static final String SCHEMA_TYPE = "phoenix.select.schema.type";
+    
+    public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
+    
+    public static final String INPUT_TABLE_NAME = "phoenix.input.table.name" ;
+    
+    public static final String INPUT_TABLE_CONDITIONS = "phoenix.input.table.conditions" ;
+    
+    public static final String OUTPUT_TABLE_NAME = "phoenix.output.table.name" ;
+    
+    public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
+    
+    public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
+
+    public static final String INPUT_CLASS = "phoenix.input.class";
+    
+    public enum SchemaType {
+        TABLE,
+        QUERY;
+    }
+
+    private PhoenixConfigurationUtil(){
+        
+    }
+    /**
+     * Sets the input table name.
+     * @param tableName the input table name
+     */
+    public static void setInputTableName(final Configuration configuration, final String tableName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(tableName);
+        configuration.set(INPUT_TABLE_NAME, tableName);
+    }
+    
+    public static void setInputTableConditions(final Configuration configuration, final String conditions) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(conditions);
+        configuration.set(INPUT_TABLE_CONDITIONS, conditions);
+    }
+    
+    public static void setSelectColumnNames(final Configuration configuration,final String[] columns) {
+        Preconditions.checkNotNull(configuration);
+        final String selectColumnNames = Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(columns);
+        configuration.set(SELECT_COLUMNS, selectColumnNames);
+    }
+    
+    public static void setSelectColumnNames(final Configuration configuration,final String columns) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(SELECT_COLUMNS, columns);
+    }
+    
+    public static void setInputClass(final Configuration configuration, Class<? extends DBWritable> inputClass) {
+        Preconditions.checkNotNull(configuration);
+        configuration.setClass(INPUT_CLASS ,inputClass,DBWritable.class);
+    }
+    
+    public static void setInputQuery(final Configuration configuration, final String inputQuery) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(inputQuery);
+        configuration.set(SELECT_STATEMENT, inputQuery);
+    }
+    
+    public static void setSchemaType(Configuration configuration, final SchemaType schemaType) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(SCHEMA_TYPE, schemaType.name());
+    }
+    
+    public static void setOutputTableName(final Configuration configuration, final String tableName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(tableName);
+        configuration.set(OUTPUT_TABLE_NAME, tableName);
+    }
+    
+    public static void setUpsertColumnNames(final Configuration configuration,final String[] columns) {
+        Preconditions.checkNotNull(configuration);
+        final String upsertColumnNames = Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(columns);
+        configuration.set(UPSERT_COLUMNS, upsertColumnNames);
+    }
+    
+    public static void setUpsertColumnNames(final Configuration configuration,final String columns) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(UPSERT_COLUMNS, columns);
+    }
+    
+   
+    public static void setBatchSize(final Configuration configuration, final Long batchSize) {
+        Preconditions.checkNotNull(configuration);
+        configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
+    }
+    
+    public static Class<?> getInputClass(final Configuration configuration) {
+        return configuration.getClass(INPUT_CLASS, NullDBWritable.class);
+    }
+    public static SchemaType getSchemaType(final Configuration configuration) {
+        final String schemaTp = configuration.get(SCHEMA_TYPE);
+        Preconditions.checkNotNull(schemaTp);
+        return SchemaType.valueOf(schemaTp);
+    }
+    
+    public static List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String tableName = getOutputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
+        if(isNotEmpty(columnInfoStr)) {
+            return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        }
+        final Connection connection = ConnectionUtil.getConnection(configuration);
+        String upsertColumns = configuration.get(UPSERT_COLUMNS);
+        List<String> upsertColumnList = null;
+        if(isNotEmpty(upsertColumns)) {
+            final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+            upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
+            LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
+                    ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
+                    ));
+        } 
+       List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
+       final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+       // Cache the encoded column infos in the Configuration for reuse.
+       configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
+       connection.close();
+       return columnMetadataList;
+    }
+    
+     public static String getUpsertStatement(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String tableName = getOutputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        String upsertStmt = configuration.get(UPSERT_STATEMENT);
+        if(isNotEmpty(upsertStmt)) {
+            return upsertStmt;
+        }
+        final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
+        final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration);
+        if (useUpsertColumns) {
+            // Generating UPSERT statement with column name information.
+            upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
+            LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt);
+        } else {
+            // Generating UPSERT statement without column name information.
+            upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
+            LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
+        }
+        configuration.set(UPSERT_STATEMENT, upsertStmt);
+        return upsertStmt;
+        
+    }
+    
+    public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
+        if(isNotEmpty(columnInfoStr)) {
+            return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        }
+        final String tableName = getInputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        final Connection connection = ConnectionUtil.getConnection(configuration);
+        final List<String> selectColumnList = getSelectColumnList(configuration);
+        final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+        final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+        // Cache the encoded column infos in the Configuration for reuse.
+        configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
+        connection.close();
+        return columnMetadataList;
+    }
+
+    private static List<String> getSelectColumnList(
+            final Configuration configuration) {
+        String selectColumns = configuration.get(SELECT_COLUMNS);
+        List<String> selectColumnList = null;
+        if(isNotEmpty(selectColumns)) {
+            final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+            selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
+            LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
+                    ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
+                    ));
+        }
+        return selectColumnList;
+    }
+    
+    public static String getSelectStatement(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        String selectStmt = configuration.get(SELECT_STATEMENT);
+        if(isNotEmpty(selectStmt)) {
+            return selectStmt;
+        }
+        final String tableName = getInputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration);
+        final String conditions = configuration.get(INPUT_TABLE_CONDITIONS);
+        selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList, conditions);
+        LOG.info("Select Statement: "+ selectStmt);
+        configuration.set(SELECT_STATEMENT, selectStmt);
+        return selectStmt;
+    }
+    
+    public static long getBatchSize(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
+        if(batchSize <= 0) {
+           Connection conn = ConnectionUtil.getConnection(configuration);
+           batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+           conn.close();
+        }
+        configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
+        return batchSize;
+    }
+    
+    public static int getSelectColumnsCount(Configuration configuration,
+            String tableName) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String schemaTp = configuration.get(SCHEMA_TYPE);
+        final SchemaType schemaType = SchemaType.valueOf(schemaTp);
+        int count = 0;
+        if(SchemaType.QUERY.equals(schemaType)) {
+            List<String> selectedColumnList = getSelectColumnList(configuration);
+            count = selectedColumnList == null ? 0 : selectedColumnList.size();
+        } else {
+            List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration);
+            count = columnInfos == null ? 0 : columnInfos.size();
+        }
+        return count;
+    }
+
+    public static String getInputTableName(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(INPUT_TABLE_NAME);
+    }
+
+    public static String getOutputTableName(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(OUTPUT_TABLE_NAME);
+    }
+}

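All of the job state flows through these string keys, and the expensive artifacts (column metadata, the generated SELECT and UPSERT statements) are computed once and written back into the Configuration as a cache for later calls. A sketch of the write-side setup with an illustrative table and columns; the first call to getUpsertStatement would resolve column metadata over a live connection, subsequent calls hit the cached value:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

    public class UpsertConfigExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            PhoenixConfigurationUtil.setOutputTableName(conf, "STOCK_STATS");  // illustrative
            PhoenixConfigurationUtil.setUpsertColumnNames(conf,
                    new String[] { "STOCK_NAME", "MAX_PRICE" });
            PhoenixConfigurationUtil.setBatchSize(conf, 500L);
            // PhoenixConfigurationUtil.getUpsertStatement(conf) would now build
            // "UPSERT INTO STOCK_STATS ..." against a live cluster and cache it.
        }
    }
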
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
new file mode 100644
index 0000000..f1a7f5a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
+
+/**
+ * Utility class for setting Configuration parameters for the Map Reduce job
+ */
+public final class PhoenixMapReduceUtil {
+
+    private PhoenixMapReduceUtil() {
+        
+    }
+    
+    /**
+     * Configures the job for reading from a Phoenix table.
+     * @param job        MapReduce job to configure
+     * @param inputClass DBWritable class
+     * @param tableName  Input table name
+     * @param conditions Condition clause to be added to the WHERE clause
+     * @param fieldNames Fields being projected into the SELECT query
+     */
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String conditions, final String... fieldNames) {
+          job.setInputFormatClass(PhoenixInputFormat.class);
+          final Configuration configuration = job.getConfiguration();
+          PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+          if (conditions != null) {
+              // apply the documented WHERE conditions, which were otherwise dropped
+              PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
+          }
+          PhoenixConfigurationUtil.setSelectColumnNames(configuration,fieldNames);
+          PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+          PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
+    }
+    
+    /**
+     * Configures the job for reading the results of a SELECT query.
+     * @param job         MapReduce job to configure
+     * @param inputClass  DBWritable class
+     * @param tableName   Input table name
+     * @param inputQuery  Select query
+     */
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String inputQuery) {
+          job.setInputFormatClass(PhoenixInputFormat.class);
+          final Configuration configuration = job.getConfiguration();
+          PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+          PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
+          PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+          PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+     }
+    
+    /**
+     * Configures the job for writing to a Phoenix table with the given columns.
+     * @param job        MapReduce job to configure
+     * @param tableName  Output table name
+     * @param columns    Comma-separated list of columns to upsert
+     */
+    public static void setOutput(final Job job, final String tableName,final String columns) {
+        job.setOutputFormatClass(PhoenixOutputFormat.class);
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+        PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns);
+    }
+    
+    
+    /**
+     * Configures the job for writing to a Phoenix table using the given fields.
+     * @param job        MapReduce job to configure
+     * @param tableName  Output table name
+     * @param fieldNames Names of the columns to upsert
+     */
+    public static void setOutput(final Job job, final String tableName , final String... fieldNames) {
+          job.setOutputFormatClass(PhoenixOutputFormat.class);
+          final Configuration configuration = job.getConfiguration();
+          PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+          PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames);
+    }
+
+    
+}
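
To see how these helpers are meant to be combined, here is a hedged end-to-end sketch. Everything below is illustrative: the table and column names, the StockWritable class, and the driver are assumptions layered on the methods in this file, not part of the patch.

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

public class StockDriver {

    // A minimal DBWritable for two columns of a hypothetical STOCK table.
    public static class StockWritable implements DBWritable {
        private String stockName;
        private double price;

        @Override
        public void readFields(ResultSet rs) throws SQLException {
            stockName = rs.getString("STOCK_NAME");
            price = rs.getDouble("PRICE");
        }

        @Override
        public void write(PreparedStatement pstmt) throws SQLException {
            pstmt.setString(1, stockName);
            pstmt.setDouble(2, price);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "stock-stats");
        job.setJarByClass(StockDriver.class);
        // Source: SELECT "STOCK_NAME","PRICE" FROM STOCK WHERE (PRICE > 0)
        PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCK",
                "PRICE > 0", "STOCK_NAME", "PRICE");
        // Sink: UPSERT into STOCK_STATS over the named columns.
        PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME", "MAX_PRICE");
        // A real job would also set mapper/reducer and key/value classes here.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}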

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index f7abe7e..2dc4029 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -325,8 +325,8 @@ public class PhoenixRuntime {
         if (columns == null) {
             // use all columns in the table
             for(PColumn pColumn : table.getColumns()) {
-               int sqlType = pColumn.getDataType().getResultSetSqlType();
-               columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType));
+               int sqlType = pColumn.getDataType().getSqlType();
+               columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType)); 
             }
         } else {
             // Leave "null" as indication to skip b/c it doesn't exist
@@ -405,7 +405,7 @@ public class PhoenixRuntime {
         if (pColumn==null) {
             throw new SQLException("pColumn must not be null.");
         }
-        int sqlType = pColumn.getDataType().getResultSetSqlType();
+        int sqlType = pColumn.getDataType().getSqlType();
         ColumnInfo columnInfo = new ColumnInfo(pColumn.toString(),sqlType);
         return columnInfo;
     }
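
A note on the change above: getSqlType() returns Phoenix's own type code, while getResultSetSqlType() maps to the closest standard java.sql.Types value; for plain types the two are assumed to agree, but for Phoenix-specific types such as arrays they diverge, and only the former carries enough information to recover the original PDataType. A small sketch under that assumption (the comments state expected behavior, not verified output):

import java.sql.Types;
import org.apache.phoenix.schema.PDataType;

public class SqlTypeSketch {
    public static void main(String[] args) {
        // Plain types: both accessors are expected to yield Types.VARCHAR.
        System.out.println(PDataType.VARCHAR.getSqlType() == Types.VARCHAR);
        System.out.println(PDataType.VARCHAR.getResultSetSqlType() == Types.VARCHAR);
        // Array types: the ResultSet view collapses to Types.ARRAY, whereas
        // getSqlType() keeps a Phoenix-specific code that preserves the element type.
        System.out.println(PDataType.VARCHAR_ARRAY.getResultSetSqlType() == Types.ARRAY);
        System.out.println(PDataType.VARCHAR_ARRAY.getSqlType());
    }
}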

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index af77001..b91fddc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -145,29 +145,33 @@ public final class QueryUtil {
      * 
      * @param fullTableName name of the table for which the select statement needs to be created.
      * @param columnInfos  list of columns to be projected in the select statement.
+     * @param conditions   The condition clause to be added to the WHERE clause
      * @return Select Query 
      */
-    public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos) {
+    public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos,final String conditions) {
         Preconditions.checkNotNull(fullTableName,"Table name cannot be null");
         if(columnInfos == null || columnInfos.isEmpty()) {
              throw new IllegalArgumentException("At least one column must be provided");
         }
         // escape the table name to ensure it is case sensitive.
         final String escapedFullTableName = SchemaUtil.getEscapedFullTableName(fullTableName);
-        StringBuilder sb = new StringBuilder();
-        sb.append("SELECT ");
+        StringBuilder query = new StringBuilder();
+        query.append("SELECT ");
         for (ColumnInfo cinfo : columnInfos) {
             if (cinfo != null) {
                 String fullColumnName = getEscapedFullColumnName(cinfo.getColumnName());
-                sb.append(fullColumnName);
-                sb.append(",");
+                query.append(fullColumnName);
+                query.append(",");
              }
          }
         // Remove the trailing comma
-        sb.setLength(sb.length() - 1);
-        sb.append(" FROM ");
-        sb.append(escapedFullTableName);
-        return sb.toString();
+        query.setLength(query.length() - 1);
+        query.append(" FROM ");
+        query.append(escapedFullTableName);
+        if(conditions != null && conditions.length() > 0) {
+            query.append(" WHERE (").append(conditions).append(")");
+        }
+        return query.toString();
     }
 
     public static String getUrl(String server) {
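
The effect of the new parameter, as a short sketch (the column and condition values are made up; the expected output follows from the code above):

import java.sql.Types;
import java.util.List;

import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.QueryUtil;

import com.google.common.collect.ImmutableList;

public class SelectStatementSketch {
    public static void main(String[] args) {
        List<ColumnInfo> cols = ImmutableList.of(
                new ColumnInfo("ID", Types.BIGINT),
                new ColumnInfo("NAME", Types.VARCHAR));
        // Prints: SELECT "ID","NAME" FROM "MYTAB" WHERE (ID > 100)
        System.out.println(QueryUtil.constructSelectStatement("MYTAB", cols, "ID > 100"));
        // With a null or empty condition the WHERE clause is omitted entirely.
        System.out.println(QueryUtil.constructSelectStatement("MYTAB", cols, null));
    }
}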

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
new file mode 100644
index 0000000..1004981
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests methods on {@link ColumnInfoToStringEncoderDecoder}
+ */
+public class ColumnInfoToStringEncoderDecoderTest {
+
+    @Test
+    public void testEncode() {
+        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+        assertEquals(columnInfo.toString(),encodedColumnInfo);
+    }
+    
+    @Test
+    public void testDecode() {
+        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+        final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(encodedColumnInfo);
+        assertEquals(1, decodedColumnInfo.size());
+        assertEquals(columnInfo.toString(), decodedColumnInfo.get(0).toString());
+    }
+    
+    @Test
+    public void testEncodeDecodeWithNulls() {
+        final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final ColumnInfo columnInfo2 = null;
+        final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1,columnInfo2));
+        final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        assertEquals(1,decodedColumnInfo.size()); 
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
new file mode 100644
index 0000000..33c7531
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+/**
+ * Test for {@link PhoenixConfigurationUtil}
+ */
+public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
+    
+    @Test
+    public void testUpsertStatement() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+            conn.createStatement().execute(ddl);
+            final Configuration configuration = new Configuration();
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+            final String upsertStatement = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+            final String expectedUpsertStatement = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)";
+            assertEquals(expectedUpsertStatement, upsertStatement);
+        } finally {
+            conn.close();
+        }
+     }
+
+    @Test
+    public void testSelectStatement() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+            conn.createStatement().execute(ddl);
+            final Configuration configuration = new Configuration();
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+            final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+            final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(tableName);
+            assertEquals(expectedSelectStatement, selectStatement);
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSelectStatementForSpecificColumns() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+            conn.createStatement().execute(ddl);
+            final Configuration configuration = new Configuration();
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+            PhoenixConfigurationUtil.setSelectColumnNames(configuration, "A_BINARY");
+            final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+            final String expectedSelectStatement = "SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(tableName);
+            assertEquals(expectedSelectStatement, selectStatement);
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSelectStatementForArrayTypes() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])\n";
+            conn.createStatement().execute(ddl);
+            final Configuration configuration = new Configuration();
+            configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+            PhoenixConfigurationUtil.setSelectColumnNames(configuration, "ID,VCARRAY");
+            PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+            PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+            final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+            final String expectedSelectStatement = "SELECT \"ID\",\"0\".\"VCARRAY\" FROM " + SchemaUtil.getEscapedArgument(tableName);
+            assertEquals(expectedSelectStatement, selectStatement);
+        } finally {
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 182eb56..33e3b5a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -64,7 +64,7 @@ public class QueryUtilTest {
     public void testConstructSelectStatement() {
         assertEquals(
                 "SELECT \"ID\",\"NAME\" FROM \"MYTAB\"",
-                QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN)));
+                QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
deleted file mode 100644
index efbfbf8..0000000
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig;
-
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.Test;
-
-
-public class PhoenixPigConfigurationIT extends BaseHBaseManagedTimeIT {
-    private static final String zkQuorum = TestUtil.LOCALHOST + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-    
-    @Test
-    public void testUpsertStatement() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        final String tableName = "TEST_TABLE";
-        try {
-            String ddl = "CREATE TABLE "+ tableName + 
-                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
-            createTestTable(getUrl(), ddl);
-            final PhoenixPigConfiguration configuration = newConfiguration (tableName);
-            final String upserStatement = configuration.getUpsertStatement();
-            final String expectedUpsertStatement = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)"; 
-            assertEquals(expectedUpsertStatement, upserStatement);
-        } finally {
-            conn.close();
-        }
-    }
-    
-    @Test
-    public void testSelectStatement() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        final String tableName = "TEST_TABLE";
-        try {
-            String ddl = "CREATE TABLE "+ tableName + 
-                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
-            createTestTable(getUrl(), ddl);
-            final PhoenixPigConfiguration configuration = newConfiguration (tableName);
-            final String selectStatement = configuration.getSelectStatement();
-            final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(tableName) ; 
-            assertEquals(expectedSelectStatement, selectStatement);
-        } finally {
-            conn.close();
-        }
-    }
-    
-    @Test
-    public void testSelectStatementForSpecificColumns() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        final String tableName = "TEST_TABLE";
-        try {
-            String ddl = "CREATE TABLE "+ tableName + 
-                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
-                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
-            createTestTable(getUrl(), ddl);
-            final PhoenixPigConfiguration configuration = newConfiguration (tableName);
-            configuration.setSelectColumns("A_BINARY");
-            final String selectStatement = configuration.getSelectStatement();
-            final String expectedSelectStatement = "SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(tableName) ; 
-            assertEquals(expectedSelectStatement, selectStatement);
-        } finally {
-            conn.close();
-        }
-    }
-
-    private PhoenixPigConfiguration newConfiguration(String tableName) {
-        final Configuration configuration = new Configuration();
-        final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(configuration);
-        phoenixConfiguration.configure(zkQuorum, tableName.toUpperCase(), 100);
-        return phoenixConfiguration;
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
index d8bedf6..1218e82 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
@@ -28,18 +28,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType;
-import org.apache.phoenix.pig.hadoop.PhoenixInputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
 import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
 import org.apache.phoenix.pig.util.TableSchemaParserFunction;
 import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
@@ -83,12 +86,12 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
     private static final String PHOENIX_QUERY_SCHEME      = "hbase://query/";
     private static final String RESOURCE_SCHEMA_SIGNATURE = "phoenix.pig.schema";
    
-    private PhoenixPigConfiguration config;
+    private Configuration config;
     private String tableName;
     private String selectQuery;
     private String zkQuorum ;
-    private PhoenixInputFormat inputFormat;
-    private RecordReader<NullWritable, PhoenixRecord> reader;
+    private PhoenixInputFormat<PhoenixPigDBWritable> inputFormat;
+    private RecordReader<NullWritable,PhoenixPigDBWritable> reader;
     private String contextSignature;
     private ResourceSchema schema;
        
@@ -107,6 +110,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
         final Configuration configuration = job.getConfiguration();
         //explicitly turning off combining splits. 
         configuration.setBoolean("pig.noSplitCombination", true);
+        // needed for Phoenix to work on a secured cluster
+        TableMapReduceUtil.initCredentials(job);
         this.initializePhoenixPigConfiguration(location, configuration);
     }
 
@@ -120,21 +125,22 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
         if(this.config != null) {
             return;
         }
-        this.config = new PhoenixPigConfiguration(configuration);
-        this.config.setServerName(this.zkQuorum);
+        this.config = configuration;
+        this.config.set(HConstants.ZOOKEEPER_QUORUM,this.zkQuorum);
+        PhoenixConfigurationUtil.setInputClass(this.config, PhoenixPigDBWritable.class);
         Pair<String,String> pair = null;
         try {
             if (location.startsWith(PHOENIX_TABLE_NAME_SCHEME)) {
                 String tableSchema = location.substring(PHOENIX_TABLE_NAME_SCHEME.length());
                 final TableSchemaParserFunction parseFunction = new TableSchemaParserFunction();
                 pair =  parseFunction.apply(tableSchema);
-				this.config.setSchemaType(SchemaType.TABLE);
+                PhoenixConfigurationUtil.setSchemaType(this.config, SchemaType.TABLE);
              } else if (location.startsWith(PHOENIX_QUERY_SCHEME)) {
                 this.selectQuery = location.substring(PHOENIX_QUERY_SCHEME.length());
                 final QuerySchemaParserFunction queryParseFunction = new QuerySchemaParserFunction(this.config);
                 pair = queryParseFunction.apply(this.selectQuery);
-                config.setSelectStatement(this.selectQuery);
-				this.config.setSchemaType(SchemaType.QUERY);
+                PhoenixConfigurationUtil.setInputQuery(this.config, this.selectQuery);
+                PhoenixConfigurationUtil.setSchemaType(this.config, SchemaType.QUERY);
             }
             this.tableName = pair.getFirst();
             final String selectedColumns = pair.getSecond();
@@ -142,9 +148,9 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
             if(isEmpty(this.tableName) && isEmpty(this.selectQuery)) {
                 printUsage(location);
             }
-            this.config.setTableName(this.tableName);
+            PhoenixConfigurationUtil.setInputTableName(this.config, this.tableName);
             if(!isEmpty(selectedColumns)) {
-                this.config.setSelectColumns(selectedColumns);    
+                PhoenixConfigurationUtil.setSelectColumnNames(this.config, selectedColumns);   
             }
         } catch(IllegalArgumentException iae) {
             printUsage(location);
@@ -160,7 +166,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
     @Override
     public InputFormat getInputFormat() throws IOException {
         if(inputFormat == null) {
-            inputFormat = new PhoenixInputFormat();
+            inputFormat = new PhoenixInputFormat<PhoenixPigDBWritable>();
+            PhoenixConfigurationUtil.setInputClass(this.config,PhoenixPigDBWritable.class);
         }
         return inputFormat;
     }
@@ -188,13 +195,13 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
     public Tuple getNext() throws IOException {
         try {
             if(!reader.nextKeyValue()) {
-               return null; 
-            }
-            final PhoenixRecord phoenixRecord = reader.getCurrentValue();
-            if(phoenixRecord == null) {
+                return null;
+            }
+            final PhoenixPigDBWritable record = reader.getCurrentValue();
+            if(record == null) {
                 return null;
             }
-            final Tuple tuple = TypeUtil.transformToTuple(phoenixRecord,schema.getFields());
+            final Tuple tuple = TypeUtil.transformToTuple(record,schema.getFields());
             return tuple;
        } catch (InterruptedException e) {
             int errCode = 6018;


[2/3] phoenix git commit: PHOENIX-1454 Map Reduce over Phoenix tables

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index 500e403..eb2c124 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.pig;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.cli.CommandLine;
@@ -28,13 +30,17 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.util.ColumnInfo;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.StoreFuncInterface;
@@ -75,74 +81,76 @@ import org.apache.pig.impl.util.UDFContext;
 @SuppressWarnings("rawtypes")
 public class PhoenixHBaseStorage implements StoreFuncInterface {
 
-	private PhoenixPigConfiguration config;
-	private RecordWriter<NullWritable, PhoenixRecord> writer;
-	private String contextSignature = null;
-	private ResourceSchema schema;	
-	private long batchSize;
-	private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
-
-	// Set of options permitted
-	private final static Options validOptions = new Options();
-	private final static CommandLineParser parser = new GnuParser();
-	private final static String SCHEMA = "_schema";
-
-	private final CommandLine configuredOptions;
-	private final String server;
-
-	public PhoenixHBaseStorage(String server) throws ParseException {
-		this(server, null);
-	}
-
-	public PhoenixHBaseStorage(String server, String optString)
-			throws ParseException {
-		populateValidOptions();
-		this.server = server;
-
-		String[] optsArr = optString == null ? new String[0] : optString.split(" ");
-		try {
-			configuredOptions = parser.parse(validOptions, optsArr);
-		} catch (ParseException e) {
-			HelpFormatter formatter = new HelpFormatter();
-			formatter.printHelp("[-batchSize]", validOptions);
-			throw e;
-		}
-
-		batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize"));
-	}
-
-	private static void populateValidOptions() {
-		validOptions.addOption("batchSize", true, "Specify upsert batch size");
-	}
-
-	/**
-	 * Returns UDFProperties based on <code>contextSignature</code>.
-	 */
-	private Properties getUDFProperties() {
-		return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
-	}
-
-	
-	/**
-	 * Parse the HBase table name and configure job
-	 */
-	@Override
-	public void setStoreLocation(String location, Job job) throws IOException {
-	    URI locationURI;
+    private Configuration config;
+    private RecordWriter<NullWritable, PhoenixPigDBWritable> writer;
+    private List<ColumnInfo> columnInfo = null;
+    private String contextSignature = null;
+    private ResourceSchema schema;  
+    private long batchSize;
+    private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
+
+    // Set of options permitted
+    private final static Options validOptions = new Options();
+    private final static CommandLineParser parser = new GnuParser();
+    private final static String SCHEMA = "_schema";
+
+    private final CommandLine configuredOptions;
+    private final String server;
+
+    public PhoenixHBaseStorage(String server) throws ParseException {
+        this(server, null);
+    }
+
+    public PhoenixHBaseStorage(String server, String optString)
+            throws ParseException {
+        populateValidOptions();
+        this.server = server;
+
+        String[] optsArr = optString == null ? new String[0] : optString.split(" ");
+        try {
+            configuredOptions = parser.parse(validOptions, optsArr);
+        } catch (ParseException e) {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("[-batchSize]", validOptions);
+            throw e;
+        }
+        // default to 0 when -batchSize is absent so the connection's mutate batch size is used
+        batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize", "0"));
+    }
+
+    private static void populateValidOptions() {
+        validOptions.addOption("batchSize", true, "Specify upsert batch size");
+    }
+
+    /**
+     * Returns UDFProperties based on <code>contextSignature</code>.
+     */
+    private Properties getUDFProperties() {
+        return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
+    }
+
+    
+    /**
+     * Parse the HBase table name and configure job
+     */
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        URI locationURI;
         try {
             locationURI = new URI(location);
             if (!"hbase".equals(locationURI.getScheme())) {
                 throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location));
             }
+            config = job.getConfiguration();
+            config.set(HConstants.ZOOKEEPER_QUORUM, server);
             String tableName = locationURI.getAuthority();
             // strip off the leading path token '/'
             String columns = null;
             if(!locationURI.getPath().isEmpty()) {
                 columns = locationURI.getPath().substring(1);
+                PhoenixConfigurationUtil.setUpsertColumnNames(config, columns);
             }
-            config = new PhoenixPigConfiguration(job.getConfiguration());
-            config.configure(server, tableName, batchSize, columns);
-            
+            PhoenixConfigurationUtil.setOutputTableName(config,tableName);
+            PhoenixConfigurationUtil.setBatchSize(config,batchSize);
             String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
             if (serializedSchema != null) {
                 schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
@@ -150,59 +158,61 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
         } catch (URISyntaxException e) {
             throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location),e);
         }
-	}
+    }
 
-	@SuppressWarnings("unchecked")
+    @SuppressWarnings("unchecked")
     @Override
-	public void prepareToWrite(RecordWriter writer) throws IOException {
-		this.writer =writer;
-	}
+    public void prepareToWrite(RecordWriter writer) throws IOException {
+        this.writer = writer;
+        try {
+            this.columnInfo = PhoenixConfigurationUtil.getUpsertColumnMetadataList(this.config);
+        } catch(SQLException sqle) {
+            throw new IOException(sqle);
+        }
+    }
 
-	@Override
-	public void putNext(Tuple t) throws IOException {
+    @Override
+    public void putNext(Tuple t) throws IOException {
         ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();      
-        
-        PhoenixRecord record = new PhoenixRecord(fieldSchemas);
-        
+        PhoenixPigDBWritable record = PhoenixPigDBWritable.newInstance(fieldSchemas,this.columnInfo);
         for(int i=0; i<t.size(); i++) {
-        	record.add(t.get(i));
+            record.add(t.get(i));
+        }
+        try {
+            this.writer.write(null, record);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
         }
         
-		try {
-			writer.write(null, record);
-		} catch (InterruptedException e) {
-			throw new RuntimeException(e);
-		}
-        
-	}
+    }
 
-	@Override
-	public void setStoreFuncUDFContextSignature(String signature) {
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
         this.contextSignature = signature;
-	}
-
-	@Override
-	public void cleanupOnFailure(String location, Job job) throws IOException {
-	}
-
-	@Override
-	public void cleanupOnSuccess(String location, Job job) throws IOException {
-	}
-
-	@Override
-	public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
-		return location;
-	}
-
-	@Override
-	public OutputFormat getOutputFormat() throws IOException {
-		return outputFormat;
-	}
-
-	@Override
-	public void checkSchema(ResourceSchema s) throws IOException {
-		schema = s;
-		getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
-	}
+    }
+
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+    }
+
+    @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+    }
+
+    @Override
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+        return location;
+    }
+
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        return outputFormat;
+    }
+
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+        schema = s;
+        getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
+    }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
deleted file mode 100644
index c6b6ec9..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig;
-
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-
-
-/**
- * A container for configuration to be used with {@link PhoenixHBaseStorage} and {@link PhoenixHBaseLoader}
- * 
- */
-public class PhoenixPigConfiguration {
-	
-	private static final Log LOG = LogFactory.getLog(PhoenixPigConfiguration.class);
-	
-	private PhoenixPigConfigurationUtil util;
-	
-	/**
-	 * Speculative execution of Map tasks
-	 */
-	public static final String MAP_SPECULATIVE_EXEC = "mapred.map.tasks.speculative.execution";
-
-	/**
-	 * Speculative execution of Reduce tasks
-	 */
-	public static final String REDUCE_SPECULATIVE_EXEC = "mapred.reduce.tasks.speculative.execution";
-	
-	public static final String SERVER_NAME = "phoenix.hbase.server.name";
-	
-	public static final String TABLE_NAME = "phoenix.hbase.table.name";
-	
-	public static final String UPSERT_COLUMNS = "phoenix.hbase.upsert.columns";
-	
-	public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
-	
-	public static final String UPSERT_COLUMN_INFO_KEY  = "phoenix.upsert.columninfos.list";
-	
-	public static final String SELECT_STATEMENT = "phoenix.select.stmt";
-	
-	public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
-	
-	//columns projected given as part of LOAD.
-	public static final String SELECT_COLUMNS = "phoneix.select.query.columns";
-	
-	public static final String SELECT_COLUMN_INFO_KEY  = "phoenix.select.columninfos.list";
-	
-	public static final String SCHEMA_TYPE = "phoenix.select.schema.type";
-	
-	// the delimiter supported during LOAD and STORE when projected columns are given.
-	public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
-	
-	public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
-	
-	public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
-	
-	private final Configuration conf;
-		
-	public PhoenixPigConfiguration(Configuration conf) {
-		this.conf = conf;
-		this.util = new PhoenixPigConfigurationUtil();
-	}
-	
-	public void configure(String server, String tableName, long batchSize) {
-        configure(server,tableName,batchSize,null);
-    }
-	
-	public void configure(String server, String tableName, long batchSize, String columns) {
-	    conf.set(SERVER_NAME, server);
-        conf.set(TABLE_NAME, tableName);
-        conf.setLong(UPSERT_BATCH_SIZE, batchSize);
-        if (isNotEmpty(columns)) {
-            conf.set(UPSERT_COLUMNS, columns);
-        }
-        conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
-        conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
-	}
-	
-	
-	/**
-	 * Creates a {@link Connection} with autoCommit set to false.
-	 * @throws SQLException
-	 */
-	public Connection getConnection() throws SQLException {
-	    return getUtil().getConnection(getConfiguration());
-	}
-	
-	public String getUpsertStatement() throws SQLException {
-		return getUtil().getUpsertStatement(getConfiguration(), getTableName());
-	}
-
-	public long getBatchSize() throws SQLException {
-		return getUtil().getBatchSize(getConfiguration());
-	}
-
-	public String getServer() {
-		return conf.get(SERVER_NAME);
-	}
-
-	public List<ColumnInfo> getColumnMetadataList() throws SQLException {
-	    return getUtil().getUpsertColumnMetadataList(getConfiguration(), getTableName());
-	}
-	
-	public String getUpsertColumns() {
-	    return conf.get(UPSERT_COLUMNS);
-	}
-	
-	public String getTableName() {
-		return conf.get(TABLE_NAME);
-	}
-	
-	public Configuration getConfiguration() {
-		return this.conf;
-	}
-	
-	public String getSelectStatement() throws SQLException {
-	   return getUtil().getSelectStatement(getConfiguration(), getTableName());
-	}
-	
-	public List<ColumnInfo> getSelectColumnMetadataList() throws SQLException {
-        return getUtil().getSelectColumnMetadataList(getConfiguration(), getTableName());
-    }
-	
-	public int getSelectColumnsCount() throws SQLException {
-		return getUtil().getSelectColumnsCount(getConfiguration(), getTableName());
-	}
-	
-	public SchemaType getSchemaType() {
-		final String schemaTp = conf.get(SCHEMA_TYPE);
-		return SchemaType.valueOf(schemaTp);
-	}
-	
-	
-	public void setServerName(final String zookeeperQuorum) {
-	    this.conf.set(SERVER_NAME, zookeeperQuorum);
-	}
-	
-	public void setTableName(final String tableName) {
-	    Preconditions.checkNotNull(tableName, "HBase Table name cannot be null!");
-	    this.conf.set(TABLE_NAME, tableName);
-	}
-	
-	public void setSelectStatement(final String selectStatement) {
-	    this.conf.set(SELECT_STATEMENT, selectStatement);
-	}
-
-	public void setSelectColumns(String selectColumns) {
-        this.conf.set(SELECT_COLUMNS, selectColumns);
-    }
-	
-	public PhoenixPigConfigurationUtil getUtil() {
-	    return this.util;
-	}
-	
-	public void setSchemaType(final SchemaType schemaType) {
-		this.conf.set(SCHEMA_TYPE, schemaType.name());
-	}
-	
-	public enum SchemaType {
-		TABLE,
-		QUERY;
-	}
-	
-		
-	@VisibleForTesting
-	static class PhoenixPigConfigurationUtil {
-                
-        public Connection getConnection(final Configuration configuration) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            Properties props = new Properties();
-            final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
-            conn.setAutoCommit(false);
-            return conn;
-        }
-        
-      public List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            Preconditions.checkNotNull(tableName);
-            final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
-            if(isNotEmpty(columnInfoStr)) {
-                return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
-            }
-            final Connection connection = getConnection(configuration);
-            String upsertColumns = configuration.get(UPSERT_COLUMNS);
-            List<String> upsertColumnList = null;
-            if(isNotEmpty(upsertColumns)) {
-                final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
-                upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
-                LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
-                        ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
-                        ));
-            } 
-           List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
-           final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
-           // we put the encoded column infos in the Configuration for re usability. 
-           configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
-           closeConnection(connection);
-           return columnMetadataList;
-        }
-        
-        public String getUpsertStatement(final Configuration configuration,final String tableName) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            Preconditions.checkNotNull(tableName);
-            String upsertStmt = configuration.get(UPSERT_STATEMENT);
-            if(isNotEmpty(upsertStmt)) {
-                return upsertStmt;
-            }
-            final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
-            final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration, tableName);
-            if (useUpsertColumns) {
-                // Generating UPSERT statement without column name information.
-                upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
-                LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt);
-            } else {
-                // Generating UPSERT statement without column name information.
-                upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
-                LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
-            }
-            configuration.set(UPSERT_STATEMENT, upsertStmt);
-            return upsertStmt;
-            
-        }
-        
-        public List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            Preconditions.checkNotNull(tableName);
-            final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
-            if(isNotEmpty(columnInfoStr)) {
-                return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
-            }
-            final Connection connection = getConnection(configuration);
-            final List<String> selectColumnList = getSelectColumnList(configuration);
-            final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
-            final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
-            // we put the encoded column infos in the Configuration for re usability. 
-            configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
-            closeConnection(connection);
-            return columnMetadataList;
-        }
-
-		private List<String> getSelectColumnList(
-				final Configuration configuration) {
-			String selectColumns = configuration.get(SELECT_COLUMNS);
-            List<String> selectColumnList = null;
-            if(isNotEmpty(selectColumns)) {
-                final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
-                selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
-                LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
-                        ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
-                        ));
-            }
-			return selectColumnList;
-		}
-        
-        public String getSelectStatement(final Configuration configuration,final String tableName) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            Preconditions.checkNotNull(tableName);
-            String selectStmt = configuration.get(SELECT_STATEMENT);
-            if(isNotEmpty(selectStmt)) {
-                return selectStmt;
-            }
-            final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration, tableName);
-            selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList);
-            LOG.info("Select Statement: "+ selectStmt);
-            configuration.set(SELECT_STATEMENT, selectStmt);
-            return selectStmt;
-        }
-        
-        public long getBatchSize(final Configuration configuration) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
-            if(batchSize <= 0) {
-               Connection conn = getConnection(configuration);
-               batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
-               closeConnection(conn);
-            }
-            configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
-            return batchSize;
-        }
-        
-        public int getSelectColumnsCount(Configuration configuration,
-				String tableName) throws SQLException {
-        	Preconditions.checkNotNull(configuration);
-        	final String schemaTp = configuration.get(SCHEMA_TYPE);
-        	final SchemaType schemaType = SchemaType.valueOf(schemaTp);
-        	int count = 0;
-        	if(SchemaType.QUERY.equals(schemaType)) {
-        		List<String> selectedColumnList = getSelectColumnList(configuration);
-        		count = selectedColumnList == null ? 0 : selectedColumnList.size();
-        	} else {
-        		List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration,tableName);
-        		count = columnInfos == null ? 0 : columnInfos.size();
-        	}
-			return count;
-		}
-        
-        private void closeConnection(final Connection connection) throws SQLException {
-            if(connection != null) {
-                connection.close();
-            }
-        }
-    }
-    
-}
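
For users migrating off the removed class, the patch itself shows the replacement pattern in PhoenixHBaseStorage.setStoreLocation(); extracted here into a hedged helper (the wrapper method is hypothetical, the calls are the ones used in this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

public class MigrationSketch {

    /** Rough equivalent of the removed configure(server, tableName, batchSize, columns). */
    static void configure(Configuration conf, String server, String tableName,
            long batchSize, String columns) {
        conf.set(HConstants.ZOOKEEPER_QUORUM, server);
        PhoenixConfigurationUtil.setOutputTableName(conf, tableName);
        PhoenixConfigurationUtil.setBatchSize(conf, batchSize);
        if (columns != null && !columns.isEmpty()) {
            PhoenixConfigurationUtil.setUpsertColumnNames(conf, columns);
        }
    }
}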

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
deleted file mode 100644
index b58e7e3..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.query.KeyRange;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * The InputFormat class for generating the splits and creating the record readers.
- * 
- */
-public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixRecord> {
-
-    private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
-    private PhoenixPigConfiguration phoenixConfiguration;
-    private Connection connection;
-    private QueryPlan  queryPlan;
-    
-    /**
-     * instantiated by framework
-     */
-    public PhoenixInputFormat() {
-    }
-
-    @Override
-    public RecordReader<NullWritable, PhoenixRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
-            throws IOException, InterruptedException {       
-        setConf(context.getConfiguration());
-        final QueryPlan queryPlan = getQueryPlan(context);
-        try {
-            return new PhoenixRecordReader(phoenixConfiguration,queryPlan);    
-        }catch(SQLException sqle) {
-            throw new IOException(sqle);
-        }
-    }
-    
-   
-
-    @Override
-    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {        
-        setConf(context.getConfiguration());
-        final QueryPlan queryPlan = getQueryPlan(context);
-        final List<KeyRange> allSplits = queryPlan.getSplits();
-        final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
-        return splits;
-    }
-
-    private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
-        Preconditions.checkNotNull(qplan);
-        Preconditions.checkNotNull(splits);
-        final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
-        for (List<Scan> scans : qplan.getScans()) {
-            psplits.add(new PhoenixInputSplit(scans));
-        }
-        return psplits;
-    }
-    
-    public void setConf(Configuration configuration) {
-        this.phoenixConfiguration = new PhoenixPigConfiguration(configuration);
-    }
-
-    public PhoenixPigConfiguration getConf() {
-        return this.phoenixConfiguration;
-    }
-    
-    private Connection getConnection() {
-        try {
-            if (this.connection == null) {
-                this.connection = phoenixConfiguration.getConnection();
-           }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        return connection;
-    }
-    
-    /**
-     * Returns the query plan associated with the select query.
-     * @param context
-     * @return
-     * @throws IOException
-     * @throws SQLException
-     */
-    private QueryPlan getQueryPlan(final JobContext context) throws IOException {
-        Preconditions.checkNotNull(context);
-        if(queryPlan == null) {
-            try{
-                final Connection connection = getConnection();
-                final String selectStatement = getConf().getSelectStatement();
-                Preconditions.checkNotNull(selectStatement);
-                final Statement statement = connection.createStatement();
-                final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
-                // Optimize the query plan so that we potentially use secondary indexes
-                this.queryPlan = pstmt.optimizeQuery(selectStatement);
-                // Initialize the query plan so it sets up the parallel scans
-                queryPlan.iterator();
-            } catch(Exception exception) {
-                LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
-                throw new RuntimeException(exception);
-            }
-        }
-        return queryPlan;
-    }
-}
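
The heart of this deleted class — one input split per group of parallel scans in the optimized query plan — is small enough to restate. A sketch using the same types as the code above:

    import java.util.List;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.phoenix.compile.QueryPlan;
    import com.google.common.collect.Lists;

    // Sketch: mirrors generateSplits() above; one InputSplit per scan group.
    static List<InputSplit> toSplits(QueryPlan plan) {
        List<InputSplit> splits = Lists.newArrayListWithExpectedSize(plan.getScans().size());
        for (List<Scan> scans : plan.getScans()) {
            splits.add(new PhoenixInputSplit(scans));
        }
        return splits;
    }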

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
deleted file mode 100644
index b1d015a..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.phoenix.query.KeyRange;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * 
- * Input split class to hold the lower and upper bound range. {@link KeyRange}
- * 
- */
-public class PhoenixInputSplit extends InputSplit implements Writable {
-
-    private List<Scan> scans;
-    private KeyRange keyRange;
-   
-    /**
-     * No Arg constructor
-     */
-    public PhoenixInputSplit() {
-    }
-    
-   /**
-    * 
-    * @param keyRange
-    */
-    public PhoenixInputSplit(final List<Scan> scans) {
-        Preconditions.checkNotNull(scans);
-        Preconditions.checkState(!scans.isEmpty());
-        this.scans = scans;
-        init();
-    }
-    
-    public List<Scan> getScans() {
-        return scans;
-    }
-    
-    public KeyRange getKeyRange() {
-        return keyRange;
-    }
-    
-    private void init() {
-        this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
-    }
-    
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        int count = WritableUtils.readVInt(input);
-        scans = Lists.newArrayListWithExpectedSize(count);
-        for (int i = 0; i < count; i++) {
-            byte[] protoScanBytes = new byte[WritableUtils.readVInt(input)];
-            input.readFully(protoScanBytes);
-            ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes);
-            Scan scan = ProtobufUtil.toScan(protoScan);
-            scans.add(scan);
-        }
-        init();
-    }
-    
-    @Override
-    public void write(DataOutput output) throws IOException {
-        Preconditions.checkNotNull(scans);
-        WritableUtils.writeVInt(output, scans.size());
-        for (Scan scan : scans) {
-            ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan);
-            byte[] protoScanBytes = protoScan.toByteArray();
-            WritableUtils.writeVInt(output, protoScanBytes.length);
-            output.write(protoScanBytes);
-        }
-    }
-
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-         return 0;
-    }
-
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-        return new String[]{};
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + keyRange.hashCode();
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        // TODO: review: it's a reasonable check to use the keyRange,
-        // but it's not perfect. Do we need an equals impl?
-        if (this == obj) { return true; }
-        if (obj == null) { return false; }
-        if (!(obj instanceof PhoenixInputSplit)) { return false; }
-        PhoenixInputSplit other = (PhoenixInputSplit)obj;
-        if (keyRange == null) {
-            if (other.keyRange != null) { return false; }
-        } else if (!keyRange.equals(other.keyRange)) { return false; }
-        return true;
-    }
-}
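
Because write() and readFields() are symmetric protobuf translations, a split survives a Writable round trip intact. A test-style sketch (not part of the patch) using Hadoop's in-memory buffers:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import com.google.common.collect.Lists;

    Scan scan = new Scan(Bytes.toBytes("a"), Bytes.toBytes("z"));
    PhoenixInputSplit written = new PhoenixInputSplit(Lists.newArrayList(scan));
    DataOutputBuffer out = new DataOutputBuffer();
    written.write(out);

    PhoenixInputSplit read = new PhoenixInputSplit();
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    read.readFields(in);
    // readFields() re-derives the key range from the scans, so the bounds agree.
    assert written.getKeyRange().equals(read.getKeyRange());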

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
deleted file mode 100644
index a8d9d8f..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.jdbc.PhoenixStatement;
-
-/**
- * 
- * {@link OutputCommitter} implementation for Pig tasks using Phoenix
- * connections to upsert to HBase
- * 
- * 
- *
- */
-public class PhoenixOutputCommitter extends OutputCommitter {
-	private final Log LOG = LogFactory.getLog(PhoenixOutputCommitter.class);
-	
-	private final PhoenixOutputFormat outputFormat;
-	
-	public PhoenixOutputCommitter(PhoenixOutputFormat outputFormat) {
-		if(outputFormat == null) {
-			throw new IllegalArgumentException("PhoenixOutputFormat must not be null.");
-		}
-		this.outputFormat = outputFormat;
-	}
-
-	/**
-	 *  TODO implement rollback functionality. 
-	 *  
-	 *  {@link PhoenixStatement#execute(String)} is buffered on the client, this makes 
-	 *  it difficult to implement rollback as once a commit is issued it's hard to go 
-	 *  back all the way to undo. 
-	 */
-	@Override
-	public void abortTask(TaskAttemptContext context) throws IOException {
-	}
-
-	@Override
-	public void commitTask(TaskAttemptContext context) throws IOException {
-		commit(outputFormat.getConnection(context.getConfiguration()));
-	}
-
-	@Override
-	public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-		return true;
-	}
-
-	@Override
-	public void setupJob(JobContext jobContext) throws IOException {		
-	}
-
-	@Override
-	public void setupTask(TaskAttemptContext context) throws IOException {
-	}
-
-	/**
-	 * Commit a transaction on task completion
-	 * 
-	 * @param connection
-	 * @throws IOException
-	 */
-	private void commit(Connection connection) throws IOException {
-		try {
-			if (connection == null || connection.isClosed()) {
-				throw new IOException("Trying to commit a connection that is null or closed: "+ connection);
-			}
-		} catch (SQLException e) {
-			throw new IOException("Exception calling isClosed on connection", e);
-		}
-
-		try {
-			LOG.debug("Commit called on task completion");
-			connection.commit();
-		} catch (SQLException e) {
-			throw new IOException("Exception while trying to commit a connection. ", e);
-		} finally {
-			try {
-				LOG.debug("Closing connection to database on task completion");
-				connection.close();
-			} catch (SQLException e) {
-				LOG.warn("Exception while trying to close database connection", e);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
deleted file mode 100644
index 9c29f8f..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-/**
- * {@link OutputFormat} implementation for Phoenix
- * 
- * 
- *
- */
-public class PhoenixOutputFormat extends OutputFormat<NullWritable, PhoenixRecord> {
-	private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
-	
-	private Connection connection;
-	private PhoenixPigConfiguration config;
-
-	@Override
-	public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {		
-	}
-
-	/**
-	 * TODO Implement {@link OutputCommitter} to rollback in case of task failure
-	 */
-	
-	@Override
-	public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-		return new PhoenixOutputCommitter(this);
-	}
-
-	@Override
-	public RecordWriter<NullWritable, PhoenixRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-		try {
-			return new PhoenixRecordWriter(getConnection(context.getConfiguration()), config);
-		} catch (SQLException e) {
-			throw new IOException(e);
-		}
-	}
-	
-	/**
-	 * This method creates a database connection. A single instance is created
-	 * and passed around for re-use.
-	 * 
-	 * @param configuration
-	 * @return
-	 * @throws IOException
-	 */
-	synchronized Connection getConnection(Configuration configuration) throws IOException {
-	    if (connection != null) { 
-	    	return connection; 
-	    }
-	    
-	    config = new PhoenixPigConfiguration(configuration);	    
-		try {
-			LOG.info("Initializing new Phoenix connection...");
-			connection = config.getConnection();
-			LOG.info("Initialized Phoenix connection, autoCommit="+ connection.getAutoCommit());
-			return connection;
-		} catch (SQLException e) {
-			throw new IOException(e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
deleted file mode 100644
index 5063ed0..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.phoenix.pig.util.TypeUtil;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A {@link Writable} representing a Phoenix record. This class
- * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
- * b) reads the column values from the {@link ResultSet}
- * 
- */
-public class PhoenixRecord implements Writable {
-	
-	private final List<Object> values;
-	private final ResourceFieldSchema[] fieldSchemas;
-	
-	public PhoenixRecord() {
-	    this(null);
-	}
-	
-	public PhoenixRecord(ResourceFieldSchema[] fieldSchemas) {
-		this.values = new ArrayList<Object>();
-		this.fieldSchemas = fieldSchemas;
-	}
-	
-	@Override
-	public void readFields(DataInput in) throws IOException {		
-	}
-
-	@Override
-	public void write(DataOutput out) throws IOException {		
-	}
-	
-	public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException {
-		for (int i = 0; i < columnMetadataList.size(); i++) {
-			Object o = values.get(i);
-			ColumnInfo columnInfo = columnMetadataList.get(i);
-			byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
-			try {
-                Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
-                if (upsertValue != null) {
-                    statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
-                } else {
-                    statement.setNull(i + 1, columnInfo.getSqlType());
-                }
-            } catch (RuntimeException re) {
-                throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
-                        ,columnInfo.toString(),re.getMessage()),re);
-                
-            }
-		}
-		
-		statement.execute();
-	}
-	
-	public void read(final ResultSet rs, final int noOfColumns) throws SQLException {
-	    Preconditions.checkNotNull(rs);
-        Preconditions.checkArgument(noOfColumns > 0, "No of arguments passed is <= 0");
-        values.clear();
-        for(int i = 1 ; i <= noOfColumns ; i++) {
-            Object obj = rs.getObject(i);
-            values.add(obj);
-        }
-	}
-	
-	public void add(Object value) {
-		values.add(value);
-	}
-
-	private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
-		PDataType pDataType = PDataType.fromTypeId(sqlType);
-
-		return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
-	}
-
-    public List<Object> getValues() {
-        return values;
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
deleted file mode 100644
index f6808a8..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.iterate.ConcatResultIterator;
-import org.apache.phoenix.iterate.LookAheadResultIterator;
-import org.apache.phoenix.iterate.PeekingResultIterator;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.iterate.SequenceResultIterator;
-import org.apache.phoenix.iterate.TableResultIterator;
-import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-
-/**
- * RecordReader that process the scan and returns PhoenixRecord
- * 
- */
-public final class PhoenixRecordReader extends RecordReader<NullWritable,PhoenixRecord>{
-    
-    private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
-    private final PhoenixPigConfiguration phoenixConfiguration;
-    private final QueryPlan queryPlan;
-    private final int columnsCount;
-    private NullWritable key =  NullWritable.get();
-    private PhoenixRecord value = null;
-    private ResultIterator resultIterator = null;
-    private PhoenixResultSet resultSet;
-    
-    public PhoenixRecordReader(final PhoenixPigConfiguration pConfiguration,final QueryPlan qPlan) throws SQLException {
-        
-        Preconditions.checkNotNull(pConfiguration);
-        Preconditions.checkNotNull(qPlan);
-        this.phoenixConfiguration = pConfiguration;
-        this.queryPlan = qPlan;
-        this.columnsCount = phoenixConfiguration.getSelectColumnsCount();
-     }
-
-    @Override
-    public void close() throws IOException {
-       if(resultIterator != null) {
-           try {
-               resultIterator.close();
-        } catch (SQLException e) {
-           LOG.error(" Error closing resultset.");
-        }
-       }
-    }
-
-    @Override
-    public NullWritable getCurrentKey() throws IOException, InterruptedException {
-        return key;
-    }
-
-    @Override
-    public PhoenixRecord getCurrentValue() throws IOException, InterruptedException {
-        return value;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-        return 0;
-    }
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-        final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
-        final List<Scan> scans = pSplit.getScans();
-        try {
-            List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
-            for (Scan scan : scans) {
-                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
-                PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
-                iterators.add(peekingResultIterator);
-            }
-            ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
-            if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
-                iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
-            }
-            this.resultIterator = iterator;
-            this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
-        } catch (SQLException e) {
-            LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
-            Throwables.propagate(e);
-        }
-        
-   }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-        if (key == null) {
-            key = NullWritable.get();
-        }
-        if (value == null) {
-            value =  new PhoenixRecord();
-        }
-        Preconditions.checkNotNull(this.resultSet);
-        try {
-            if(!resultSet.next()) {
-                return false;
-            }
-            value.read(resultSet,columnsCount);
-            return true;
-        } catch (SQLException e) {
-            LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
-            Throwables.propagate(e);
-        }
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
deleted file mode 100644
index c980a38..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-/**
- * 
- * {@link RecordWriter} implementation for Phoenix
- * 
- * 
- *
- */
-public class PhoenixRecordWriter extends RecordWriter<NullWritable, PhoenixRecord> {
-	
-	private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
-	
-	private long numRecords = 0;
-	
-	private final Connection conn;
-	private final PreparedStatement statement;
-	private final PhoenixPigConfiguration config;
-	private final long batchSize;
-	
-	public PhoenixRecordWriter(Connection conn, PhoenixPigConfiguration config) throws SQLException {
-		this.conn = conn;
-		this.config = config;
-		this.batchSize = config.getBatchSize();
-		this.statement = this.conn.prepareStatement(config.getUpsertStatement());
-	}
-
-
-	/**
-	 * Committing and closing the connection is handled by {@link PhoenixOutputCommitter}.
-	 * 
-	 */
-	@Override
-	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-	}
-
-	@Override
-	public void write(NullWritable n, PhoenixRecord record) throws IOException, InterruptedException {		
-		try {
-			record.write(statement, config.getColumnMetadataList());
-			numRecords++;
-
-			if (numRecords % batchSize == 0) {
-				LOG.debug("commit called on a batch of size : " + batchSize);
-				conn.commit();
-			}
-		} catch (SQLException e) {
-			throw new IOException("Exception while committing to database.", e);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
deleted file mode 100644
index 3ea9b5b..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.util;
-
-import java.util.List;
-
-import org.apache.phoenix.util.ColumnInfo;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- * 
- * A codec to transform a {@link ColumnInfo} to a {@link String} and decode back.
- *
- */
-public final class ColumnInfoToStringEncoderDecoder {
-
-    private static final String COLUMN_INFO_DELIMITER = "|";
-    
-    private ColumnInfoToStringEncoderDecoder() {
-        
-    }
-    
-    public static String encode(List<ColumnInfo> columnInfos) {
-        Preconditions.checkNotNull(columnInfos);
-        return Joiner.on(COLUMN_INFO_DELIMITER).
-                        skipNulls().join(columnInfos);
-    }
-    
-    public static List<ColumnInfo> decode(final String columnInfoStr) {
-        Preconditions.checkNotNull(columnInfoStr);
-        List<ColumnInfo> columnInfos = Lists.newArrayList(
-                                Iterables.transform(
-                                        Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr),
-                                        new Function<String, ColumnInfo>() {
-                                            @Override
-                                            public ColumnInfo apply(String colInfo) {
-                                                if (colInfo.isEmpty()) {
-                                                      return null;
-                                                }
-                                                return ColumnInfo.fromString(colInfo);
-                                            }
-                                        }));
-        return columnInfos;
-        
-    }
-}
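
This codec is deleted here because an equivalent moves to org.apache.phoenix.mapreduce.util (the PhoenixPigSchemaUtilTest hunk below encodes with it). Assuming the replacement keeps these encode/decode signatures, the round trip looks like:

    import java.util.List;
    import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
    import org.apache.phoenix.schema.PDataType;
    import org.apache.phoenix.util.ColumnInfo;
    import com.google.common.collect.Lists;

    ColumnInfo id = new ColumnInfo("ID", PDataType.INTEGER.getSqlType());
    ColumnInfo name = new ColumnInfo("NAME", PDataType.VARCHAR.getSqlType());
    String encoded = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(id, name));
    List<ColumnInfo> decoded = ColumnInfoToStringEncoderDecoder.decode(encoded);
    assert decoded.size() == 2; // nulls, if any, are skipped on encode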

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
index 9f8a5e4..4f7d776 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
@@ -25,8 +25,9 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.pig.ResourceSchema;
@@ -46,19 +47,20 @@ public final class PhoenixPigSchemaUtil {
     private PhoenixPigSchemaUtil() {
     }
     
-    public static ResourceSchema getResourceSchema(final PhoenixPigConfiguration phoenixConfiguration) throws IOException {
+    public static ResourceSchema getResourceSchema(final Configuration configuration) throws IOException {
         
         final ResourceSchema schema = new ResourceSchema();
         try {
-        	List<ColumnInfo> columns = null;
-        	if(SchemaType.QUERY.equals(phoenixConfiguration.getSchemaType())) {
-        		final String sqlQuery = phoenixConfiguration.getSelectStatement();
-        		Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
-        		final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(phoenixConfiguration);
-        		columns = function.apply(sqlQuery);
-        	} else {
-        		columns = phoenixConfiguration.getSelectColumnMetadataList();
-        	}
+            List<ColumnInfo> columns = null;
+            final SchemaType schemaType = PhoenixConfigurationUtil.getSchemaType(configuration);
+            if(SchemaType.QUERY.equals(schemaType)) {
+                final String sqlQuery = PhoenixConfigurationUtil.getSelectStatement(configuration);
+                Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
+                final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(configuration);
+                columns = function.apply(sqlQuery);
+            } else {
+                columns = PhoenixConfigurationUtil.getSelectColumnMetadataList(configuration);
+            }
             ResourceFieldSchema fields[] = new ResourceFieldSchema[columns.size()];
             int i = 0;
             for(ColumnInfo cinfo : columns) {
@@ -76,6 +78,5 @@ public final class PhoenixPigSchemaUtil {
         }
         
         return schema;
-        
     }
 }
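
Everything getResourceSchema() needs now travels in the plain Hadoop Configuration. A sketch of the non-QUERY path, mirroring what PhoenixPigSchemaUtilTest does below with a mock; it assumes the schema type defaults to TABLE when unset and that pre-encoded column metadata short-circuits the cluster lookup, neither of which is visible in this hunk:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.schema.PDataType;
    import org.apache.phoenix.util.ColumnInfo;
    import org.apache.pig.ResourceSchema;
    import com.google.common.collect.ImmutableList;

    Configuration conf = new Configuration();
    conf.set(PhoenixConfigurationUtil.SELECT_COLUMN_INFO_KEY,
            ColumnInfoToStringEncoderDecoder.encode(ImmutableList.of(
                    new ColumnInfo("ID", PDataType.LONG.getSqlType()),
                    new ColumnInfo("NAME", PDataType.VARCHAR.getSqlType()))));
    ResourceSchema schema = PhoenixPigSchemaUtil.getResourceSchema(conf); // throws IOException
    // Roughly ID:long, NAME:chararray once mapped to Pig types.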

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
index 1b3a90a..f0148a6 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
@@ -26,16 +26,16 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 
 /**
@@ -46,21 +46,20 @@ import com.google.common.collect.Lists;
 public class QuerySchemaParserFunction implements Function<String,Pair<String,String>> {
 
     private static final Log LOG = LogFactory.getLog(QuerySchemaParserFunction.class);
-    private PhoenixPigConfiguration phoenixConfiguration;
+    private final Configuration configuration;
     
-    public QuerySchemaParserFunction(PhoenixPigConfiguration phoenixConfiguration) {
-        Preconditions.checkNotNull(phoenixConfiguration);
-        this.phoenixConfiguration = phoenixConfiguration;
+    public QuerySchemaParserFunction(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        this.configuration = configuration;
     }
     
     @Override
     public Pair<String, String> apply(final String selectStatement) {
         Preconditions.checkNotNull(selectStatement);
         Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!");
-        Preconditions.checkNotNull(this.phoenixConfiguration);
         Connection connection = null;
         try {
-            connection = this.phoenixConfiguration.getConnection();
+            connection = ConnectionUtil.getConnection(this.configuration);
             final Statement  statement = connection.createStatement();
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);
@@ -78,17 +77,17 @@ public class QuerySchemaParserFunction implements Function<String,Pair<String,St
             return new Pair<String, String>(tableName, columnsAsStr);
         } catch (SQLException e) {
             LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),selectStatement));
-            Throwables.propagate(e);
+            throw new RuntimeException(e);
         } finally {
             if(connection != null) {
                 try {
                     connection.close();
                 } catch(SQLException sqle) {
-                    Throwables.propagate(sqle);
+                    LOG.error("Error closing connection");
+                    throw new RuntimeException(sqle);
                 }
             }
         }
-        return null;
     }
     
     /**

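Usage changes only in the constructor argument: hand the function a Configuration that can reach the cluster, and apply() compiles the statement into a (table, projected columns) pair. A sketch; that ConnectionUtil.getConnection reads the ZooKeeper quorum key is an assumption, and the table and columns are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.Pair;

    Configuration conf = new Configuration();
    conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost");
    QuerySchemaParserFunction parser = new QuerySchemaParserFunction(conf);
    Pair<String, String> tableAndColumns = parser.apply("SELECT ID, NAME FROM EMPLOYEE");
    String tableName = tableAndColumns.getFirst();      // the resolved table
    String columnsAsStr = tableAndColumns.getSecond();  // the projected columns, joined
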
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
index 52f646c..3ed35bb 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
@@ -26,60 +26,59 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.util.ColumnInfo;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 
 public final class SqlQueryToColumnInfoFunction implements Function<String,List<ColumnInfo>> {
-	
-	private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class);
-	private final PhoenixPigConfiguration phoenixConfiguration;
+    
+    private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class);
+    private final Configuration configuration;
 
-	public SqlQueryToColumnInfoFunction(
-			final PhoenixPigConfiguration phoenixPigConfiguration) {
-		super();
-		this.phoenixConfiguration = phoenixPigConfiguration;
-	}
+    public SqlQueryToColumnInfoFunction(final Configuration configuration) {
+        this.configuration = configuration;
+    }
 
-	@Override
-	public List<ColumnInfo> apply(String sqlQuery) {
-		Preconditions.checkNotNull(sqlQuery);
-		Connection connection = null;
-		List<ColumnInfo> columnInfos = null;
+    @Override
+    public List<ColumnInfo> apply(String sqlQuery) {
+        Preconditions.checkNotNull(sqlQuery);
+        Connection connection = null;
+        List<ColumnInfo> columnInfos = null;
         try {
-            connection = this.phoenixConfiguration.getConnection();
+            connection = ConnectionUtil.getConnection(this.configuration);
             final Statement  statement = connection.createStatement();
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             final QueryPlan queryPlan = pstmt.compileQuery(sqlQuery);
             final List<? extends ColumnProjector> projectedColumns = queryPlan.getProjector().getColumnProjectors();
             columnInfos = Lists.newArrayListWithCapacity(projectedColumns.size());
             columnInfos = Lists.transform(projectedColumns, new Function<ColumnProjector,ColumnInfo>() {
-            	@Override
-				public ColumnInfo apply(final ColumnProjector columnProjector) {
-					return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType());
-				}
-            	
+                @Override
+                public ColumnInfo apply(final ColumnProjector columnProjector) {
+                    return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType());
+                }
+                
             });
-	   } catch (SQLException e) {
+       } catch (SQLException e) {
             LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),sqlQuery));
-            Throwables.propagate(e);
+            throw new RuntimeException(e);
         } finally {
             if(connection != null) {
                 try {
                     connection.close();
                 } catch(SQLException sqle) {
-                    Throwables.propagate(sqle);
+                    LOG.error("Error closing connection");
+                    throw new RuntimeException(sqle);
                 }
             }
         }
-		return columnInfos;
-	}
+        return columnInfos;
+    }
 
 }
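
This sibling function maps a query straight to JDBC column metadata, which is what PhoenixPigSchemaUtil uses for SchemaType.QUERY. A short sketch under the same quorum assumption as above:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.phoenix.util.ColumnInfo;

    Configuration conf = new Configuration();
    conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost");
    SqlQueryToColumnInfoFunction toColumnInfo = new SqlQueryToColumnInfoFunction(conf);
    List<ColumnInfo> columns = toColumnInfo.apply("SELECT ID, NAME FROM EMPLOYEE");
    for (ColumnInfo column : columns) {
        System.out.println(column); // column name and SQL type
    }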

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
index 1cdd66d..1da2d01 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -27,7 +27,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -240,7 +240,7 @@ public final class TypeUtil {
      * @return
      * @throws IOException
      */
-    public static Tuple transformToTuple(final PhoenixRecord record, final ResourceFieldSchema[] projectedColumns) throws IOException {
+    public static Tuple transformToTuple(final PhoenixPigDBWritable record, final ResourceFieldSchema[] projectedColumns) throws IOException {
         
         List<Object> columnValues = record.getValues();
         if(columnValues == null || columnValues.size() == 0 || projectedColumns == null || projectedColumns.length != columnValues.size()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
new file mode 100644
index 0000000..a7399c9
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.writable;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link Writable} representing a Phoenix record. This class
+ * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
+ * b) reads the column values from the {@link ResultSet}
+ * 
+ */
+public class PhoenixPigDBWritable implements DBWritable {
+    
+    private final List<Object> values;
+    private ResourceFieldSchema[] fieldSchemas;
+    private List<ColumnInfo> columnMetadataList;
+  
+    public PhoenixPigDBWritable() {
+        this.values = new ArrayList<Object>();
+    }
+    
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+        for (int i = 0; i < columnMetadataList.size(); i++) {
+            Object o = values.get(i);
+            ColumnInfo columnInfo = columnMetadataList.get(i);
+            byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
+            try {
+                Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
+                if (upsertValue != null) {
+                    statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
+                } else {
+                    statement.setNull(i + 1, columnInfo.getSqlType());
+                }
+            } catch (RuntimeException re) {
+                throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
+                        ,columnInfo.toString(),re.getMessage()),re);
+                
+            }
+        }
+    }
+    
+    public void add(Object value) {
+        values.add(value);
+    }
+
+    private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
+        PDataType pDataType = PDataType.fromTypeId(sqlType);
+        return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
+    }
+
+    public List<Object> getValues() {
+        return values;
+    }
+
+    @Override
+    public void readFields(final ResultSet rs) throws SQLException {
+        Preconditions.checkNotNull(rs);
+        final int noOfColumns = rs.getMetaData().getColumnCount();
+        values.clear();
+        for(int i = 1 ; i <= noOfColumns ; i++) {
+            Object obj = rs.getObject(i);
+            values.add(obj);
+        }
+    }
+
+    public ResourceFieldSchema[] getFieldSchemas() {
+        return fieldSchemas;
+    }
+
+    public void setFieldSchemas(ResourceFieldSchema[] fieldSchemas) {
+        this.fieldSchemas = fieldSchemas;
+    }
+
+    public List<ColumnInfo> getColumnMetadataList() {
+        return columnMetadataList;
+    }
+
+    public void setColumnMetadataList(List<ColumnInfo> columnMetadataList) {
+        this.columnMetadataList = columnMetadataList;
+    }
+
+    public static PhoenixPigDBWritable newInstance(final ResourceFieldSchema[] fieldSchemas,
+            final List<ColumnInfo> columnMetadataList) {
+        final PhoenixPigDBWritable dbWritable = new PhoenixPigDBWritable ();
+        dbWritable.setFieldSchemas(fieldSchemas);
+        dbWritable.setColumnMetadataList(columnMetadataList);
+        return dbWritable;
+    }
+
+}
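
The static factory keeps STORE-side call sites compact: build one writable per Pig tuple, add values in column order, and let the MapReduce layer drive write(PreparedStatement). A sketch with illustrative columns:

    import java.util.List;
    import org.apache.phoenix.schema.PDataType;
    import org.apache.phoenix.util.ColumnInfo;
    import com.google.common.collect.ImmutableList;

    List<ColumnInfo> columns = ImmutableList.of(
            new ColumnInfo("ID", PDataType.INTEGER.getSqlType()),
            new ColumnInfo("NAME", PDataType.VARCHAR.getSqlType()));
    // Null field schemas are allowed; DataType.findType() then infers each Pig type.
    PhoenixPigDBWritable record = PhoenixPigDBWritable.newInstance(null, columns);
    record.add(42);
    record.add("alice");
    // The output format eventually calls record.write(preparedUpsertStatement).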

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
deleted file mode 100644
index 0337563..0000000
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig;
-
-import static org.junit.Assert.assertEquals;
-
-import java.sql.SQLException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-
-/**
- * Tests for PhoenixPigConfiguration. 
- *
- */
-public class PhoenixPigConfigurationTest {
-
-  
-    @Test
-    public void testBasicConfiguration() throws SQLException {
-        Configuration conf = new Configuration();
-        final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(conf);
-        final String zkQuorum = "localhost";
-        final String tableName = "TABLE";
-        final long batchSize = 100;
-        phoenixConfiguration.configure(zkQuorum, tableName, batchSize);
-        assertEquals(zkQuorum,phoenixConfiguration.getServer());
-        assertEquals(tableName,phoenixConfiguration.getTableName());
-        assertEquals(batchSize,phoenixConfiguration.getBatchSize());
-     }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
deleted file mode 100644
index 9777bb5..0000000
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.util;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
-import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Tests methods on {@link ColumnInfoToStringEncoderDecoder}
- */
-public class ColumnInfoToStringEncoderDecoderTest {
-
-    @Test
-    public void testEncode() {
-        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
-        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
-        assertEquals(columnInfo.toString(),encodedColumnInfo);
-    }
-    
-    @Test
-    public void testDecode() {
-        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
-        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
-        final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(encodedColumnInfo);
-        assertEquals(columnInfo.toString(), decodedColumnInfo.get(0).toString());
-    }
-    
-    @Test
-    public void testEncodeDecodeWithNulls() {
-        final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
-        final ColumnInfo columnInfo2 = null;
-        final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1,columnInfo2));
-        final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
-        assertEquals(1,decodedColumnInfo.size()); 
-    }
-
-}

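The encode/decode round trip covered by the deleted test now lives in org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder (see the updated imports in the next file). A minimal sketch of the same round trip against the relocated class, using only the calls shown in this patch:

    import java.util.List;
    import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
    import org.apache.phoenix.schema.PDataType;
    import org.apache.phoenix.util.ColumnInfo;
    import com.google.common.collect.Lists;

    List<ColumnInfo> columns =
        Lists.newArrayList(new ColumnInfo("col1", PDataType.VARCHAR.getSqlType()));
    // encode() serializes the column metadata into a single string ...
    String encoded = ColumnInfoToStringEncoderDecoder.encode(columns);
    // ... and decode() restores it; null entries are dropped, as the
    // testEncodeDecodeWithNulls case above asserts.
    List<ColumnInfo> decoded = ColumnInfoToStringEncoderDecoder.decode(encoded);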
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
index 310128c..7a861b9 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -29,8 +29,10 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.util.List;
 
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.pig.ResourceSchema;
@@ -54,9 +56,11 @@ public class PhoenixPigSchemaUtilTest {
     @Test
     public void testSchema() throws SQLException, IOException {
         
-        final PhoenixPigConfiguration configuration = mock(PhoenixPigConfiguration.class);
+        final Configuration configuration = mock(Configuration.class);
         final List<ColumnInfo> columnInfos = ImmutableList.of(ID_COLUMN,NAME_COLUMN);
-        when(configuration.getSelectColumnMetadataList()).thenReturn(columnInfos);
+        final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnInfos);
+        when(configuration.get(PhoenixConfigurationUtil.SELECT_COLUMN_INFO_KEY)).thenReturn(encodedColumnInfos);
+        when(configuration.get(PhoenixConfigurationUtil.SCHEMA_TYPE)).thenReturn(SchemaType.TABLE.name());
         final ResourceSchema actual = PhoenixPigSchemaUtil.getResourceSchema(configuration);
         
         // expected schema.
@@ -75,9 +79,10 @@ public class PhoenixPigSchemaUtilTest {
     @Test(expected=IllegalDataException.class)
     public void testUnSupportedTypes() throws SQLException, IOException {
         
-        final PhoenixPigConfiguration configuration = mock(PhoenixPigConfiguration.class);
+        final Configuration configuration = mock(Configuration.class);
         final List<ColumnInfo> columnInfos = ImmutableList.of(ID_COLUMN,LOCATION_COLUMN);
-        when(configuration.getSelectColumnMetadataList()).thenReturn(columnInfos);
+        final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnInfos);
+        when(configuration.get(PhoenixConfigurationUtil.SELECT_COLUMN_INFO_KEY)).thenReturn(encodedColumnInfos);
         PhoenixPigSchemaUtil.getResourceSchema(configuration);
         fail("We currently don't support Array type yet. WIP!!");
     }