Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/04/21 18:28:22 UTC

[1/3] PHOENIX-11 Pig Loader (RaviMagham)

Repository: incubator-phoenix
Updated Branches:
  refs/heads/4.0 b07665510 -> a9b8eb9b5


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TableSchemaParserFunctionTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TableSchemaParserFunctionTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TableSchemaParserFunctionTest.java
new file mode 100644
index 0000000..3ac0520
--- /dev/null
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TableSchemaParserFunctionTest.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.pig.util.TableSchemaParserFunction;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+
+public class TableSchemaParserFunctionTest {
+
+    final TableSchemaParserFunction function = new TableSchemaParserFunction();
+    
+    @Test
+    public void testTableSchema() {
+        final String loadTableSchema = "EMPLOYEE/col1,col2";
+        final Pair<String,String> pair = function.apply(loadTableSchema);
+        assertEquals("EMPLOYEE", pair.getFirst());
+        assertEquals(Joiner.on(',').join("col1","col2"), pair.getSecond());
+    }
+    
+    @Test(expected=IllegalArgumentException.class)
+    public void testEmptyTableSchema() {
+        final String loadTableSchema = "";
+        function.apply(loadTableSchema);
+    }
+    
+    @Test
+    public void testTableOnlySchema() {
+        final String loadTableSchema = "EMPLOYEE";
+        final Pair<String,String> pair = function.apply(loadTableSchema);
+        assertEquals("EMPLOYEE", pair.getFirst());
+        assertNull(pair.getSecond());
+    }
+}


[3/3] git commit: PHOENIX-11 Pig Loader (RaviMagham)

Posted by ja...@apache.org.
PHOENIX-11 Pig Loader (RaviMagham)


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/a9b8eb9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/a9b8eb9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/a9b8eb9b

Branch: refs/heads/4.0
Commit: a9b8eb9b5129bcc8a274a18b6c18cd554f50c935
Parents: b076655
Author: James Taylor <ja...@apache.org>
Authored: Mon Apr 21 09:28:25 2014 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Mon Apr 21 09:28:25 2014 -0700

----------------------------------------------------------------------
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   2 +-
 .../apache/phoenix/query/QueryConstants.java    |   3 +-
 .../org/apache/phoenix/util/ColumnInfo.java     |  13 +
 .../org/apache/phoenix/util/PhoenixRuntime.java | 113 ++++-
 .../java/org/apache/phoenix/util/QueryUtil.java |  42 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |  30 +-
 .../apache/phoenix/schema/SchemaUtilTest.java   |  10 +-
 .../org/apache/phoenix/util/QueryUtilTest.java  |  16 +-
 phoenix-pig/pom.xml                             | 155 +++++++
 .../phoenix/pig/PhoenixHBaseLoaderIT.java       | 440 +++++++++++++++++++
 .../phoenix/pig/PhoenixHBaseStorerIT.java       | 195 ++++++++
 .../phoenix/pig/PhoenixPigConfigurationIT.java  | 107 +++++
 .../apache/phoenix/pig/PhoenixHBaseLoader.java  | 253 +++++++++++
 .../apache/phoenix/pig/PhoenixHBaseStorage.java |  53 ++-
 .../phoenix/pig/PhoenixPigConfiguration.java    | 270 +++++++++---
 .../java/org/apache/phoenix/pig/TypeUtil.java   | 189 --------
 .../phoenix/pig/hadoop/PhoenixInputFormat.java  | 167 +++++++
 .../phoenix/pig/hadoop/PhoenixInputSplit.java   | 104 +++++
 .../phoenix/pig/hadoop/PhoenixRecord.java       |  53 ++-
 .../phoenix/pig/hadoop/PhoenixRecordReader.java | 133 ++++++
 .../util/ColumnInfoToStringEncoderDecoder.java  |  69 +++
 .../phoenix/pig/util/PhoenixPigSchemaUtil.java  |  70 +++
 .../pig/util/QuerySchemaParserFunction.java     | 121 +++++
 .../pig/util/TableSchemaParserFunction.java     |  54 +++
 .../org/apache/phoenix/pig/util/TypeUtil.java   | 312 +++++++++++++
 .../phoenix/pig/PhoenixHBaseStorageTest.java    | 136 ------
 .../pig/PhoenixPigConfigurationTest.java        |  49 +++
 .../ColumnInfoToStringEncoderDecoderTest.java   |  61 +++
 .../pig/util/PhoenixPigSchemaUtilTest.java      |  84 ++++
 .../pig/util/QuerySchemaParserFunctionTest.java | 109 +++++
 .../pig/util/TableSchemaParserFunctionTest.java |  56 +++
 31 files changed, 3033 insertions(+), 436 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 7b2f6da..5f7e017 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -488,7 +488,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
 
     @Override
     public String getIdentifierQuoteString() throws SQLException {
-        return "\"";
+        return SchemaUtil.ESCAPE_CHARACTER;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 06bccce..7a5d4f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -38,6 +38,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
@@ -70,7 +71,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
 
@@ -96,6 +96,7 @@ import org.apache.phoenix.util.ByteUtil;
  */
 public interface QueryConstants {
     public static final String NAME_SEPARATOR = ".";
+    public static final String NAME_SEPARATOR_REGEX = "\\" + NAME_SEPARATOR;
     public final static byte[] NAME_SEPARATOR_BYTES = Bytes.toBytes(NAME_SEPARATOR);
     public static final byte NAME_SEPARATOR_BYTE = NAME_SEPARATOR_BYTES[0];
     public static final String NULL_SCHEMA_NAME = "";

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
index b02db20..79eaeb0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
@@ -12,6 +12,7 @@ package org.apache.phoenix.util;
 
 import java.util.List;
 
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
 
 import com.google.common.base.Preconditions;
@@ -47,6 +48,18 @@ public class ColumnInfo {
     public PDataType getPDataType() {
         return PDataType.fromTypeId(sqlType);
     }
+    
+    /**
+     * Returns the column name without the associated column family.
+     * @return the unqualified column name
+     */
+    public String getDisplayName() {
+        int index = columnName.indexOf(QueryConstants.NAME_SEPARATOR);
+        if (index < 0) {
+            return columnName; 
+        }
+        return columnName.substring(index+1);
+    }
 
     @Override
     public String toString() {

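For illustration, a minimal sketch of the new getDisplayName() accessor: a
family-qualified name drops the family, a bare name passes through. The class
name and column values here are hypothetical; the ColumnInfo(String, int)
constructor is the one used elsewhere in this patch.

import java.sql.Types;

import org.apache.phoenix.util.ColumnInfo;

public class DisplayNameExample {
    public static void main(String[] args) {
        // Hypothetical columns; Types.* supplies the JDBC sqlType.
        ColumnInfo qualified = new ColumnInfo("CF1.LOCATION", Types.VARCHAR);
        ColumnInfo bare = new ColumnInfo("ID", Types.INTEGER);
        System.out.println(qualified.getDisplayName()); // LOCATION
        System.out.println(bare.getDisplayName());      // ID
    }
}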
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 27c0c2a..1ed353b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -29,7 +29,9 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
@@ -40,8 +42,11 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.AmbiguousColumnException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
@@ -325,7 +330,7 @@ public class PhoenixRuntime {
         };
     }
     
-    private static PTable getTable(Connection conn, String name) throws SQLException {
+    public static PTable getTable(Connection conn, String name) throws SQLException {
         PTable table = null;
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         try {
@@ -343,6 +348,112 @@ public class PhoenixRuntime {
     }
     
     /**
+     * Returns a list of ColumnInfos (column name plus associated PDataType)
+     * for an import. The supplied list of columns may be null; if it is
+     * non-null, it represents the user-supplied list of columns to import.
+     *
+     * @param conn Phoenix connection from which metadata will be read
+     * @param tableName Phoenix table name whose columns are to be checked. Can include a schema
+     *                  name
+     * @param columns user-supplied list of import columns, can be null
+     * @return the list of resolved ColumnInfos
+     */
+    public static List<ColumnInfo> generateColumnInfo(Connection conn,
+            String tableName, List<String> columns)
+            throws SQLException {
+
+        PTable table = PhoenixRuntime.getTable(conn, tableName);
+        List<ColumnInfo> columnInfoList = Lists.newArrayList();
+        Set<String> unresolvedColumnNames = new TreeSet<String>();
+        if (columns == null) {
+            // use all columns in the table
+            for(PColumn pColumn : table.getColumns()) {
+               int sqlType = pColumn.getDataType().getResultSetSqlType();        
+               columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType));
+            }
+        } else {
+            // resolve each user-supplied column, collecting any that cannot be resolved
+            for (int i = 0; i < columns.size(); i++) {
+                String columnName = columns.get(i);
+                try {
+                    ColumnInfo columnInfo = PhoenixRuntime.getColumnInfo(table, columnName);
+                    columnInfoList.add(columnInfo);
+                } catch (ColumnNotFoundException cnfe) {
+                    unresolvedColumnNames.add(columnName.trim());
+                } catch (AmbiguousColumnException ace) {
+                    unresolvedColumnNames.add(columnName.trim());
+                }
+            }
+        }
+        // if there exist columns that cannot be resolved, error out.
+        if (unresolvedColumnNames.size() > 0) {
+            StringBuilder exceptionMessage = new StringBuilder();
+            boolean first = true;
+            exceptionMessage.append("Unable to resolve these column names:\n");
+            for (String col : unresolvedColumnNames) {
+                if (first) first = false;
+                else exceptionMessage.append(",");
+                exceptionMessage.append(col);
+            }
+            exceptionMessage.append("\nAvailable columns with column families:\n");
+            first = true;
+            for (PColumn pColumn : table.getColumns()) {
+                if (first) first = false;
+                else exceptionMessage.append(",");
+                exceptionMessage.append(pColumn.toString());
+            }
+            throw new SQLException(exceptionMessage.toString());
+        }
+        return columnInfoList;
+    }
+
+    /**
+     * Returns the column info for the given column of the given table.
+     * 
+     * @param table table in which the column is resolved
+     * @param columnName User-specified column name. May be family-qualified or bare.
+     * @return columnInfo associated with the column in the table
+     * @throws SQLException if parameters are null or if column is not found or if column is ambiguous.
+     */
+    public static ColumnInfo getColumnInfo(PTable table, String columnName) throws SQLException {
+        if (table==null) {
+            throw new SQLException("Table must not be null.");
+        }
+        if (columnName==null) {
+            throw new SQLException("columnName must not be null.");
+        }
+        columnName = columnName.trim().toUpperCase(); 
+        PColumn pColumn = null;
+        if (columnName.contains(QueryConstants.NAME_SEPARATOR)) {
+            String[] tokens = columnName.split(QueryConstants.NAME_SEPARATOR_REGEX);
+            if (tokens.length!=2) {
+                throw new SQLException(String.format("Unable to process column %s, expected family-qualified name.", columnName));
+            }
+            String familyName = tokens[0];
+            String familyColumn = tokens[1];
+            PColumnFamily family = table.getColumnFamily(familyName);
+            pColumn = family.getColumn(familyColumn);
+        } else {
+            pColumn = table.getColumn(columnName);
+        }
+        return getColumnInfo(pColumn);
+    }
+    
+    /**
+     * Constructs a column info for the supplied pColumn.
+     * @param pColumn column from which the name and SQL type are taken
+     * @return columnInfo
+     * @throws SQLException if the parameter is null.
+     */
+    public static ColumnInfo getColumnInfo(PColumn pColumn) throws SQLException {
+        if (pColumn==null) {
+            throw new SQLException("pColumn must not be null.");
+        }
+        int sqlType = pColumn.getDataType().getResultSetSqlType();
+        ColumnInfo columnInfo = new ColumnInfo(pColumn.toString(),sqlType);
+        return columnInfo;
+    }
+    
+    /**
      * Encode the primary key values from the table as a byte array. The values must
      * be in the same order as the primary key constraint. If the connection and
      * table are both tenant-specific, the tenant ID column must not be present in

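As a hedged usage sketch of the new generateColumnInfo/getColumnInfo helpers
above (the JDBC URL and EMPLOYEE table are illustrative assumptions, not part
of this commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Arrays;
import java.util.List;

import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.PhoenixRuntime;

public class GenerateColumnInfoExample {
    public static void main(String[] args) throws Exception {
        // Assumes a running Phoenix cluster; the URL is illustrative.
        Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
        try {
            // Pass null instead of a column list to resolve every column of
            // the table; names may be bare ("ID") or family-qualified
            // ("CF1.LOCATION"). Unresolvable names raise a SQLException.
            List<ColumnInfo> columns = PhoenixRuntime.generateColumnInfo(
                    conn, "EMPLOYEE", Arrays.asList("ID", "CF1.LOCATION"));
            for (ColumnInfo column : columns) {
                System.out.println(column.getDisplayName());
            }
        } finally {
            conn.close();
        }
    }
}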
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 384d845..f25e409 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.phoenix.util;
 
+import static org.apache.phoenix.util.SchemaUtil.getEscapedFullColumnName;
+
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -27,10 +29,11 @@ import javax.annotation.Nullable;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
-public class QueryUtil {
+public final class QueryUtil {
 
     /**
      *  Column family name index within ResultSet resulting from {@link DatabaseMetaData#getColumns(String, String, String, String)}
@@ -54,6 +57,12 @@ public class QueryUtil {
     public static final int DATA_TYPE_NAME_POSITION = 6;
 
     /**
+     * Private constructor
+     */
+    private QueryUtil() {
+    }
+
+    /**
      * Generate an upsert statement based on a list of {@code ColumnInfo}s with parameter markers. The list of
      * {@code ColumnInfo}s must contain at least one element.
      *
@@ -81,7 +90,7 @@ public class QueryUtil {
                                     @Nullable
                                     @Override
                                     public String apply(@Nullable ColumnInfo columnInfo) {
-                                        return columnInfo.getColumnName();
+                                        return getEscapedFullColumnName(columnInfo.getColumnName());
                                     }
                                 })),
                 Joiner.on(", ").join(parameterList));
@@ -110,6 +119,35 @@ public class QueryUtil {
         }
         return String.format("UPSERT INTO %s VALUES (%s)", tableName, Joiner.on(", ").join(parameterList));
     }
+    
+    /**
+     * Constructs a SELECT statement that projects the given columns from the given table.
+     * @param fullTableName name of the table for which the select statement needs to be created.
+     * @param columnInfos list of columns to be projected in the select statement.
+     * @return select query
+     */
+    public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos) {
+        Preconditions.checkNotNull(fullTableName, "Table name cannot be null");
+        if (columnInfos == null || columnInfos.isEmpty()) {
+            throw new IllegalArgumentException("At least one column must be provided");
+        }
+        // escape the table name so that its case is preserved.
+        final String escapedFullTableName = SchemaUtil.getEscapedFullTableName(fullTableName);
+        StringBuilder sb = new StringBuilder();
+        sb.append("SELECT ");
+        for (ColumnInfo cinfo : columnInfos) {
+            if (cinfo != null) {
+                String fullColumnName = getEscapedFullColumnName(cinfo.getColumnName());
+                sb.append(fullColumnName);
+                sb.append(",");
+            }
+        }
+        // Remove the trailing comma
+        sb.setLength(sb.length() - 1);
+        sb.append(" FROM ");
+        sb.append(escapedFullTableName);
+        return sb.toString();
+    }
 
     public static String getUrl(String server) {
         return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + server;

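A minimal sketch of the new QueryUtil.constructSelectStatement (the table and
column metadata are illustrative; the expected output follows from the
escaping rules added to SchemaUtil below):

import java.sql.Types;
import java.util.Arrays;

import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.QueryUtil;

public class ConstructSelectExample {
    public static void main(String[] args) {
        ColumnInfo id = new ColumnInfo("ID", Types.INTEGER);
        ColumnInfo location = new ColumnInfo("CF1.LOCATION", Types.VARCHAR);
        // Prints: SELECT "ID","CF1"."LOCATION" FROM "MYTAB"
        System.out.println(QueryUtil.constructSelectStatement(
                "MYTAB", Arrays.asList(id, location)));
    }
}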
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 2fd1fed..ea7683f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -52,6 +52,8 @@ import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.ValueSchema.Field;
 
+import com.google.common.base.Preconditions;
+
 /**
  * 
  * Static class for various schema-related utilities
@@ -61,7 +63,7 @@ import org.apache.phoenix.schema.ValueSchema.Field;
  */
 public class SchemaUtil {
     private static final int VAR_LENGTH_ESTIMATE = 10;
-    
+    public static final String ESCAPE_CHARACTER = "\"";
     public static final DataBlockEncoding DEFAULT_DATA_BLOCK_ENCODING = DataBlockEncoding.FAST_DIFF;
     public static final PDatum VAR_BINARY_DATUM = new PDatum() {
     
@@ -553,4 +555,30 @@ public class SchemaUtil {
         // TODO: when PColumn has getPKPosition, use that instead
         return table.getPKColumns().indexOf(column);
     }
+    
+    public static String getEscapedFullColumnName(String fullColumnName) {
+        int index = fullColumnName.indexOf(QueryConstants.NAME_SEPARATOR);
+        if (index < 0) {
+            return getEscapedArgument(fullColumnName); 
+        }
+        String columnFamily = fullColumnName.substring(0,index);
+        String columnName = fullColumnName.substring(index+1);
+        return getEscapedArgument(columnFamily) + QueryConstants.NAME_SEPARATOR + getEscapedArgument(columnName);
+    }
+    
+    public static String getEscapedFullTableName(String fullTableName) {
+        final String schemaName = getSchemaNameFromFullName(fullTableName);
+        final String tableName = getTableNameFromFullName(fullTableName);
+        return getEscapedTableName(schemaName, tableName);
+    }
+    
+    /**
+     * Escapes the given argument with {@value #ESCAPE_CHARACTER}.
+     * @param argument any non-null value.
+     * @return the argument surrounded by {@value #ESCAPE_CHARACTER}
+     */
+    public static String getEscapedArgument(String argument) {
+        Preconditions.checkNotNull(argument,"Argument passed cannot be null");
+        return ESCAPE_CHARACTER + argument + ESCAPE_CHARACTER;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-core/src/test/java/org/apache/phoenix/schema/SchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/SchemaUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/SchemaUtilTest.java
index 36c6a8d..0d465c6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/schema/SchemaUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/SchemaUtilTest.java
@@ -19,10 +19,9 @@ package org.apache.phoenix.schema;
 
 import static org.junit.Assert.assertEquals;
 
-import org.junit.Test;
-
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Test;
 
 public class SchemaUtilTest {
 
@@ -54,4 +53,11 @@ public class SchemaUtilTest {
         columnDisplayName = SchemaUtil.getMetaDataEntityName(null, null, null, "columnName");
         assertEquals(columnDisplayName, "columnName");
     }
+    
+    @Test
+    public void testEscapingColumnName() {
+        assertEquals("\"ID\"", SchemaUtil.getEscapedFullColumnName("ID"));
+        assertEquals("\"0\".\"NAME\"", SchemaUtil.getEscapedFullColumnName("0.NAME"));
+        assertEquals("\"CF1\".\"LOCATION\"", SchemaUtil.getEscapedFullColumnName("CF1.LOCATION"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 39bde72..0ac2bbc 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -17,12 +17,13 @@
  */
 package org.apache.phoenix.util;
 
-import com.google.common.collect.ImmutableList;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 import java.sql.Types;
 
-import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
 
 public class QueryUtilTest {
 
@@ -32,7 +33,7 @@ public class QueryUtilTest {
     @Test
     public void testConstructUpsertStatement_ColumnInfos() {
         assertEquals(
-                "UPSERT INTO MYTAB (ID, NAME) VALUES (?, ?)",
+                "UPSERT INTO MYTAB (\"ID\", \"NAME\") VALUES (?, ?)",
                 QueryUtil.constructUpsertStatement("MYTAB", ImmutableList.of(ID_COLUMN, NAME_COLUMN)));
 
     }
@@ -53,4 +54,11 @@ public class QueryUtilTest {
     public void testConstructGenericUpsertStatement_NoColumns() {
         QueryUtil.constructGenericUpsertStatement("MYTAB", 0);
     }
+    
+    @Test
+    public void testConstructSelectStatement() {
+        assertEquals(
+                "SELECT \"ID\",\"NAME\" FROM \"MYTAB\"",
+                QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN)));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-pig/pom.xml b/phoenix-pig/pom.xml
index edaa1f0..aeb6408 100644
--- a/phoenix-pig/pom.xml
+++ b/phoenix-pig/pom.xml
@@ -37,8 +37,163 @@
       <artifactId>phoenix-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.pig</groupId>
       <artifactId>pig</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
+    <!-- Test Dependencies -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
   </dependencies>
+  
+  <profiles>
+    <!-- Profile for building against Hadoop 1. Active by default. Not used if another 
+      Hadoop profile is specified with mvn -Dhadoop.profile=foo -->
+    <profile>
+      <id>hadoop-1</id>
+      <activation>
+        <property>
+          <name>!hadoop.profile</name>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-testing-util</artifactId>
+          <version>${hbase-hadoop1.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-it</artifactId>
+          <version>${hbase-hadoop1.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase-hadoop1.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-protocol</artifactId>
+          <version>${hbase-hadoop1.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-client</artifactId>
+          <version>${hbase-hadoop1.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-test</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Profile for building against Hadoop 2. Activate using: mvn -Dhadoop.profile=2 -->
+    <profile>
+      <id>hadoop-2</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>2</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-testing-util</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-it</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>        
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-protocol</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-client</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop-compat</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop-compat</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop2-compat</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-hadoop2-compat</artifactId>
+          <version>${hbase-hadoop2.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>        
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-minicluster</artifactId>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-dependency-plugin</artifactId>
+            <version>${maven-dependency-plugin.version}</version>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>

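For reference, the profiles above are activated exactly as the pom comments
describe: a plain build such as

    mvn clean install

uses the hadoop-1 profile by default, while

    mvn clean install -Dhadoop.profile=2

pulls in the Hadoop 2 and ${hbase-hadoop2.version} dependencies instead.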
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
new file mode 100644
index 0000000..90f1c79
--- /dev/null
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
@@ -0,0 +1,440 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * 
+ * Test class that runs the integration tests against an HBase mini cluster with Pig in local mode.
+ */
+public class PhoenixHBaseLoaderIT {
+    
+    private static final Log LOG = LogFactory.getLog(PhoenixHBaseLoaderIT.class);
+    private static final String SCHEMA_NAME = "T";
+    private static final String TABLE_NAME = "A";
+    private static final String TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, TABLE_NAME);
+    private static HBaseTestingUtility hbaseTestUtil;
+    private static String zkQuorum;
+    private static Connection conn;
+    private static PigServer pigServer;
+    private static Configuration conf;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        hbaseTestUtil = new HBaseTestingUtility();
+        hbaseTestUtil.getConfiguration().set(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        hbaseTestUtil.startMiniCluster();
+
+        Class.forName(PhoenixDriver.class.getName());
+        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        Properties props = TestUtil.TEST_PROPERTIES;
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL +
+                 PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
+
+        conf = hbaseTestUtil.getConfiguration();
+    }
+    
+    @Before
+    public void beforeTest() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL,
+                ConfigurationUtil.toProperties(conf));
+    }
+
+    /**
+     * Validates the schema returned for a table with Pig data types.
+     * @throws Exception
+     */
+    @Test
+    public void testSchemaForTable() throws Exception {
+        final String ddl = String.format("CREATE TABLE %s "
+                + "  (a_string varchar not null, a_binary varbinary not null, a_integer integer, cf1.a_float float"
+                + "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n", TABLE_FULL_NAME);
+        conn.createStatement().execute(ddl);
+
+        pigServer.registerQuery(String.format(
+                "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                zkQuorum));
+        
+        final Schema schema = pigServer.dumpSchema("A");
+        List<FieldSchema> fields = schema.getFields();
+        assertEquals(4, fields.size());
+        assertTrue(fields.get(0).alias.equalsIgnoreCase("a_string"));
+        assertTrue(fields.get(0).type == DataType.CHARARRAY);
+        assertTrue(fields.get(1).alias.equalsIgnoreCase("a_binary"));
+        assertTrue(fields.get(1).type == DataType.BYTEARRAY);
+        assertTrue(fields.get(2).alias.equalsIgnoreCase("a_integer"));
+        assertTrue(fields.get(2).type == DataType.INTEGER);
+        assertTrue(fields.get(3).alias.equalsIgnoreCase("a_float"));
+        assertTrue(fields.get(3).type == DataType.FLOAT);
+    }
+
+    /**
+     * Validates the schema returned when specific columns of a table are given as part of LOAD.
+     * @throws Exception
+     */
+    @Test
+    public void testSchemaForTableWithSpecificColumns() throws Exception {
+        
+        //create the table
+        final String ddl = "CREATE TABLE " + TABLE_FULL_NAME 
+                + "  (ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR, AGE INTEGER) ";
+        conn.createStatement().execute(ddl);
+        
+        
+        final String selectColumns = "ID,NAME";
+        pigServer.registerQuery(String.format(
+                "A = load 'hbase://table/%s/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');",
+                TABLE_FULL_NAME, selectColumns, zkQuorum));
+        
+        Schema schema = pigServer.dumpSchema("A");
+        List<FieldSchema> fields = schema.getFields();
+        assertEquals(2, fields.size());
+        assertTrue(fields.get(0).alias.equalsIgnoreCase("ID"));
+        assertTrue(fields.get(0).type == DataType.INTEGER);
+        assertTrue(fields.get(1).alias.equalsIgnoreCase("NAME"));
+        assertTrue(fields.get(1).type == DataType.CHARARRAY);
+        
+    }
+    
+    /**
+     * Validates the schema returned when a SQL SELECT query is given as part of LOAD.
+     * @throws Exception
+     */
+    @Test
+    public void testSchemaForQuery() throws Exception {
+        
+       //create the table.
+        String ddl = String.format("CREATE TABLE " + TABLE_FULL_NAME +
+                 "  (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE"
+                + "  CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL))\n", TABLE_FULL_NAME);
+        conn.createStatement().execute(ddl);
+        
+        
+        
+        //sql query for LOAD
+        final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE_FULL_NAME;
+        pigServer.registerQuery(String.format(
+                "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');",
+                sqlQuery, zkQuorum));
+        
+        //assert the schema.
+        Schema schema = pigServer.dumpSchema("A");
+        List<FieldSchema> fields = schema.getFields();
+        assertEquals(3, fields.size());
+        assertTrue(fields.get(0).alias.equalsIgnoreCase("a_string"));
+        assertTrue(fields.get(0).type == DataType.CHARARRAY);
+        assertTrue(fields.get(1).alias.equalsIgnoreCase("a_integer"));
+        assertTrue(fields.get(1).type == DataType.INTEGER);
+        assertTrue(fields.get(2).alias.equalsIgnoreCase("a_double"));
+        assertTrue(fields.get(2).type == DataType.DOUBLE);
+    }
+    
+    /**
+     * Validates the schema when it is given as part of LOAD ... AS.
+     * @throws Exception
+     */
+    @Test
+    public void testSchemaForTableWithAlias() throws Exception {
+        
+        //create the table.
+        String ddl = "CREATE TABLE  " + TABLE_FULL_NAME 
+                + "  (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE"
+                + "  CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL)) \n";
+        conn.createStatement().execute(ddl);
+        
+        //select query given as part of LOAD.
+        final String sqlQuery = "SELECT A_STRING,A_DECIMAL,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE_FULL_NAME;
+        
+        LOG.info(String.format("Generated SQL Query [%s]",sqlQuery));
+        
+        pigServer.registerQuery(String.format(
+                "raw = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);",
+                sqlQuery, zkQuorum));
+        
+        //test the schema.
+        Schema schema = pigServer.dumpSchema("raw");
+        List<FieldSchema> fields = schema.getFields();
+        assertEquals(4, fields.size());
+        assertTrue(fields.get(0).alias.equalsIgnoreCase("a"));
+        assertTrue(fields.get(0).type == DataType.CHARARRAY);
+        assertTrue(fields.get(1).alias.equalsIgnoreCase("b"));
+        assertTrue(fields.get(1).type == DataType.BIGDECIMAL);
+        assertTrue(fields.get(2).alias.equalsIgnoreCase("c"));
+        assertTrue(fields.get(2).type == DataType.INTEGER);
+        assertTrue(fields.get(3).alias.equalsIgnoreCase("d"));
+        assertTrue(fields.get(3).type == DataType.DOUBLE);
+    }
+    
+    /**
+     * @throws Exception
+     */
+    @Test
+    public void testDataForTable() throws Exception {
+        
+         //create the table
+         String ddl = "CREATE TABLE  " + TABLE_FULL_NAME 
+                + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
+                
+        conn.createStatement().execute(ddl);
+        
+        //prepare data: 10 rows with age 25 and 10 rows with age 30.
+        final String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        int rows = 20;
+        for(int i = 0 ; i < rows; i++) {
+            stmt.setInt(1, i);
+            stmt.setString(2, "a"+i);
+            stmt.setInt(3, (i % 2 == 0) ? 25 : 30);
+            stmt.execute();    
+        }
+        conn.commit();
+         
+        //load data and filter rows whose age is > 25
+        pigServer.registerQuery(String.format(
+                "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                zkQuorum));
+        pigServer.registerQuery("B = FILTER A BY AGE > 25;");
+        
+        final Iterator<Tuple> iterator = pigServer.openIterator("B");
+        int recordsRead = 0;
+        while (iterator.hasNext()) {
+            final Tuple each = iterator.next();
+            assertEquals(3, each.size());
+            recordsRead++;
+        }
+        assertEquals(rows/2, recordsRead);
+    }
+    
+    /**
+     * @throws Exception
+     */
+    @Test
+    public void testDataForSQLQuery() throws Exception {
+        
+         //create the table
+         String ddl = "CREATE TABLE  " + TABLE_FULL_NAME 
+                + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
+                
+        conn.createStatement().execute(ddl);
+        
+        //prepare data: 10 rows with age 25 and 10 rows with age 30.
+        final String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        int rows = 20;
+        for(int i = 0 ; i < rows; i++) {
+            stmt.setInt(1, i);
+            stmt.setString(2, "a"+i);
+            stmt.setInt(3, (i % 2 == 0) ? 25 : 30);
+            stmt.execute();    
+        }
+        conn.commit();
+        
+        //sql query
+        final String sqlQuery = " SELECT ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25";
+        //load data and filter rows whose age is > 25
+        pigServer.registerQuery(String.format(
+                "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
+                zkQuorum));
+        
+        final Iterator<Tuple> iterator = pigServer.openIterator("A");
+        int recordsRead = 0;
+        while (iterator.hasNext()) {
+            iterator.next();
+            recordsRead++;
+        }
+        assertEquals(rows/2, recordsRead);
+    }
+    
+    /**
+     * @throws Exception
+     */
+    @Test
+    public void testGroupingOfDataForTable() throws Exception {
+        
+         //create the table
+         String ddl = "CREATE TABLE  " + TABLE_FULL_NAME 
+                + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ";
+                
+        conn.createStatement().execute(ddl);
+        
+        //prepare data: 10 rows with age 25 and 10 rows with age 30.
+        final String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?,?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        int rows = 20;
+        int j = 0, k = 0;
+        for(int i = 0 ; i < rows; i++) {
+            stmt.setInt(1, i);
+            stmt.setString(2, "a"+i);
+            if(i % 2 == 0) {
+                stmt.setInt(3, 25);
+                stmt.setInt(4, 10 * 2 * j++);    
+            } else {
+                stmt.setInt(3, 30);
+                stmt.setInt(4, 10 * 3 * k++);
+            }
+            
+            stmt.execute();    
+        }
+        conn.commit();
+        
+        //prepare the mock storage with expected output
+        final Data data = Storage.resetData(pigServer);
+        List<Tuple> expectedList = new ArrayList<Tuple>();
+        expectedList.add(Storage.tuple(0,180));
+        expectedList.add(Storage.tuple(0,270));
+        
+        //load data, group it by age, and compute min/max of SAL per group.
+        pigServer.setBatchOn();
+        pigServer.registerQuery(String.format(
+                "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                zkQuorum));
+        
+        pigServer.registerQuery("B = GROUP A BY AGE;");
+        pigServer.registerQuery("C = FOREACH B GENERATE MIN(A.SAL),MAX(A.SAL);");
+        pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
+        pigServer.executeBatch();
+        
+        List<Tuple> actualList = data.get("out");
+        assertEquals(expectedList, actualList);
+    }
+    
+    /**
+     * Tests both {@link PhoenixHBaseLoader} and {@link PhoenixHBaseStorage}.
+     * @throws Exception
+     */
+    @Test
+    public void testLoadAndStore() throws Exception {
+        
+         //create the tables
+         final String sourceTableddl = "CREATE TABLE  " + TABLE_FULL_NAME 
+                + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ";
+         
+         final String targetTable = "AGGREGATE";
+         final String targetTableddl = "CREATE TABLE " + targetTable 
+                 +  "(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) ";
+                 
+        conn.createStatement().execute(sourceTableddl);
+        conn.createStatement().execute(targetTableddl);
+        
+        //prepare data: 10 rows with age 25 and 10 rows with age 30.
+        final String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?,?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        int rows = 20;
+        int j = 0, k = 0;
+        for(int i = 0 ; i < rows; i++) {
+            stmt.setInt(1, i);
+            stmt.setString(2, "a"+i);
+            if(i % 2 == 0) {
+                stmt.setInt(3, 25);
+                stmt.setInt(4, 10 * 2 * j++);    
+            } else {
+                stmt.setInt(3, 30);
+                stmt.setInt(4, 10 * 3 * k++);
+            }
+            
+            stmt.execute();    
+        }
+        conn.commit();
+        
+            
+        //load data, group it by age, and compute min/max of SAL per group.
+        pigServer.setBatchOn();
+        pigServer.registerQuery(String.format(
+                "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                zkQuorum));
+        
+        pigServer.registerQuery("B = GROUP A BY AGE;");
+        pigServer.registerQuery("C = FOREACH B GENERATE group as AGE,MIN(A.SAL),MAX(A.SAL);");
+        pigServer.registerQuery("STORE C INTO 'hbase://" + targetTable 
+                + "' using " + PhoenixHBaseStorage.class.getName() + "('"
+                 + zkQuorum + "', '-batchSize 1000');");
+        pigServer.executeBatch();
+        
+        //validate the data with what is stored.
+        final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + targetTable + " ORDER BY AGE";
+        final ResultSet rs = conn.createStatement().executeQuery(selectQuery);
+        rs.next();
+        assertEquals(25, rs.getInt("AGE"));
+        assertEquals(0, rs.getInt("MIN_SAL"));
+        assertEquals(180, rs.getInt("MAX_SAL"));
+        rs.next();
+        assertEquals(30, rs.getInt("AGE"));
+        assertEquals(0, rs.getInt("MIN_SAL"));
+        assertEquals(270, rs.getInt("MAX_SAL"));
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        dropTable(TABLE_FULL_NAME);
+        pigServer.shutdown();
+    }
+
+
+    private void dropTable(String tableFullName) throws SQLException {
+        Preconditions.checkNotNull(conn);
+        conn.createStatement().execute(String.format("DROP TABLE %s", tableFullName));
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        conn.close();
+        PhoenixDriver.INSTANCE.close();
+        hbaseTestUtil.shutdownMiniCluster();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
new file mode 100644
index 0000000..32d0ff9
--- /dev/null
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class PhoenixHBaseStorerIT {
+
+    private static TupleFactory tupleFactory;
+    private static HBaseTestingUtility hbaseTestUtil;
+    private static String zkQuorum;
+    private static Connection conn;
+    private static PigServer pigServer;
+    private static Configuration conf;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        hbaseTestUtil = new HBaseTestingUtility();
+        hbaseTestUtil.startMiniCluster();
+
+        Class.forName(PhoenixDriver.class.getName());
+        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL +
+                 PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum);
+        conf = hbaseTestUtil.getConfiguration();
+        // Pig variables
+        tupleFactory = TupleFactory.getInstance();
+    }
+    
+    @Before
+    public void beforeTest() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL,
+                ConfigurationUtil.toProperties(conf));
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+         pigServer.shutdown();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        conn.close();
+        PhoenixDriver.INSTANCE.close();
+        hbaseTestUtil.shutdownMiniCluster();
+    }
+
+    /**
+     * Basic test - writes data to a Phoenix table and compares the data written
+     * to expected
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testStorer() throws Exception {
+        final String tableName = "TABLE1";
+        final Statement stmt = conn.createStatement();
+
+        stmt.execute("CREATE TABLE " + tableName +
+                 " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
+
+        final Data data = Storage.resetData(pigServer);
+        final Collection<Tuple> list = Lists.newArrayList();
+
+        // Create input dataset
+        int rows = 100;
+        for (int i = 0; i < rows; i++) {
+            Tuple t = tupleFactory.newTuple();
+            t.append(i);
+            t.append("a" + i);
+            list.add(t);
+        }
+        data.set("in", "id:int, name:chararray", list);
+
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+
+        pigServer.registerQuery("Store A into 'hbase://" + tableName
+                               + "' using " + PhoenixHBaseStorage.class.getName() + "('"
+                                + zkQuorum + "', '-batchSize 1000');");
+
+        // Now run the Pig script; execute the batch once and keep the job
+        // so that a failure is not re-run just to fetch its exception.
+        final ExecJob job = pigServer.executeBatch().get(0);
+        if (job.getStatus() != JOB_STATUS.COMPLETED) {
+            throw new RuntimeException("Job failed", job.getException());
+        }
+
+        // Compare data in Phoenix table to the expected
+        final ResultSet rs = stmt
+                .executeQuery("SELECT id, name FROM table1 ORDER BY id");
+
+        for (int i = 0; i < rows; i++) {
+            assertTrue(rs.next());
+            assertEquals(i, rs.getInt(1));
+            assertEquals("a" +  i, rs.getString(2));
+        }
+    }
+    
+    /**
+     * Basic test - writes a specific subset of columns to a Phoenix table and
+     * verifies that the data read back matches what was written
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testStorerForSpecificColumns() throws Exception {
+        final String tableName = "TABLE2";
+        final Statement stmt = conn.createStatement();
+
+        stmt.execute("CREATE TABLE " + tableName +
+                 " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)");
+
+        final Data data = Storage.resetData(pigServer);
+        final Collection<Tuple> list = Lists.newArrayList();
+
+        // Create input dataset
+        int rows = 100;
+        for (int i = 0; i < rows; i++) {
+            Tuple t = tupleFactory.newTuple();
+            t.append(i);
+            t.append("a" + i);
+            t.append(i * 2);
+            list.add(t);
+        }
+        data.set("in", "id:int, name:chararray,age:int", list);
+
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+        pigServer.registerQuery("B = FOREACH A GENERATE id,name;");
+        pigServer.registerQuery("Store B into 'hbase://" + tableName + "/ID,NAME"
+                               + "' using " + PhoenixHBaseStorage.class.getName() + "('"
+                                + zkQuorum + "', '-batchSize 1000');");
+
+        // Now run the Pig script; execute the batch once and inspect its status.
+        final ExecJob job = pigServer.executeBatch().get(0);
+        if (job.getStatus() != JOB_STATUS.COMPLETED) {
+            throw new RuntimeException("Job failed", job.getException());
+        }
+
+        // Compare data in Phoenix table to the expected
+        final ResultSet rs = stmt
+                .executeQuery("SELECT id, name,age FROM " + tableName + "ORDER BY id");
+
+        for (int i = 0; i < rows; i++) {
+            assertTrue(rs.next());
+            assertEquals(i, rs.getInt(1));
+            assertEquals("a" +  i, rs.getString(2));
+            assertEquals(0, rs.getInt(3));
+        }
+    }
+}
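
The integration test above drives PhoenixHBaseStorage through Pig's mock storage. For reference, the same store path can be driven by a standalone program; a minimal sketch, assuming a reachable ZooKeeper quorum on localhost:2181, an existing Phoenix table EXAMPLE (ID INTEGER PRIMARY KEY, NAME VARCHAR), and an input file input.tsv (all placeholder names):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class PhoenixStoreExample {
        public static void main(String[] args) throws Exception {
            // Local execution; a real deployment would use ExecType.MAPREDUCE.
            final PigServer pig = new PigServer(ExecType.LOCAL);
            pig.setBatchOn();
            // Read a two-column TSV file and upsert it into the Phoenix table.
            pig.registerQuery("A = LOAD 'input.tsv' AS (id:int, name:chararray);");
            pig.registerQuery("STORE A INTO 'hbase://EXAMPLE' USING "
                    + "org.apache.phoenix.pig.PhoenixHBaseStorage('localhost:2181', '-batchSize 1000');");
            // executeBatch() runs the queued statements and reports per-job status.
            pig.executeBatch();
            pig.shutdown();
        }
    }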

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
new file mode 100644
index 0000000..e867a4d
--- /dev/null
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+public class PhoenixPigConfigurationIT extends BaseHBaseManagedTimeIT {
+    private static final String zkQuorum = TestUtil.LOCALHOST + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+    
+    @Test
+    public void testUpsertStatement() throws Exception {
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+            createTestTable(getUrl(), ddl);
+            final PhoenixPigConfiguration configuration = newConfiguration (tableName);
+            final String upsertStatement = configuration.getUpsertStatement();
+            final String expectedUpsertStatement = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)";
+            assertEquals(expectedUpsertStatement, upsertStatement);
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSelectStatement() throws Exception {
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+            createTestTable(getUrl(), ddl);
+            final PhoenixPigConfiguration configuration = newConfiguration (tableName);
+            final String selectStatement = configuration.getSelectStatement();
+            final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(tableName);
+            assertEquals(expectedSelectStatement, selectStatement);
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSelectStatementForSpecificColumns() throws Exception {
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        final String tableName = "TEST_TABLE";
+        try {
+            String ddl = "CREATE TABLE "+ tableName + 
+                    "  (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+                    "  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+            createTestTable(getUrl(), ddl);
+            final PhoenixPigConfiguration configuration = newConfiguration (tableName);
+            configuration.setSelectColumns("a_binary");
+            final String selectStatement = configuration.getSelectStatement();
+            final String expectedSelectStatement = "SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(tableName);
+            assertEquals(expectedSelectStatement, selectStatement);
+        } finally {
+            conn.close();
+        }
+    }
+
+    private PhoenixPigConfiguration newConfiguration(String tableName) {
+        final Configuration configuration = new Configuration();
+        final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(configuration);
+        phoenixConfiguration.configure(zkQuorum, tableName.toUpperCase(), 100);
+        return phoenixConfiguration;
+    }
+}
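
These tests run against the mini-cluster base class, but PhoenixPigConfiguration itself is just a typed wrapper over a Hadoop Configuration. A sketch of building one by hand and asking it for the generated statements (quorum and table name are placeholders; both getters open a Phoenix connection on first use, so a reachable cluster is assumed):

    import java.sql.SQLException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.pig.PhoenixPigConfiguration;

    public class PhoenixConfigExample {
        public static void main(String[] args) throws SQLException {
            final PhoenixPigConfiguration config = new PhoenixPigConfiguration(new Configuration());
            // server, table name, upsert batch size; an overload also accepts a column list.
            config.configure("localhost:2181", "TEST_TABLE", 100);
            // Column metadata is fetched over a Phoenix connection the first time.
            System.out.println(config.getUpsertStatement());
            System.out.println(config.getSelectStatement());
        }
    }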

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
new file mode 100644
index 0000000..1f81ad6
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig;
+
+import static org.apache.commons.lang.StringUtils.isEmpty;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.phoenix.pig.hadoop.PhoenixInputFormat;
+import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
+import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
+import org.apache.phoenix.pig.util.TableSchemaParserFunction;
+import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * LoadFunc to load data from HBase using Phoenix.
+ * 
+ * Example usage: 
+ * a) TABLE
+ *   i)   A = load 'hbase://table/HIRES'  using
+ * org.apache.phoenix.pig.PhoenixHBaseLoader('localhost');
+ *               
+ *       The above loads all the data from the table 'HIRES'.
+ *       
+ *   ii)  A = load 'hbase://table/HIRES/id,name' using
+ *       org.apache.phoenix.pig.PhoenixHBaseLoader('localhost');
+ *       
+ *       Here, only the id and name columns are returned from the table HIRES as part of the LOAD.
+ * 
+ * b)  QUERY
+ *   i)   B = load 'hbase://query/SELECT fname, lname FROM HIRES' using
+ *             org.apache.phoenix.pig.PhoenixHBaseLoader('localhost');
+ *       
+ *        The above loads the fname and lname columns from the 'HIRES' table.
+ * 
+ */
+public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixHBaseLoader.class);
+    private static final String PHOENIX_TABLE_NAME_SCHEME = "hbase://table/";
+    private static final String PHOENIX_QUERY_SCHEME      = "hbase://query/";
+    private static final String RESOURCE_SCHEMA_SIGNATURE = "phoenix.pig.schema";
+   
+    private PhoenixPigConfiguration config;
+    private String tableName;
+    private String selectQuery;
+    private String zkQuorum;
+    private PhoenixInputFormat inputFormat;
+    private RecordReader<NullWritable, PhoenixRecord> reader;
+    private String contextSignature;
+    private ResourceSchema schema;
+       
+    /**
+     * @param zkQuorum the ZooKeeper quorum of the HBase cluster to connect to
+     */
+    public PhoenixHBaseLoader(String zkQuorum) {
+        super();
+        Preconditions.checkNotNull(zkQuorum);
+        Preconditions.checkState(zkQuorum.length() > 0, "Zookeeper quorum cannot be empty!");
+        this.zkQuorum = zkQuorum;
+    }
+    
+    @Override
+    public void setLocation(String location, Job job) throws IOException {        
+        final Configuration configuration = job.getConfiguration();
+        //explicitly turning off combining splits. 
+        configuration.setBoolean("pig.noSplitCombination", true);
+        this.initializePhoenixPigConfiguration(location, configuration);
+    }
+
+    /**
+     * Initialize PhoenixPigConfiguration if it is null. Called by {@link #setLocation} and {@link #getSchema}
+     * @param location
+     * @param configuration
+     * @throws PigException
+     */
+    private void initializePhoenixPigConfiguration(final String location, final Configuration configuration) throws PigException {
+        if(this.config != null) {
+            return;
+        }
+        this.config = new PhoenixPigConfiguration(configuration);
+        this.config.setServerName(this.zkQuorum);
+        Pair<String,String> pair = null;
+        try {
+            if (location.startsWith(PHOENIX_TABLE_NAME_SCHEME)) {
+                String tableSchema = location.substring(PHOENIX_TABLE_NAME_SCHEME.length());
+                final TableSchemaParserFunction parseFunction = new TableSchemaParserFunction();
+                pair =  parseFunction.apply(tableSchema);
+             } else if (location.startsWith(PHOENIX_QUERY_SCHEME)) {
+                this.selectQuery = location.substring(PHOENIX_QUERY_SCHEME.length());
+                final QuerySchemaParserFunction queryParseFunction = new QuerySchemaParserFunction(this.config);
+                pair = queryParseFunction.apply(this.selectQuery);
+                config.setSelectStatement(this.selectQuery);
+            } else {
+                // Neither scheme matched: fail with usage instead of an NPE below.
+                printUsage(location);
+            }
+            this.tableName = pair.getFirst();
+            final String selectedColumns = pair.getSecond();
+            
+            if(isEmpty(this.tableName) && isEmpty(this.selectQuery)) {
+                printUsage(location);
+            }
+            this.config.setTableName(this.tableName);
+            if(!isEmpty(selectedColumns)) {
+                this.config.setSelectColumns(selectedColumns);    
+            }
+        } catch(IllegalArgumentException iae) {
+            printUsage(location);
+        } 
+    }
+
+  
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+        return location;
+    }
+
+    @Override
+    public InputFormat getInputFormat() throws IOException {
+        if(inputFormat == null) {
+            inputFormat = new PhoenixInputFormat();
+        }
+        return inputFormat;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+        this.reader = reader;
+        final String resourceSchemaAsStr = getValueFromUDFContext(this.contextSignature,RESOURCE_SCHEMA_SIGNATURE);
+        if (resourceSchemaAsStr == null) {
+            throw new IOException("Could not find schema in UDF context");
+        }
+        schema = (ResourceSchema) ObjectSerializer.deserialize(resourceSchemaAsStr);
+    }
+
+     /*
+     * @see org.apache.pig.LoadFunc#setUDFContextSignature(java.lang.String)
+     */
+    @Override
+    public void setUDFContextSignature(String signature) {
+        this.contextSignature = signature;
+    }
+    
+    @Override
+    public Tuple getNext() throws IOException {
+        try {
+            if(!reader.nextKeyValue()) {
+               return null; 
+            }
+            final PhoenixRecord phoenixRecord = reader.getCurrentValue();
+            if(phoenixRecord == null) {
+                return null;
+            }
+            final Tuple tuple = TypeUtil.transformToTuple(phoenixRecord,schema.getFields());
+            return tuple;
+       } catch (InterruptedException e) {
+            int errCode = 6018;
+            final String errMsg = "Error while reading input";
+            throw new ExecException(errMsg, errCode,PigException.REMOTE_ENVIRONMENT, e);
+        }
+    }
+    
+    private void printUsage(final String location) throws PigException {
+        String locationErrMsg = String.format("The input location in load statement should be of the form " +
+                "%s<table name> or %s<query>. Got [%s] ",PHOENIX_TABLE_NAME_SCHEME,PHOENIX_QUERY_SCHEME,location);
+        LOG.error(locationErrMsg);
+        throw new PigException(locationErrMsg);
+    }
+    
+    @Override
+    public ResourceSchema getSchema(String location, Job job) throws IOException {
+        if(schema != null) {
+            return schema;
+        }
+        final Configuration configuration = job.getConfiguration();
+        this.initializePhoenixPigConfiguration(location, configuration);
+        this.schema = PhoenixPigSchemaUtil.getResourceSchema(this.config);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(String.format("Resource Schema generated for location [%s] is [%s]", location, schema.toString()));
+        }
+        this.storeInUDFContext(this.contextSignature, RESOURCE_SCHEMA_SIGNATURE, ObjectSerializer.serialize(schema));
+        return schema;
+    }
+
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+       // not implemented
+        return null;
+    }
+
+    @Override
+    public String[] getPartitionKeys(String location, Job job) throws IOException {
+     // not implemented
+        return null;
+    }
+
+    @Override
+    public void setPartitionFilter(Expression partitionFilter) throws IOException {
+     // not implemented
+    }
+ 
+    private void storeInUDFContext(final String signature,final String key,final String value) {
+        final UDFContext udfContext = UDFContext.getUDFContext();
+        final Properties props = udfContext.getUDFProperties(this.getClass(), new String[]{signature});
+        props.put(key, value);
+    }
+    
+    private String getValueFromUDFContext(final String signature,final String key) {
+        final UDFContext udfContext = UDFContext.getUDFContext();
+        final Properties props = udfContext.getUDFProperties(this.getClass(), new String[]{signature});
+        return props.getProperty(key);
+    }
+}
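
Putting the loader's two location schemes together, a standalone driver might look like the following sketch (quorum, table, and column names are placeholders taken from the javadoc above):

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class PhoenixLoadExample {
        public static void main(String[] args) throws Exception {
            final PigServer pig = new PigServer(ExecType.LOCAL);
            // Table-based load, restricted to two columns of HIRES.
            pig.registerQuery("A = LOAD 'hbase://table/HIRES/ID,NAME' USING "
                    + "org.apache.phoenix.pig.PhoenixHBaseLoader('localhost:2181');");
            // Query-based load; the SELECT is compiled into a Phoenix query plan
            // that later drives split generation.
            pig.registerQuery("B = LOAD 'hbase://query/SELECT FNAME, LNAME FROM HIRES' USING "
                    + "org.apache.phoenix.pig.PhoenixHBaseLoader('localhost:2181');");
            final Iterator<Tuple> rows = pig.openIterator("B");
            while (rows.hasNext()) {
                System.out.println(rows.next());
            }
            pig.shutdown();
        }
    }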

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index 9e237f1..500e403 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.pig;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Properties;
 
 import org.apache.commons.cli.CommandLine;
@@ -31,6 +33,8 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
+import org.apache.phoenix.pig.hadoop.PhoenixRecord;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.StoreFuncInterface;
@@ -38,9 +42,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 
-import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
-
 /**
  * StoreFunc that uses Phoenix to store data into HBase.
  * 
@@ -52,11 +53,21 @@ import org.apache.phoenix.pig.hadoop.PhoenixRecord;
  * argument to this StoreFunc is the server, the 2nd argument is the batch size
  * for upserts via Phoenix.
  * 
+ * Alternative usage: A = load 'testdata' as (a:chararray, b:chararray, 
+ *  e: datetime); STORE A into 'hbase://CORE.ENTITY_HISTORY/ID,F.B,F.E' using
+ * org.apache.phoenix.pig.PhoenixHBaseStorage('localhost','-batchSize 5000');
+ * 
+ * The above reads a file 'testdata' and writes the elements ID, F.B, and F.E to HBase. 
+ * In this example, ID is the row key, and F is the column family for the data elements.  
+ * The first argument to this StoreFunc is the server; the second argument is the batch size
+ * for upserts via Phoenix. In this case, less than the full table row is required.
+ * For configuration messages, look in the info log file.
+ *
  * Note that Pig types must be in sync with the target Phoenix data types. This
  * StoreFunc tries best to cast based on input Pig types and target Phoenix data
  * types, but it is recommended to supply appropriate schema.
  * 
- * This is only a STORE implementation. LoadFunc coming soon.
+ * 
  * 
  * 
  * 
@@ -65,7 +76,6 @@ import org.apache.phoenix.pig.hadoop.PhoenixRecord;
 public class PhoenixHBaseStorage implements StoreFuncInterface {
 
 	private PhoenixPigConfiguration config;
-	private String tableName;
 	private RecordWriter<NullWritable, PhoenixRecord> writer;
 	private String contextSignature = null;
 	private ResourceSchema schema;	
@@ -118,17 +128,28 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
 	 */
 	@Override
 	public void setStoreLocation(String location, Job job) throws IOException {
-		String prefix = "hbase://";
-		if (location.startsWith(prefix)) {
-			tableName = location.substring(prefix.length());
-		}
-		config = new PhoenixPigConfiguration(job.getConfiguration());
-		config.configure(server, tableName, batchSize);
-
-		String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
-		if (serializedSchema != null) {
-			schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
-		}
+	    URI locationURI;
+        try {
+            locationURI = new URI(location);
+            if (!"hbase".equals(locationURI.getScheme())) {
+                throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location));
+            }
+            String tableName = locationURI.getAuthority();
+            // strip off the leading path token '/'
+            String columns = null;
+            if(!locationURI.getPath().isEmpty()) {
+                columns = locationURI.getPath().substring(1);
+            }
+            config = new PhoenixPigConfiguration(job.getConfiguration());
+            config.configure(server, tableName, batchSize, columns);
+            
+            String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
+            if (serializedSchema != null) {
+                schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
+            }
+        } catch (URISyntaxException e) {
+            throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location),e);
+        }
 	}
 
 	@SuppressWarnings("unchecked")


[2/3] PHOENIX-11 Pig Loader (RaviMagham)

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
index 3b0551f..e2521c5 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
@@ -18,32 +18,40 @@
 
 package org.apache.phoenix.pig;
 
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+
 /**
- * A container for configuration to be used with {@link PhoenixHBaseStorage}
- * 
- * 
+ * A container for configuration to be used with {@link PhoenixHBaseStorage} and {@link PhoenixHBaseLoader}
  * 
  */
 public class PhoenixPigConfiguration {
 	
 	private static final Log LOG = LogFactory.getLog(PhoenixPigConfiguration.class);
 	
+	private PhoenixPigConfigurationUtil util;
+	
 	/**
 	 * Speculative execution of Map tasks
 	 */
@@ -58,106 +66,232 @@ public class PhoenixPigConfiguration {
 	
 	public static final String TABLE_NAME = "phoenix.hbase.table.name";
 	
+	public static final String UPSERT_COLUMNS = "phoenix.hbase.upsert.columns";
+	
 	public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
 	
+	public static final String UPSERT_COLUMN_INFO_KEY  = "phoenix.upsert.columninfos.list";
+	
+	public static final String SELECT_STATEMENT = "phoenix.select.stmt";
+	
 	public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
 	
+	// columns to project, given as part of LOAD.
+	public static final String SELECT_COLUMNS = "phoenix.select.query.columns";
+	
+	public static final String SELECT_COLUMN_INFO_KEY  = "phoenix.select.columninfos.list";
+	
+	// the delimiter supported during LOAD and STORE when projected columns are given.
+	public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
+	
 	public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
 	
-	private final Configuration conf;
+	public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
 	
-	private Connection conn;
-	private List<ColumnInfo> columnMetadataList;
+	private final Configuration conf;
 		
 	public PhoenixPigConfiguration(Configuration conf) {
 		this.conf = conf;
+		this.util = new PhoenixPigConfigurationUtil();
 	}
 	
 	public void configure(String server, String tableName, long batchSize) {
-		conf.set(SERVER_NAME, server);
-		conf.set(TABLE_NAME, tableName);
-		conf.setLong(UPSERT_BATCH_SIZE, batchSize);
-		conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
-		conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
+        configure(server,tableName,batchSize,null);
+    }
+	
+	public void configure(String server, String tableName, long batchSize, String columns) {
+	    conf.set(SERVER_NAME, server);
+        conf.set(TABLE_NAME, tableName);
+        conf.setLong(UPSERT_BATCH_SIZE, batchSize);
+        if (isNotEmpty(columns)) {
+            conf.set(UPSERT_COLUMNS, columns);
+        }
+        conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
+        conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
 	}
 	
+	
 	/**
 	 * Creates a {@link Connection} with autoCommit set to false.
 	 * @throws SQLException
 	 */
 	public Connection getConnection() throws SQLException {
-		Properties props = new Properties();
-		conn = DriverManager.getConnection(QueryUtil.getUrl(this.conf.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
-		conn.setAutoCommit(false);
-		
-		setup(conn);
-		
-		return conn;
+	    return getUtil().getConnection(getConfiguration());
 	}
 	
-	/**
-	 * This method creates the Upsert statement and the Column Metadata
-	 * for the Pig query using {@link PhoenixHBaseStorage}. It also 
-	 * determines the batch size based on user provided options.
-	 * 
-	 * @param conn
-	 * @throws SQLException
-	 */
-	public void setup(Connection conn) throws SQLException {
-		// Reset batch size
-		long batchSize = getBatchSize() <= 0 ? ((PhoenixConnection) conn).getMutateBatchSize() : getBatchSize();
-		conf.setLong(UPSERT_BATCH_SIZE, batchSize);
-		
-		if (columnMetadataList == null) {
-			columnMetadataList = new ArrayList<ColumnInfo>();
-			String[] tableMetadata = getTableMetadata(getTableName());
-			ResultSet rs = conn.getMetaData().getColumns(null, tableMetadata[0], tableMetadata[1], null);
-			while (rs.next()) {
-				columnMetadataList.add(new ColumnInfo(rs.getString(QueryUtil.COLUMN_NAME_POSITION), rs.getInt(QueryUtil.DATA_TYPE_POSITION)));
-			}
-		}
-		
-		// Generating UPSERT statement without column name information.
-		String upsertStmt = QueryUtil.constructGenericUpsertStatement(getTableName(), columnMetadataList.size());
-		LOG.info("Phoenix Upsert Statement: " + upsertStmt);
-		conf.set(UPSERT_STATEMENT, upsertStmt);
-	}
-	
-	public String getUpsertStatement() {
-		return conf.get(UPSERT_STATEMENT);
+	public String getUpsertStatement() throws SQLException {
+		return getUtil().getUpsertStatement(getConfiguration(), getTableName());
 	}
 
-	public long getBatchSize() {
-		return conf.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
+	public long getBatchSize() throws SQLException {
+		return getUtil().getBatchSize(getConfiguration());
 	}
 
-
 	public String getServer() {
 		return conf.get(SERVER_NAME);
 	}
 
-	public List<ColumnInfo> getColumnMetadataList() {
-		return columnMetadataList;
+	public List<ColumnInfo> getColumnMetadataList() throws SQLException {
+	    return getUtil().getUpsertColumnMetadataList(getConfiguration(), getTableName());
+	}
+	
+	public String getUpsertColumns() {
+	    return conf.get(UPSERT_COLUMNS);
 	}
 	
 	public String getTableName() {
 		return conf.get(TABLE_NAME);
 	}
-
-	private String[] getTableMetadata(String table) {
-		String[] schemaAndTable = table.split("\\.");
-		assert schemaAndTable.length >= 1;
-
-		if (schemaAndTable.length == 1) {
-			return new String[] { "", schemaAndTable[0] };
-		}
-
-		return new String[] { schemaAndTable[0], schemaAndTable[1] };
-	}
-
 	
 	public Configuration getConfiguration() {
 		return this.conf;
 	}
+	
+	public String getSelectStatement() throws SQLException {
+	   return getUtil().getSelectStatement(getConfiguration(), getTableName());
+	}
+	
+	public List<ColumnInfo> getSelectColumnMetadataList() throws SQLException {
+        return getUtil().getSelectColumnMetadataList(getConfiguration(), getTableName());
+    }
+	
+	public void setServerName(final String zookeeperQuorum) {
+	    this.conf.set(SERVER_NAME, zookeeperQuorum);
+	}
+	
+	public void setTableName(final String tableName) {
+	    Preconditions.checkNotNull(tableName, "HBase Table name cannot be null!");
+	    this.conf.set(TABLE_NAME, tableName);
+	}
+	
+	public void setSelectStatement(final String selectStatement) {
+	    this.conf.set(SELECT_STATEMENT, selectStatement);
+	}
 
+	public void setSelectColumns(String selectColumns) {
+        this.conf.set(SELECT_COLUMNS, selectColumns);
+    }
+	
+	public PhoenixPigConfigurationUtil getUtil() {
+	    return this.util;
+	}
+	
+		
+	@VisibleForTesting
+	static class PhoenixPigConfigurationUtil {
+                
+        public Connection getConnection(final Configuration configuration) throws SQLException {
+            Preconditions.checkNotNull(configuration);
+            Properties props = new Properties();
+            final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
+            conn.setAutoCommit(false);
+            return conn;
+        }
+        
+        public List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
+            Preconditions.checkNotNull(configuration);
+            Preconditions.checkNotNull(tableName);
+            final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
+            if(isNotEmpty(columnInfoStr)) {
+                return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+            }
+            final Connection connection = getConnection(configuration);
+            String upsertColumns = configuration.get(UPSERT_COLUMNS);
+            List<String> upsertColumnList = null;
+            if(isNotEmpty(upsertColumns)) {
+                final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+                upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
+                LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
+                        ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
+                        ));
+            } 
+           List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
+           final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+           // we put the encoded column infos in the Configuration for reuse.
+           configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
+           closeConnection(connection);
+           return columnMetadataList;
+        }
+        
+        public String getUpsertStatement(final Configuration configuration,final String tableName) throws SQLException {
+            Preconditions.checkNotNull(configuration);
+            Preconditions.checkNotNull(tableName);
+            String upsertStmt = configuration.get(UPSERT_STATEMENT);
+            if(isNotEmpty(upsertStmt)) {
+                return upsertStmt;
+            }
+            final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
+            final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration, tableName);
+            if (useUpsertColumns) {
+                // Generating the UPSERT statement with explicit column names.
+                upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
+                LOG.info("Phoenix Custom Upsert Statement: " + upsertStmt);
+            } else {
+                // Generating UPSERT statement without column name information.
+                upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
+                LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
+            }
+            configuration.set(UPSERT_STATEMENT, upsertStmt);
+            return upsertStmt;
+            
+        }
+        
+        public List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
+            Preconditions.checkNotNull(configuration);
+            Preconditions.checkNotNull(tableName);
+            final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
+            if(isNotEmpty(columnInfoStr)) {
+                return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+            }
+            final Connection connection = getConnection(configuration);
+            String selectColumns = configuration.get(SELECT_COLUMNS);
+            List<String> selectColumnList = null;
+            if(isNotEmpty(selectColumns)) {
+                final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+                selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
+                LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
+                        ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
+                        ));
+            }
+           List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+           final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+           // we put the encoded column infos in the Configuration for reuse.
+           configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
+           closeConnection(connection);
+           return columnMetadataList;
+        }
+        
+        public String getSelectStatement(final Configuration configuration,final String tableName) throws SQLException {
+            Preconditions.checkNotNull(configuration);
+            Preconditions.checkNotNull(tableName);
+            String selectStmt = configuration.get(SELECT_STATEMENT);
+            if(isNotEmpty(selectStmt)) {
+                return selectStmt;
+            }
+            final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration, tableName);
+            selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList);
+            LOG.info("Select Statement: "+ selectStmt);
+            configuration.set(SELECT_STATEMENT, selectStmt);
+            return selectStmt;
+        }
+        
+        public long getBatchSize(final Configuration configuration) throws SQLException {
+            Preconditions.checkNotNull(configuration);
+            long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
+            if(batchSize <= 0) {
+               Connection conn = getConnection(configuration);
+               batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+               closeConnection(conn);
+            }
+            configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
+            return batchSize;
+        }
+        
+        private void closeConnection(final Connection connection) throws SQLException {
+            if(connection != null) {
+                connection.close();
+            }
+        }
+    }
+    
 }
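
One design point worth noting in the util class above: every derived artifact (the encoded ColumnInfo list, the UPSERT and SELECT statements, the resolved batch size) is written back into the underlying Configuration, so repeated calls are served from that cache instead of re-opening connections. A sketch of the behavior, with placeholder names:

    import java.sql.SQLException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.pig.PhoenixPigConfiguration;

    public class PhoenixConfigCachingExample {
        public static void main(String[] args) throws SQLException {
            final PhoenixPigConfiguration config = new PhoenixPigConfiguration(new Configuration());
            // The four-argument overload restricts the UPSERT to the named columns.
            config.configure("localhost:2181", "TEST_TABLE", 1000, "A_STRING,COL1");
            // First call: opens a Phoenix connection, derives the column metadata, and
            // stores the encoded ColumnInfo list and the statement in the Configuration.
            final String first = config.getUpsertStatement();
            // Second call: answered from the Configuration, no connection is opened.
            final String second = config.getUpsertStatement();
            System.out.println(first.equals(second)); // true
        }
    }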

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/TypeUtil.java
deleted file mode 100644
index d6c1466..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/TypeUtil.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig;
-
-import java.io.IOException;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-
-import org.apache.pig.builtin.Utf8StorageConverter;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.joda.time.DateTime;
-
-import org.apache.phoenix.schema.PDataType;
-
-public class TypeUtil {
-	
-	private static final Utf8StorageConverter utf8Converter = new Utf8StorageConverter();
-	
-	/**
-	 * This method returns the most appropriate PDataType associated with 
-	 * the incoming Pig type. Note for Pig DataType DATETIME, returns DATE as 
-	 * inferredSqlType. 
-	 * 
-	 * This is later used to make a cast to targetPhoenixType accordingly. See
-	 * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
-	 * 
-	 * @param obj
-	 * @return PDataType
-	 */
-	public static PDataType getType(Object obj, byte type) {
-		if (obj == null) {
-			return null;
-		}
-	
-		PDataType sqlType;
-
-		switch (type) {
-		case DataType.BYTEARRAY:
-			sqlType = PDataType.VARBINARY;
-			break;
-		case DataType.CHARARRAY:
-			sqlType = PDataType.VARCHAR;
-			break;
-		case DataType.DOUBLE:
-			sqlType = PDataType.DOUBLE;
-			break;
-		case DataType.FLOAT:
-			sqlType = PDataType.FLOAT;
-			break;
-		case DataType.INTEGER:
-			sqlType = PDataType.INTEGER;
-			break;
-		case DataType.LONG:
-			sqlType = PDataType.LONG;
-			break;
-		case DataType.BOOLEAN:
-			sqlType = PDataType.BOOLEAN;
-			break;
-		case DataType.DATETIME:
-			sqlType = PDataType.DATE;
-			break;
-		default:
-			throw new RuntimeException("Unknown type " + obj.getClass().getName()
-					+ " passed to PhoenixHBaseStorage");
-		}
-
-		return sqlType;
-
-	}
-
-	/**
-	 * This method encodes a value with Phoenix data type. It begins
-	 * with checking whether an object is BINARY and makes a call to
-	 * {@link #castBytes(Object, PDataType)} to convery bytes to
-	 * targetPhoenixType
-	 * 
-	 * @param o
-	 * @param targetPhoenixType
-	 * @return Object
-	 */
-	public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
-		PDataType inferredPType = getType(o, objectType);
-		
-		if(inferredPType == null) {
-			return null;
-		}
-		
-		if(inferredPType == PDataType.VARBINARY && targetPhoenixType != PDataType.VARBINARY) {
-			try {
-				o = castBytes(o, targetPhoenixType);
-				inferredPType = getType(o, DataType.findType(o));
-			} catch (IOException e) {
-				throw new RuntimeException("Error while casting bytes for object " +o);
-			}
-		}
-
-		if(inferredPType == PDataType.DATE) {
-			int inferredSqlType = targetPhoenixType.getSqlType();
-
-			if(inferredSqlType == Types.DATE) {
-				return new Date(((DateTime)o).getMillis());
-			} 
-			if(inferredSqlType == Types.TIME) {
-				return new Time(((DateTime)o).getMillis());
-			}
-			if(inferredSqlType == Types.TIMESTAMP) {
-				return new Timestamp(((DateTime)o).getMillis());
-			}
-		}
-		
-		if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
-			return inferredPType.toObject(o, targetPhoenixType);
-		}
-		
-		throw new RuntimeException(o.getClass().getName()
-				+ " cannot be coerced to "+targetPhoenixType.toString());
-	}
-	
-	/**
-	 * This method converts bytes to the target type required
-	 * for Phoenix. It uses {@link Utf8StorageConverter} for
-	 * the conversion.
-	 * 
-	 * @param o
-	 * @param targetPhoenixType
-	 * @return Object
-	 * @throws IOException
-	 */
-    public static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
-        byte[] bytes = ((DataByteArray)o).get();
-        
-        switch(targetPhoenixType) {
-        case CHAR:
-        case VARCHAR:
-            return utf8Converter.bytesToCharArray(bytes);
-        case UNSIGNED_SMALLINT:
-        case SMALLINT:
-            return utf8Converter.bytesToInteger(bytes).shortValue();
-        case UNSIGNED_TINYINT:
-        case TINYINT:
-            return utf8Converter.bytesToInteger(bytes).byteValue();
-        case UNSIGNED_INT:
-        case INTEGER:
-            return utf8Converter.bytesToInteger(bytes);
-        case BOOLEAN:
-            return utf8Converter.bytesToBoolean(bytes);
-        case DECIMAL:
-            return utf8Converter.bytesToBigDecimal(bytes);
-        case FLOAT:
-        case UNSIGNED_FLOAT:
-            return utf8Converter.bytesToFloat(bytes);
-        case DOUBLE:
-        case UNSIGNED_DOUBLE:
-            return utf8Converter.bytesToDouble(bytes);
-        case UNSIGNED_LONG:
-        case LONG:
-            return utf8Converter.bytesToLong(bytes);
-        case TIME:
-        case TIMESTAMP:
-        case DATE:
-        case UNSIGNED_TIME:
-        case UNSIGNED_TIMESTAMP:
-        case UNSIGNED_DATE:
-        	return utf8Converter.bytesToDateTime(bytes);
-        default:
-        	return o;
-        }        
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
new file mode 100644
index 0000000..ebb9023
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ScanUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * The InputFormat class for generating the splits and creating the record readers.
+ * 
+ */
+public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixRecord> {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
+    private PhoenixPigConfiguration phoenixConfiguration;
+    private Connection connection;
+    private QueryPlan  queryPlan;
+    
+    /**
+     * Instantiated by the framework.
+     */
+    public PhoenixInputFormat() {
+    }
+
+    @Override
+    public RecordReader<NullWritable, PhoenixRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {       
+        setConf(context.getConfiguration());
+        final QueryPlan queryPlan = getQueryPlan(context);
+        try {
+            return new PhoenixRecordReader(phoenixConfiguration, queryPlan);
+        } catch (SQLException sqle) {
+            throw new IOException(sqle);
+        }
+    }
+    
+   
+
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {        
+        List<InputSplit> splits = null;
+        try{
+            setConf(context.getConfiguration());
+            final QueryPlan queryPlan = getQueryPlan(context);
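+            // The iterator below is never consumed; creating it forces the query
+            // plan to initialize its parallel scans so getSplits() is populated.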
+            @SuppressWarnings("unused")
+            final ResultIterator iterator = queryPlan.iterator();
+            final List<KeyRange> allSplits = queryPlan.getSplits();
+            splits = generateSplits(queryPlan,allSplits);
+        } catch(SQLException sqlE) {
+            LOG.error(String.format("Error [%s] in getSplits of PhoenixInputFormat", sqlE.getMessage()));
+            Throwables.propagate(sqlE);
+        }
+        return splits;
+    }
+
+    private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
+        Preconditions.checkNotNull(qplan);
+        Preconditions.checkNotNull(splits);
+        final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
+        final StatementContext context = qplan.getContext();
+        final TableRef tableRef = qplan.getTableRef();
+        for (KeyRange split : splits) {
+            final Scan splitScan = new Scan(context.getScan());
+            if (tableRef.getTable().getBucketNum() != null) {
+                KeyRange minMaxRange = context.getMinMaxRange();
+                if (minMaxRange != null) {
+                    minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
+                    split = split.intersect(minMaxRange);
+                }
+            }
+            // The intersect call sets the actual start and stop rows on the passed splitScan; we read them back below.
+            if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), context.getScanRanges().useSkipScanFilter())) {
+                final PhoenixInputSplit inputSplit = new PhoenixInputSplit(KeyRange.getKeyRange(splitScan.getStartRow(), splitScan.getStopRow()));
+                psplits.add(inputSplit);     
+            }
+        }
+        return psplits;
+    }
+    
+    public void setConf(Configuration configuration) {
+        this.phoenixConfiguration = new PhoenixPigConfiguration(configuration);
+    }
+
+    public PhoenixPigConfiguration getConf() {
+        return this.phoenixConfiguration;
+    }
+    
+    private Connection getConnection() {
+        try {
+            if (this.connection == null) {
+                this.connection = phoenixConfiguration.getConnection();
+           }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return connection;
+    }
+    
+    /**
+     * Returns the query plan associated with the select query.
+     * @param context
+     * @return the compiled query plan
+     * @throws IOException if the plan cannot be compiled
+     */
+    private QueryPlan getQueryPlan(final JobContext context) throws IOException {
+        Preconditions.checkNotNull(context);
+        if(queryPlan == null) {
+            try{
+                final Connection connection = getConnection();
+                final String selectStatement = getConf().getSelectStatement();
+                Preconditions.checkNotNull(selectStatement);
+                final Statement statement = connection.createStatement();
+                final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+                this.queryPlan = pstmt.compileQuery(selectStatement);
+            } catch(Exception exception) {
+                LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
+                throw new RuntimeException(exception);
+            }
+        }
+        return queryPlan;
+    }
+}
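
Although the input format is driven by the Pig loader here, it only depends on the Hadoop MapReduce API, so it can in principle be wired into a bare MR job. A sketch, assuming Hadoop 2's Job.getInstance and placeholder names; mapper wiring and job submission are omitted:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.pig.PhoenixPigConfiguration;
    import org.apache.phoenix.pig.hadoop.PhoenixInputFormat;

    public class PhoenixInputFormatExample {
        public static void main(String[] args) throws Exception {
            final Configuration conf = new Configuration();
            final PhoenixPigConfiguration phoenixConf = new PhoenixPigConfiguration(conf);
            phoenixConf.setServerName("localhost:2181");
            phoenixConf.setTableName("HIRES");
            phoenixConf.setSelectStatement("SELECT ID, NAME FROM HIRES");
            final Job job = Job.getInstance(conf, "phoenix-scan");
            // One input split per key range of the compiled query plan; values
            // arrive as PhoenixRecord instances keyed by NullWritable.
            job.setInputFormatClass(PhoenixInputFormat.class);
        }
    }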

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
new file mode 100644
index 0000000..43d69b3
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.query.KeyRange;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * 
+ * Input split class to hold the lower and upper bound range. {@link KeyRange}
+ * 
+ */
+public class PhoenixInputSplit extends InputSplit implements Writable {
+
+    private KeyRange keyRange;
+   
+    /**
+     * No-arg constructor, required for Writable deserialization.
+     */
+    public PhoenixInputSplit() {
+    }
+    
+   /**
+    * 
+    * @param keyRange the key range of rows covered by this split
+    */
+    public PhoenixInputSplit(final KeyRange keyRange) {
+        Preconditions.checkNotNull(keyRange);
+        this.keyRange = keyRange;
+    }
+    
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        this.keyRange = new KeyRange();
+        this.keyRange.readFields(input);
+    }
+    
+    @Override
+    public void write(DataOutput output) throws IOException {
+        Preconditions.checkNotNull(keyRange);
+        keyRange.write(output);
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+         return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+        return new String[]{};
+    }
+
+    /**
+     * @return Returns the keyRange.
+     */
+    public KeyRange getKeyRange() {
+        return keyRange;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((keyRange == null) ? 0 : keyRange.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) { return true; }
+        if (obj == null) { return false; }
+        if (!(obj instanceof PhoenixInputSplit)) { return false; }
+        PhoenixInputSplit other = (PhoenixInputSplit)obj;
+        if (keyRange == null) {
+            if (other.keyRange != null) { return false; }
+        } else if (!keyRange.equals(other.keyRange)) { return false; }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
index b9d03de..5063ed0 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
@@ -22,31 +22,35 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.io.Writable;
+import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
 
-import org.apache.phoenix.pig.TypeUtil;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
+import com.google.common.base.Preconditions;
 
 /**
  * A {@link Writable} representing a Phoenix record. This class
- * does a type mapping and sets the value accordingly in the 
- * {@link PreparedStatement}
- * 
+ * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
+ * b) reads the column values from the {@link ResultSet}
  * 
- *
  */
 public class PhoenixRecord implements Writable {
 	
 	private final List<Object> values;
 	private final ResourceFieldSchema[] fieldSchemas;
 	
+	public PhoenixRecord() {
+	    this(null);
+	}
+	
 	public PhoenixRecord(ResourceFieldSchema[] fieldSchemas) {
 		this.values = new ArrayList<Object>();
 		this.fieldSchemas = fieldSchemas;
@@ -63,20 +67,35 @@ public class PhoenixRecord implements Writable {
 	public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException {
 		for (int i = 0; i < columnMetadataList.size(); i++) {
 			Object o = values.get(i);
-			
+			ColumnInfo columnInfo = columnMetadataList.get(i);
 			byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
-			Object upsertValue = convertTypeSpecificValue(o, type, columnMetadataList.get(i).getSqlType());
-
-			if (upsertValue != null) {
-				statement.setObject(i + 1, upsertValue, columnMetadataList.get(i).getSqlType());
-			} else {
-				statement.setNull(i + 1, columnMetadataList.get(i).getSqlType());
-			}
+			try {
+                Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
+                if (upsertValue != null) {
+                    statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
+                } else {
+                    statement.setNull(i + 1, columnInfo.getSqlType());
+                }
+            } catch (RuntimeException re) {
+                throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
+                        ,columnInfo.toString(),re.getMessage()),re);
+                
+            }
 		}
 		
 		statement.execute();
 	}
 	
+	public void read(final ResultSet rs, final int noOfColumns) throws SQLException {
+	    Preconditions.checkNotNull(rs);
+        Preconditions.checkArgument(noOfColumns > 0, "Number of columns must be > 0");
+        values.clear();
+        for(int i = 1 ; i <= noOfColumns ; i++) {
+            Object obj = rs.getObject(i);
+            values.add(obj);
+        }
+	}
+	
 	public void add(Object value) {
 		values.add(value);
 	}
@@ -86,4 +105,8 @@ public class PhoenixRecord implements Writable {
 
 		return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
 	}
+
+    public List<Object> getValues() {
+        return values;
+    }
 }
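
A hedged sketch of the new read path: each row of a JDBC ResultSet is copied
into a PhoenixRecord and the raw values pulled back out. The connection URL and
the EMPLOYEE table are illustrative placeholders, not part of the patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    import org.apache.phoenix.pig.hadoop.PhoenixRecord;

    public class PhoenixRecordReadSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder URL; any reachable Phoenix cluster would do.
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            try {
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT ID, NAME FROM EMPLOYEE");

                // No-arg constructor: no Pig field schemas, values kept as-is.
                PhoenixRecord record = new PhoenixRecord();
                while (rs.next()) {
                    record.read(rs, 2);    // copy both projected columns
                    System.out.println(record.getValues());
                }
            } finally {
                conn.close();
            }
        }
    }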

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
new file mode 100644
index 0000000..24ad1ee
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+/**
+ * RecordReader that processes the scan over a split's key range and returns {@link PhoenixRecord}s.
+ */
+public final class PhoenixRecordReader extends RecordReader<NullWritable,PhoenixRecord>{
+    
+    private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
+    private final PhoenixPigConfiguration phoenixConfiguration;
+    private final QueryPlan queryPlan;
+    private final List<ColumnInfo> columnInfos;
+    private NullWritable key =  NullWritable.get();
+    private PhoenixRecord value = null;
+    private ResultIterator resultIterator = null;
+    private PhoenixResultSet resultSet;
+    
+    public PhoenixRecordReader(final PhoenixPigConfiguration pConfiguration,final QueryPlan qPlan) throws SQLException {
+        
+        Preconditions.checkNotNull(pConfiguration);
+        Preconditions.checkNotNull(qPlan);
+        this.phoenixConfiguration = pConfiguration;
+        this.queryPlan = qPlan;
+        this.columnInfos = phoenixConfiguration.getSelectColumnMetadataList();
+     }
+
+    @Override
+    public void close() throws IOException {
+        if (resultIterator != null) {
+            try {
+                resultIterator.close();
+            } catch (SQLException e) {
+                LOG.error("Error closing the result iterator.", e);
+            }
+        }
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return key;
+    }
+
+    @Override
+    public PhoenixRecord getCurrentValue() throws IOException, InterruptedException {
+        return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+        return 0;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+        final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
+        final KeyRange keyRange = pSplit.getKeyRange();
+        final Scan splitScan = queryPlan.getContext().getScan();
+        final Scan scan = new Scan(splitScan);
+        scan.setStartRow(keyRange.getLowerRange());
+        scan.setStopRow(keyRange.getUpperRange());
+         try {
+            this.resultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+            this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
+        } catch (SQLException e) {
+            LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
+            Throwables.propagate(e);
+        }
+        
+   }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (key == null) {
+            key = NullWritable.get();
+        }
+        if (value == null) {
+            value =  new PhoenixRecord();
+        }
+        Preconditions.checkNotNull(this.resultSet);
+        try {
+            if(!resultSet.next()) {
+                return false;
+            }
+            value.read(resultSet,columnInfos.size());
+            return true;
+        } catch (SQLException e) {
+            LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
+            Throwables.propagate(e);
+        }
+        return false;
+    }
+}
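
For orientation, the standard consumption loop a task runs against a reader
like this one; obtaining and initializing the reader (normally done by the
framework via the accompanying InputFormat) is left out of this sketch:

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.phoenix.pig.hadoop.PhoenixRecord;

    public class RecordReaderLoopSketch {
        // Drains a reader that has already been initialize()d for its split.
        static long drain(RecordReader<NullWritable, PhoenixRecord> reader) throws Exception {
            long rows = 0;
            try {
                while (reader.nextKeyValue()) {
                    PhoenixRecord record = reader.getCurrentValue();
                    rows++;    // a real consumer would use record.getValues() here
                }
            } finally {
                reader.close();    // closes the underlying result iterator
            }
            return rows;
        }
    }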

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
new file mode 100644
index 0000000..3ea9b5b
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import java.util.List;
+
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * 
+ * A codec to transform a {@link ColumnInfo} to a {@link String} and decode back.
+ *
+ */
+public final class ColumnInfoToStringEncoderDecoder {
+
+    private static final String COLUMN_INFO_DELIMITER = "|";
+    
+    private ColumnInfoToStringEncoderDecoder() {
+        
+    }
+    
+    public static String encode(List<ColumnInfo> columnInfos) {
+        Preconditions.checkNotNull(columnInfos);
+        return Joiner.on(COLUMN_INFO_DELIMITER).
+                        skipNulls().join(columnInfos);
+    }
+    
+    public static List<ColumnInfo> decode(final String columnInfoStr) {
+        Preconditions.checkNotNull(columnInfoStr);
+        List<ColumnInfo> columnInfos = Lists.newArrayList(
+                                Iterables.transform(
+                                        Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr),
+                                        new Function<String, ColumnInfo>() {
+                                            @Override
+                                            public ColumnInfo apply(String colInfo) {
+                                                if (colInfo.isEmpty()) {
+                                                      return null;
+                                                }
+                                                return ColumnInfo.fromString(colInfo);
+                                            }
+                                        }));
+        return columnInfos;
+        
+    }
+}
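
A small round-trip sketch of the codec above (illustrative; relies on the
ColumnInfo toString/fromString round-trip that decode itself uses):

    import java.sql.Types;
    import java.util.List;

    import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
    import org.apache.phoenix.util.ColumnInfo;

    import com.google.common.collect.Lists;

    public class ColumnInfoCodecSketch {
        public static void main(String[] args) {
            List<ColumnInfo> columns = Lists.newArrayList(
                    new ColumnInfo("ID", Types.BIGINT),
                    new ColumnInfo("NAME", Types.VARCHAR));

            // Pipe-delimited form, small enough to stash in a Hadoop Configuration.
            String encoded = ColumnInfoToStringEncoderDecoder.encode(columns);
            List<ColumnInfo> decoded = ColumnInfoToStringEncoderDecoder.decode(encoded);

            System.out.println(encoded + " -> " + decoded.size() + " columns");
        }
    }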

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
new file mode 100644
index 0000000..695b506
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+
+/**
+ * 
+ * Utility to generate the ResourceSchema from the list of {@link ColumnInfo}
+ *
+ */
+public final class PhoenixPigSchemaUtil {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixPigSchemaUtil.class);
+    
+    private PhoenixPigSchemaUtil() {
+    }
+    
+    public static ResourceSchema getResourceSchema(final PhoenixPigConfiguration phoenixConfiguration) throws IOException {
+        
+        final ResourceSchema schema = new ResourceSchema();
+        try {
+            final List<ColumnInfo> columns = phoenixConfiguration.getSelectColumnMetadataList();
+            ResourceFieldSchema fields[] = new ResourceFieldSchema[columns.size()];
+            int i = 0;
+            for(ColumnInfo cinfo : columns) {
+                int sqlType = cinfo.getSqlType();
+                PDataType phoenixDataType = PDataType.fromTypeId(sqlType);
+                byte pigType = TypeUtil.getPigDataTypeForPhoenixType(phoenixDataType);
+                ResourceFieldSchema field = new ResourceFieldSchema();
+                field.setType(pigType).setName(cinfo.getDisplayName());
+                fields[i++] = field;
+            }
+            schema.setFields(fields);    
+        } catch(SQLException sqle) {
+            LOG.error(String.format("Error: SQLException [%s] ",sqle.getMessage()));
+            throw new IOException(sqle);
+        }
+        
+        return schema;
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
new file mode 100644
index 0000000..1b3a90a
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * 
+ * A function to parse the SELECT query passed to LOAD into a Pair of <table name, comma-separated column list>.
+ *
+ */
+public class QuerySchemaParserFunction implements Function<String,Pair<String,String>> {
+
+    private static final Log LOG = LogFactory.getLog(QuerySchemaParserFunction.class);
+    private PhoenixPigConfiguration phoenixConfiguration;
+    
+    public QuerySchemaParserFunction(PhoenixPigConfiguration phoenixConfiguration) {
+        Preconditions.checkNotNull(phoenixConfiguration);
+        this.phoenixConfiguration = phoenixConfiguration;
+    }
+    
+    @Override
+    public Pair<String, String> apply(final String selectStatement) {
+        Preconditions.checkNotNull(selectStatement);
+        Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!");
+        Preconditions.checkNotNull(this.phoenixConfiguration);
+        Connection connection = null;
+        try {
+            connection = this.phoenixConfiguration.getConnection();
+            final Statement  statement = connection.createStatement();
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);
+            isValidStatement(queryPlan);
+            final String tableName = queryPlan.getTableRef().getTable().getName().getString();
+            final List<? extends ColumnProjector> projectedColumns = queryPlan.getProjector().getColumnProjectors();
+            final List<String> columns = Lists.transform(projectedColumns,
+                                                            new Function<ColumnProjector,String>() {
+                                                                @Override
+                                                                public String apply(ColumnProjector column) {
+                                                                    return column.getName();
+                                                                }
+                                                            });
+            final String columnsAsStr = Joiner.on(",").join(columns);
+            return new Pair<String, String>(tableName, columnsAsStr);
+        } catch (SQLException e) {
+            LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),selectStatement));
+            Throwables.propagate(e);
+        } finally {
+            if(connection != null) {
+                try {
+                    connection.close();
+                } catch(SQLException sqle) {
+                    Throwables.propagate(sqle);
+                }
+            }
+        }
+        return null;
+    }
+    
+    /**
+     * Validates the statement behind the given query plan. The conditions checked are:
+     * <ol>
+     *   <li>Is a SELECT statement</li>
+     *   <li>doesn't contain ORDER BY expression</li>
+     *   <li>doesn't contain LIMIT</li>
+     *   <li>doesn't contain GROUP BY expression</li>
+     *   <li>doesn't contain DISTINCT</li>
+     *   <li>doesn't contain AGGREGATE functions</li>
+     * </ol>  
+     * @param queryPlan the compiled plan for the SELECT statement
+     * @return true if the statement satisfies all of the conditions above
+     */
+    private boolean isValidStatement(final QueryPlan queryPlan) {
+        if(queryPlan.getStatement().getOperation() != PhoenixStatement.Operation.QUERY) {
+            throw new IllegalArgumentException("Query passed isn't a SELECT statement");
+        }
+        if(!queryPlan.getOrderBy().getOrderByExpressions().isEmpty() 
+                || queryPlan.getLimit() != null 
+                || (queryPlan.getGroupBy() != null && !queryPlan.getGroupBy().isEmpty()) 
+                || queryPlan.getStatement().isDistinct()
+                || queryPlan.getStatement().isAggregate()) {
+            throw new IllegalArgumentException("SELECT statement shouldn't contain DISTINCT, ORDER BY, LIMIT, GROUP BY or aggregate expressions");
+        }
+        return true;
+    }
+
+}
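
An illustrative application of the function; the PhoenixPigConfiguration is
assumed to hand back a live connection, as the unit tests below arrange with a
mock:

    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.pig.PhoenixPigConfiguration;
    import org.apache.phoenix.pig.util.QuerySchemaParserFunction;

    public class QueryParserSketch {
        static Pair<String, String> parse(PhoenixPigConfiguration config) {
            QuerySchemaParserFunction function = new QuerySchemaParserFunction(config);
            // For an existing EMPLOYEE table this yields ("EMPLOYEE", "NAME,AGE").
            return function.apply("SELECT name, age FROM EMPLOYEE");
        }
    }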

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TableSchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TableSchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TableSchemaParserFunction.java
new file mode 100644
index 0000000..5bca30e
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TableSchemaParserFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+
+
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+/**
+ * 
+ * A function to parse the table schema passed to LOAD/STORE into a Pair of <table name, comma-separated columns>.
+ *
+ */
+public final class TableSchemaParserFunction implements Function<String,Pair<String,String>> {
+
+    private static final char TABLE_COLUMN_DELIMITER    = '/';
+    
+    @Override
+    public Pair<String, String> apply(final String tableSchema) {
+        Preconditions.checkNotNull(tableSchema);
+        Preconditions.checkArgument(!tableSchema.isEmpty(), "HBase Table name is empty!!");
+        
+        final String[] tokens = Iterables.toArray(Splitter.on(TABLE_COLUMN_DELIMITER)
+                                    .trimResults().omitEmptyStrings().split(tableSchema), String.class);
+        final String tableName = tokens[0];
+        String columns = null;
+        if(tokens.length > 1) {
+            columns = tokens[1];    
+        }
+        return new Pair<String, String>(tableName, columns);
+    }
+}
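
A quick sketch of the two accepted input shapes, mirroring the unit tests
below:

    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.pig.util.TableSchemaParserFunction;

    public class TableSchemaParserSketch {
        public static void main(String[] args) {
            TableSchemaParserFunction function = new TableSchemaParserFunction();

            // Table plus explicit columns: ("EMPLOYEE", "col1,col2").
            Pair<String, String> withColumns = function.apply("EMPLOYEE/col1,col2");

            // Table only: the second member is null, meaning all columns.
            Pair<String, String> tableOnly = function.apply("EMPLOYEE");

            System.out.println(withColumns.getSecond());   // col1,col2
            System.out.println(tableOnly.getSecond());     // null
        }
    }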

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
new file mode 100644
index 0000000..f3cacfd
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig.util;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.joda.time.DateTime;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+
+public final class TypeUtil {
+	
+    private static final Log LOG = LogFactory.getLog(TypeUtil.class);
+	private static final Utf8StorageConverter utf8Converter = new Utf8StorageConverter();
+	private static final ImmutableMap<PDataType,Byte> phoenixTypeToPigDataType = init();	
+	
+	private TypeUtil(){
+	}
+	
+	/**
+	 * A map of Phoenix to Pig data types.
+	 * @return
+	 */
+	private static ImmutableMap<PDataType, Byte> init() {
+        final ImmutableMap.Builder<PDataType,Byte> builder = new Builder<PDataType,Byte> ();
+        builder.put(PDataType.LONG,DataType.LONG);
+        builder.put(PDataType.VARBINARY,DataType.BYTEARRAY);
+        builder.put(PDataType.CHAR,DataType.CHARARRAY);
+        builder.put(PDataType.VARCHAR,DataType.CHARARRAY);
+        builder.put(PDataType.DOUBLE,DataType.DOUBLE);
+        builder.put(PDataType.FLOAT,DataType.FLOAT);
+        builder.put(PDataType.INTEGER,DataType.INTEGER);
+        builder.put(PDataType.TINYINT,DataType.INTEGER);
+        builder.put(PDataType.SMALLINT,DataType.INTEGER);
+        builder.put(PDataType.DECIMAL,DataType.BIGDECIMAL);
+        builder.put(PDataType.TIME,DataType.DATETIME);
+        builder.put(PDataType.TIMESTAMP,DataType.DATETIME);
+        builder.put(PDataType.BOOLEAN,DataType.BOOLEAN);
+        builder.put(PDataType.DATE,DataType.DATETIME);
+        builder.put(PDataType.UNSIGNED_DATE,DataType.DATETIME);
+        builder.put(PDataType.UNSIGNED_DOUBLE,DataType.DOUBLE);
+        builder.put(PDataType.UNSIGNED_FLOAT,DataType.FLOAT);
+        builder.put(PDataType.UNSIGNED_INT,DataType.INTEGER);
+        builder.put(PDataType.UNSIGNED_LONG,DataType.LONG);
+        builder.put(PDataType.UNSIGNED_SMALLINT,DataType.INTEGER);
+        builder.put(PDataType.UNSIGNED_TIME,DataType.DATETIME);
+        builder.put(PDataType.UNSIGNED_TIMESTAMP,DataType.DATETIME);
+        builder.put(PDataType.UNSIGNED_TINYINT,DataType.INTEGER);
+        return builder.build();
+    }
+    /**
+	 * This method returns the most appropriate PDataType associated with 
+	 * the incoming Pig type. Note for Pig DataType DATETIME, returns DATE as 
+	 * inferredSqlType. 
+	 * 
+	 * This is later used to make a cast to targetPhoenixType accordingly. See
+	 * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
+	 * 
+	 * @param obj
+	 * @return PDataType
+	 */
+	public static PDataType getType(Object obj, byte type) {
+		if (obj == null) {
+			return null;
+		}
+	
+		PDataType sqlType;
+
+		switch (type) {
+		case DataType.BYTEARRAY:
+			sqlType = PDataType.VARBINARY;
+			break;
+		case DataType.CHARARRAY:
+			sqlType = PDataType.VARCHAR;
+			break;
+		case DataType.DOUBLE:
+			sqlType = PDataType.DOUBLE;
+			break;
+		case DataType.FLOAT:
+			sqlType = PDataType.FLOAT;
+			break;
+		case DataType.INTEGER:
+			sqlType = PDataType.INTEGER;
+			break;
+		case DataType.LONG:
+			sqlType = PDataType.LONG;
+			break;
+		case DataType.BOOLEAN:
+			sqlType = PDataType.BOOLEAN;
+			break;
+		case DataType.DATETIME:
+			sqlType = PDataType.DATE;
+			break;
+		default:
+			throw new RuntimeException("Unknown type " + obj.getClass().getName()
+					+ " passed to PhoenixHBaseStorage");
+		}
+
+		return sqlType;
+
+	}
+
+	/**
+	 * This method encodes a value into its Phoenix data type. It begins
+	 * by checking whether the object is VARBINARY and, if so, calls
+	 * {@link #castBytes(Object, PDataType)} to convert the bytes to the
+	 * targetPhoenixType.
+	 * 
+	 * @param o
+	 * @param targetPhoenixType
+	 * @return Object
+	 */
+	public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
+		PDataType inferredPType = getType(o, objectType);
+		
+		if(inferredPType == null) {
+			return null;
+		}
+		
+		if(inferredPType == PDataType.VARBINARY && targetPhoenixType != PDataType.VARBINARY) {
+			try {
+				o = castBytes(o, targetPhoenixType);
+				inferredPType = getType(o, DataType.findType(o));
+			} catch (IOException e) {
+				throw new RuntimeException("Error while casting bytes for object " + o, e);
+			}
+		}
+
+		if(inferredPType == PDataType.DATE) {
+			int inferredSqlType = targetPhoenixType.getSqlType();
+
+			if(inferredSqlType == Types.DATE) {
+				return new Date(((DateTime)o).getMillis());
+			} 
+			if(inferredSqlType == Types.TIME) {
+				return new Time(((DateTime)o).getMillis());
+			}
+			if(inferredSqlType == Types.TIMESTAMP) {
+				return new Timestamp(((DateTime)o).getMillis());
+			}
+		}
+		
+		if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
+			return inferredPType.toObject(o, targetPhoenixType);
+		}
+		
+		throw new RuntimeException(o.getClass().getName()
+				+ " cannot be coerced to "+targetPhoenixType.toString());
+	}
+	
+	/**
+	 * This method converts bytes to the target type required
+	 * for Phoenix. It uses {@link Utf8StorageConverter} for
+	 * the conversion.
+	 * 
+	 * @param o
+	 * @param targetPhoenixType
+	 * @return Object
+	 * @throws IOException
+	 */
+    public static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
+        byte[] bytes = ((DataByteArray)o).get();
+        
+        switch(targetPhoenixType) {
+        case CHAR:
+        case VARCHAR:
+            return utf8Converter.bytesToCharArray(bytes);
+        case UNSIGNED_SMALLINT:
+        case SMALLINT:
+            return utf8Converter.bytesToInteger(bytes).shortValue();
+        case UNSIGNED_TINYINT:
+        case TINYINT:
+            return utf8Converter.bytesToInteger(bytes).byteValue();
+        case UNSIGNED_INT:
+        case INTEGER:
+            return utf8Converter.bytesToInteger(bytes);
+        case BOOLEAN:
+            return utf8Converter.bytesToBoolean(bytes);
+        case DECIMAL:
+            return utf8Converter.bytesToBigDecimal(bytes);
+        case FLOAT:
+        case UNSIGNED_FLOAT:
+            return utf8Converter.bytesToFloat(bytes);
+        case DOUBLE:
+        case UNSIGNED_DOUBLE:
+            return utf8Converter.bytesToDouble(bytes);
+        case UNSIGNED_LONG:
+        case LONG:
+            return utf8Converter.bytesToLong(bytes);
+        case TIME:
+        case TIMESTAMP:
+        case DATE:
+        case UNSIGNED_TIME:
+        case UNSIGNED_TIMESTAMP:
+        case UNSIGNED_DATE:
+        	return utf8Converter.bytesToDateTime(bytes);
+        default:
+        	return o;
+        }        
+    }
+    
+    /**
+     * Transforms the PhoenixRecord to Pig {@link Tuple}.
+     * @param record
+     * @param projectedColumns
+     * @return
+     * @throws IOException
+     */
+    public static Tuple transformToTuple(final PhoenixRecord record, final ResourceFieldSchema[] projectedColumns) throws IOException {
+        
+        List<Object> columnValues = record.getValues();
+        if(columnValues == null || columnValues.size() == 0 || projectedColumns == null || projectedColumns.length != columnValues.size()) {
+            return null;
+        }
+        int columns = columnValues.size();
+        Tuple tuple = TupleFactory.getInstance().newTuple(columns);
+        try {
+            for(int i = 0 ; i < columns ; i++) {
+                final ResourceFieldSchema fieldSchema = projectedColumns[i];
+                Object object = columnValues.get(i);
+                if (object == null) {
+                    tuple.set(i, null);
+                    continue;
+                }
+                
+                switch(fieldSchema.getType()) {
+                    case DataType.BYTEARRAY:
+                        byte[] bytes = PDataType.fromTypeId(PDataType.BINARY.getSqlType()).toBytes(object);
+                        tuple.set(i,new DataByteArray(bytes,0,bytes.length));
+                        break;
+                    case DataType.CHARARRAY:
+                        tuple.set(i,DataType.toString(object));
+                        break;
+                    case DataType.DOUBLE:
+                        tuple.set(i,DataType.toDouble(object));
+                        break;
+                    case DataType.FLOAT:
+                        tuple.set(i,DataType.toFloat(object));
+                        break;
+                    case DataType.INTEGER:
+                        tuple.set(i,DataType.toInteger(object));
+                        break;
+                    case DataType.LONG:
+                        tuple.set(i,DataType.toLong(object));
+                        break;
+                    case DataType.BOOLEAN:
+                        tuple.set(i,DataType.toBoolean(object));
+                        break;
+                    case DataType.DATETIME:
+                        tuple.set(i,DataType.toDateTime(object));
+                        break;
+                    default:
+                        throw new RuntimeException(String.format("Unsupported Pig type for field [%s]", fieldSchema));
+                }
+            }
+        } catch (Exception ex) {
+            final String errorMsg = String.format("Error transforming PhoenixRecord to Tuple [%s]", ex.getMessage());
+            LOG.error(errorMsg, ex);
+            throw new PigException(errorMsg);
+        }
+        return tuple;
+    }
+    
+    /**
+     * Returns the mapping pig data type for a given phoenix data type.
+     * @param phoenixDataType
+     * @return
+     */
+    public static Byte getPigDataTypeForPhoenixType(final PDataType phoenixDataType) {
+        Preconditions.checkNotNull(phoenixDataType);
+        final Byte pigDataType = phoenixTypeToPigDataType.get(phoenixDataType);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(String.format(" For PhoenixDataType [%s] , pigDataType is [%s] " , phoenixDataType.getSqlTypeName() , DataType.findTypeName(pigDataType)));    
+        }
+        return pigDataType;
+    }
+
+}
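
Two illustrative calls covering the conversion directions TypeUtil handles; a
sketch under the assumption that the Pig jars are on the classpath:

    import org.apache.phoenix.pig.util.TypeUtil;
    import org.apache.phoenix.schema.PDataType;
    import org.apache.pig.data.DataByteArray;
    import org.apache.pig.data.DataType;

    public class TypeUtilSketch {
        public static void main(String[] args) throws Exception {
            // UTF-8 bytes from Pig decoded into the Phoenix target type.
            Object intValue = TypeUtil.castBytes(new DataByteArray("42"), PDataType.INTEGER);

            // A Pig chararray headed for a VARCHAR column passes through unchanged.
            Object varchar = TypeUtil.castPigTypeToPhoenix("hello", DataType.CHARARRAY, PDataType.VARCHAR);

            System.out.println(intValue + " / " + varchar);   // 42 / hello
        }
    }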

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixHBaseStorageTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixHBaseStorageTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixHBaseStorageTest.java
deleted file mode 100644
index 1360774..0000000
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixHBaseStorageTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Collection;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.pig.PhoenixHBaseStorage;
-import org.apache.phoenix.util.ConfigUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.builtin.mock.Storage;
-import org.apache.pig.builtin.mock.Storage.Data;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Tests for {@link PhoenixHBaseStorage}
- * 
- */
-public class PhoenixHBaseStorageTest {
-    private static TupleFactory tupleFactory;
-    private static HBaseTestingUtility hbaseTestUtil;
-    private static String zkQuorum;
-    private static Connection conn;
-    private static PigServer pigServer;
-
-    @BeforeClass
-    public static void setUp() throws Exception {
-        hbaseTestUtil = new HBaseTestingUtility();
-        Configuration conf = hbaseTestUtil.getConfiguration();
-        ConfigUtil.setReplicationConfigIfAbsent(conf);
-        hbaseTestUtil.startMiniCluster();
-
-        Class.forName(PhoenixDriver.class.getName());
-        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
-        conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL
-                + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum);
-
-        // Pig variables
-        pigServer = new PigServer(ExecType.LOCAL);
-        tupleFactory = TupleFactory.getInstance();
-    }
-
-    @AfterClass
-    public static void tearDownAfterClass() throws Exception {
-        conn.close();
-        PhoenixDriver.INSTANCE.close();
-        hbaseTestUtil.shutdownMiniCluster();
-        pigServer.shutdown();
-    }
-
-    /**
-     * Basic test - writes data to a Phoenix table and compares the data written
-     * to expected
-     * 
-     * @throws Exception
-     */
-    @Test
-    public void testStorer() throws Exception {
-        final String tableName = "TABLE1";
-        final Statement stmt = conn.createStatement();
-
-        stmt.execute("CREATE TABLE " + tableName
-                + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
-
-        final Data data = Storage.resetData(pigServer);
-        final Collection<Tuple> list = Lists.newArrayList();
-
-        // Create input dataset
-        int rows = 100;
-        for (int i = 0; i < rows; i++) {
-            Tuple t = tupleFactory.newTuple();
-            t.append(i);
-            t.append("a" + i);
-            list.add(t);
-        }
-        data.set("in", "id:int, name:chararray", list);
-
-        pigServer.setBatchOn();
-        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
-
-        pigServer.registerQuery("Store A into 'hbase://" + tableName
-                + "' using " + PhoenixHBaseStorage.class.getName() + "('"
-                + zkQuorum + "', '-batchSize 1000');");
-
-        // Now run the Pig script
-        if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) {
-            throw new RuntimeException("Job failed", pigServer.executeBatch()
-                    .get(0).getException());
-        }
-
-        // Compare data in Phoenix table to the expected
-        final ResultSet rs = stmt
-                .executeQuery("SELECT id, name FROM table1 ORDER BY id");
-
-        for (int i = 0; i < rows; i++) {
-            assertTrue(rs.next());
-            assertEquals(i, rs.getInt(1));
-            assertEquals("a" + i, rs.getString(2));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
new file mode 100644
index 0000000..0337563
--- /dev/null
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+
+/**
+ * Tests for PhoenixPigConfiguration. 
+ *
+ */
+public class PhoenixPigConfigurationTest {
+
+  
+    @Test
+    public void testBasicConfiguration() throws SQLException {
+        Configuration conf = new Configuration();
+        final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(conf);
+        final String zkQuorum = "localhost";
+        final String tableName = "TABLE";
+        final long batchSize = 100;
+        phoenixConfiguration.configure(zkQuorum, tableName, batchSize);
+        assertEquals(zkQuorum,phoenixConfiguration.getServer());
+        assertEquals(tableName,phoenixConfiguration.getTableName());
+        assertEquals(batchSize,phoenixConfiguration.getBatchSize());
+     }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
new file mode 100644
index 0000000..9777bb5
--- /dev/null
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests methods on {@link ColumnInfoToStringEncoderDecoder}
+ */
+public class ColumnInfoToStringEncoderDecoderTest {
+
+    @Test
+    public void testEncode() {
+        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+        assertEquals(columnInfo.toString(),encodedColumnInfo);
+    }
+    
+    @Test
+    public void testDecode() {
+        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+        final List<ColumnInfo> decodedColumnInfos = ColumnInfoToStringEncoderDecoder.decode(encodedColumnInfo);
+        assertEquals(1, decodedColumnInfos.size());
+        assertEquals(columnInfo.toString(), decodedColumnInfos.get(0).toString());
+    }
+    
+    @Test
+    public void testEncodeDecodeWithNulls() {
+        final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final ColumnInfo columnInfo2 = null;
+        final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1,columnInfo2));
+        final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        assertEquals(1,decodedColumnInfo.size()); 
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
new file mode 100644
index 0000000..310128c
--- /dev/null
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.List;
+
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * 
+ * Tests on PhoenixPigSchemaUtil
+ */
+public class PhoenixPigSchemaUtilTest {
+
+    private static final ColumnInfo ID_COLUMN = new ColumnInfo("ID", Types.BIGINT);
+    private static final ColumnInfo NAME_COLUMN = new ColumnInfo("NAME", Types.VARCHAR);
+    private static final ColumnInfo LOCATION_COLUMN = new ColumnInfo("LOCATION", Types.ARRAY);
+    
+    
+    @Test
+    public void testSchema() throws SQLException, IOException {
+        
+        final PhoenixPigConfiguration configuration = mock(PhoenixPigConfiguration.class);
+        final List<ColumnInfo> columnInfos = ImmutableList.of(ID_COLUMN,NAME_COLUMN);
+        when(configuration.getSelectColumnMetadataList()).thenReturn(columnInfos);
+        final ResourceSchema actual = PhoenixPigSchemaUtil.getResourceSchema(configuration);
+        
+        // expected schema.
+        final ResourceFieldSchema[] fields = new ResourceFieldSchema[2];
+        fields[0] = new ResourceFieldSchema().setName("ID")
+                                                .setType(DataType.LONG);
+
+        fields[1] = new ResourceFieldSchema().setName("NAME")
+                                                .setType(DataType.CHARARRAY);
+        final ResourceSchema expected = new ResourceSchema().setFields(fields);
+        
+        assertEquals(expected.toString(), actual.toString());
+        
+    }
+    
+    @Test(expected=IllegalDataException.class)
+    public void testUnSupportedTypes() throws SQLException, IOException {
+        
+        final PhoenixPigConfiguration configuration = mock(PhoenixPigConfiguration.class);
+        final List<ColumnInfo> columnInfos = ImmutableList.of(ID_COLUMN,LOCATION_COLUMN);
+        when(configuration.getSelectColumnMetadataList()).thenReturn(columnInfos);
+        PhoenixPigSchemaUtil.getResourceSchema(configuration);
+        fail("Expected an IllegalDataException as ARRAY types aren't supported yet");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
new file mode 100644
index 0000000..3daf4e1
--- /dev/null
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.base.Joiner;
+
+/**
+ * 
+ * Unit tests to validate the query passed to LOAD.
+ *
+ */
+public class QuerySchemaParserFunctionTest extends BaseConnectionlessQueryTest {
+
+    private PhoenixPigConfiguration phoenixConfiguration;
+    private Connection conn;
+    private QuerySchemaParserFunction function;
+    
+    @Before
+    public void setUp() throws SQLException {
+        phoenixConfiguration = Mockito.mock(PhoenixPigConfiguration.class);
+        conn = DriverManager.getConnection(getUrl());
+        Mockito.when(phoenixConfiguration.getConnection()).thenReturn(conn);
+        function = new QuerySchemaParserFunction(phoenixConfiguration);
+    }
+    
+    @Test(expected=RuntimeException.class)
+    public void testSelectQuery() {
+        final String selectQuery = "SELECT col1 FROM test";
+        function.apply(selectQuery);
+        fail("Should fail as the table [test] doesn't exist");
+   }
+    
+    @Test
+    public void testValidSelectQuery() throws SQLException {
+        String ddl = "CREATE TABLE EMPLOYEE " +
+                "  (id integer not null, name varchar, age integer,location varchar " +
+                "  CONSTRAINT pk PRIMARY KEY (id))\n";
+        createTestTable(getUrl(), ddl);
+  
+        final String selectQuery = "SELECT name,age,location FROM EMPLOYEE";
+        Pair<String,String> pair = function.apply(selectQuery);
+         
+        assertEquals(pair.getFirst(), "EMPLOYEE");
+        assertEquals(pair.getSecond(),Joiner.on(',').join("NAME","AGE","LOCATION"));
+    }
+    
+    @Test(expected=RuntimeException.class)
+    public void testUpsertQuery() throws SQLException {
+        String ddl = "CREATE TABLE EMPLOYEE " +
+                "  (id integer not null, name varchar, age integer,location varchar " +
+                "  CONSTRAINT pk PRIMARY KEY (id))\n";
+        createTestTable(getUrl(), ddl);
+  
+        final String upsertQuery = "UPSERT INTO EMPLOYEE (ID, NAME) VALUES (?, ?)";
+        
+        function.apply(upsertQuery);
+        fail(" Function call successful despite passing an UPSERT query");
+    }
+    
+    @Test(expected=IllegalArgumentException.class)
+    public void testAggregationQuery() throws SQLException {
+        String ddl = "CREATE TABLE EMPLOYEE " +
+                "  (id integer not null, name varchar, age integer,location varchar " +
+                "  CONSTRAINT pk PRIMARY KEY (id))\n";
+        createTestTable(getUrl(), ddl);
+  
+        final String selectQuery = "SELECT MAX(ID) FROM EMPLOYEE";
+        function.apply(selectQuery);
+        fail("Function call successful despite passing an aggregate query");
+    }
+
+    @After
+    public void tearDown() throws SQLException {
+        conn.close();
+    }
+}