Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/12/10 01:51:48 UTC

[2/3] phoenix git commit: PHOENIX-1454 Map-Reduce-over-Phoenix-Tables

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index 5f6fcdd..5abc9c8 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.pig;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.cli.CommandLine;
@@ -28,13 +30,17 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.util.ColumnInfo;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.StoreFuncInterface;
@@ -71,74 +77,76 @@ import org.apache.pig.impl.util.UDFContext;
 @SuppressWarnings("rawtypes")
 public class PhoenixHBaseStorage implements StoreFuncInterface {
 
-	private PhoenixPigConfiguration config;
-	private RecordWriter<NullWritable, PhoenixRecord> writer;
-	private String contextSignature = null;
-	private ResourceSchema schema;	
-	private long batchSize;
-	private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
-
-	// Set of options permitted
-	private final static Options validOptions = new Options();
-	private final static CommandLineParser parser = new GnuParser();
-	private final static String SCHEMA = "_schema";
-
-	private final CommandLine configuredOptions;
-	private final String server;
-
-	public PhoenixHBaseStorage(String server) throws ParseException {
-		this(server, null);
-	}
-
-	public PhoenixHBaseStorage(String server, String optString)
-			throws ParseException {
-		populateValidOptions();
-		this.server = server;
-
-		String[] optsArr = optString == null ? new String[0] : optString.split(" ");
-		try {
-			configuredOptions = parser.parse(validOptions, optsArr);
-		} catch (ParseException e) {
-			HelpFormatter formatter = new HelpFormatter();
-			formatter.printHelp("[-batchSize]", validOptions);
-			throw e;
-		}
-
-		batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize"));
-	}
-
-	private static void populateValidOptions() {
-		validOptions.addOption("batchSize", true, "Specify upsert batch size");
-	}
-
-	/**
-	 * Returns UDFProperties based on <code>contextSignature</code>.
-	 */
-	private Properties getUDFProperties() {
-		return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
-	}
-
-	
-	/**
-	 * Parse the HBase table name and configure job
-	 */
-	@Override
-	public void setStoreLocation(String location, Job job) throws IOException {
-	    URI locationURI;
+	private Configuration config;
+    private RecordWriter<NullWritable, PhoenixPigDBWritable> writer;
+    private List<ColumnInfo> columnInfo = null;
+    private String contextSignature = null;
+    private ResourceSchema schema;  
+    private long batchSize;
+    private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
+
+    // Set of options permitted
+    private final static Options validOptions = new Options();
+    private final static CommandLineParser parser = new GnuParser();
+    private final static String SCHEMA = "_schema";
+
+    private final CommandLine configuredOptions;
+    private final String server;
+
+    public PhoenixHBaseStorage(String server) throws ParseException {
+        this(server, null);
+    }
+
+    public PhoenixHBaseStorage(String server, String optString)
+            throws ParseException {
+        populateValidOptions();
+        this.server = server;
+
+        String[] optsArr = optString == null ? new String[0] : optString.split(" ");
+        try {
+            configuredOptions = parser.parse(validOptions, optsArr);
+        } catch (ParseException e) {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("[-batchSize]", validOptions);
+            throw e;
+        }
+        batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize"));
+    }
+
+    private static void populateValidOptions() {
+        validOptions.addOption("batchSize", true, "Specify upsert batch size");
+    }
+
+    /**
+     * Returns UDFProperties based on <code>contextSignature</code>.
+     */
+    private Properties getUDFProperties() {
+        return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
+    }
+
+    
+    /**
+     * Parse the HBase table name and configure job
+     */
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        URI locationURI;
         try {
             locationURI = new URI(location);
             if (!"hbase".equals(locationURI.getScheme())) {
                 throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location));
             }
+            config = job.getConfiguration();
+            config.set(HConstants.ZOOKEEPER_QUORUM, server);
             String tableName = locationURI.getAuthority();
             // strip off the leading path token '/'
             String columns = null;
             if(!locationURI.getPath().isEmpty()) {
                 columns = locationURI.getPath().substring(1);
+                PhoenixConfigurationUtil.setUpsertColumnNames(config, columns);
             }
-            config = new PhoenixPigConfiguration(job.getConfiguration());
-            config.configure(server, tableName, batchSize, columns);
-            
+            PhoenixConfigurationUtil.setOutputTableName(config,tableName);
+            PhoenixConfigurationUtil.setBatchSize(config,batchSize);
             String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
             if (serializedSchema != null) {
                 schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
@@ -146,59 +154,61 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
         } catch (URISyntaxException e) {
             throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location),e);
         }
-	}
+    }
 
-	@SuppressWarnings("unchecked")
+    @SuppressWarnings("unchecked")
     @Override
-	public void prepareToWrite(RecordWriter writer) throws IOException {
-		this.writer =writer;
-	}
+    public void prepareToWrite(RecordWriter writer) throws IOException {
+        this.writer = writer;
+        try {
+            this.columnInfo = PhoenixConfigurationUtil.getUpsertColumnMetadataList(this.config);
+        } catch(SQLException sqle) {
+            throw new IOException(sqle);
+        }
+    }
 
-	@Override
-	public void putNext(Tuple t) throws IOException {
+    @Override
+    public void putNext(Tuple t) throws IOException {
         ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();      
-        
-        PhoenixRecord record = new PhoenixRecord(fieldSchemas);
-        
+        PhoenixPigDBWritable record = PhoenixPigDBWritable.newInstance(fieldSchemas,this.columnInfo);
         for(int i=0; i<t.size(); i++) {
-        	record.add(t.get(i));
+            record.add(t.get(i));
+        }
+        try {
+            this.writer.write(null, record);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
         }
         
-		try {
-			writer.write(null, record);
-		} catch (InterruptedException e) {
-			throw new RuntimeException(e);
-		}
-        
-	}
+    }
 
-	@Override
-	public void setStoreFuncUDFContextSignature(String signature) {
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
         this.contextSignature = signature;
-	}
-
-	@Override
-	public void cleanupOnFailure(String location, Job job) throws IOException {
-	}
-
-	@Override
-	public void cleanupOnSuccess(String location, Job job) throws IOException {
-	}
-
-	@Override
-	public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
-		return location;
-	}
-
-	@Override
-	public OutputFormat getOutputFormat() throws IOException {
-		return outputFormat;
-	}
-
-	@Override
-	public void checkSchema(ResourceSchema s) throws IOException {
-		schema = s;
-		getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
-	}
+    }
+
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+    }
+
+    @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+    }
+
+    @Override
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+        return location;
+    }
+
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        return outputFormat;
+    }
+
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+        schema = s;
+        getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
+    }
 
 }
\ No newline at end of file
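
For context, the new storer is driven from a Pig script via a STORE statement such as STORE rows INTO 'hbase://US_POPULATION/STATE,CITY,POPULATION' USING org.apache.phoenix.pig.PhoenixHBaseStorage('zk-host', '-batchSize 1000'); the table, columns, and quorum here are illustrative. A minimal sketch of how setStoreLocation() above picks the location URI apart, using the same made-up names:

    import java.net.URI;
    import java.net.URISyntaxException;

    public class LocationParsingSketch {
        public static void main(String[] args) throws URISyntaxException {
            URI location = new URI("hbase://US_POPULATION/STATE,CITY,POPULATION");
            String scheme = location.getScheme();        // must be "hbase"; anything else is rejected
            String tableName = location.getAuthority();  // "US_POPULATION" -> output table name
            String columns = location.getPath().isEmpty()
                    ? null                               // no path -> upsert into all columns
                    : location.getPath().substring(1);   // strip leading '/' -> "STATE,CITY,POPULATION"
            System.out.println(scheme + " " + tableName + " " + columns);
        }
    }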

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
deleted file mode 100644
index c6b6ec9..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig;
-
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-
-
-/**
- * A container for configuration to be used with {@link PhoenixHBaseStorage} and {@link PhoenixHBaseLoader}
- * 
- */
-public class PhoenixPigConfiguration {
-	
-	private static final Log LOG = LogFactory.getLog(PhoenixPigConfiguration.class);
-	
-	private PhoenixPigConfigurationUtil util;
-	
-	/**
-	 * Speculative execution of Map tasks
-	 */
-	public static final String MAP_SPECULATIVE_EXEC = "mapred.map.tasks.speculative.execution";
-
-	/**
-	 * Speculative execution of Reduce tasks
-	 */
-	public static final String REDUCE_SPECULATIVE_EXEC = "mapred.reduce.tasks.speculative.execution";
-	
-	public static final String SERVER_NAME = "phoenix.hbase.server.name";
-	
-	public static final String TABLE_NAME = "phoenix.hbase.table.name";
-	
-	public static final String UPSERT_COLUMNS = "phoenix.hbase.upsert.columns";
-	
-	public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
-	
-	public static final String UPSERT_COLUMN_INFO_KEY  = "phoenix.upsert.columninfos.list";
-	
-	public static final String SELECT_STATEMENT = "phoenix.select.stmt";
-	
-	public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
-	
-	//columns projected given as part of LOAD.
-	public static final String SELECT_COLUMNS = "phoneix.select.query.columns";
-	
-	public static final String SELECT_COLUMN_INFO_KEY  = "phoenix.select.columninfos.list";
-	
-	public static final String SCHEMA_TYPE = "phoenix.select.schema.type";
-	
-	// the delimiter supported during LOAD and STORE when projected columns are given.
-	public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
-	
-	public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
-	
-	public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
-	
-	private final Configuration conf;
-		
-	public PhoenixPigConfiguration(Configuration conf) {
-		this.conf = conf;
-		this.util = new PhoenixPigConfigurationUtil();
-	}
-	
-	public void configure(String server, String tableName, long batchSize) {
-        configure(server,tableName,batchSize,null);
-    }
-	
-	public void configure(String server, String tableName, long batchSize, String columns) {
-	    conf.set(SERVER_NAME, server);
-        conf.set(TABLE_NAME, tableName);
-        conf.setLong(UPSERT_BATCH_SIZE, batchSize);
-        if (isNotEmpty(columns)) {
-            conf.set(UPSERT_COLUMNS, columns);
-        }
-        conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
-        conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
-	}
-	
-	
-	/**
-	 * Creates a {@link Connection} with autoCommit set to false.
-	 * @throws SQLException
-	 */
-	public Connection getConnection() throws SQLException {
-	    return getUtil().getConnection(getConfiguration());
-	}
-	
-	public String getUpsertStatement() throws SQLException {
-		return getUtil().getUpsertStatement(getConfiguration(), getTableName());
-	}
-
-	public long getBatchSize() throws SQLException {
-		return getUtil().getBatchSize(getConfiguration());
-	}
-
-	public String getServer() {
-		return conf.get(SERVER_NAME);
-	}
-
-	public List<ColumnInfo> getColumnMetadataList() throws SQLException {
-	    return getUtil().getUpsertColumnMetadataList(getConfiguration(), getTableName());
-	}
-	
-	public String getUpsertColumns() {
-	    return conf.get(UPSERT_COLUMNS);
-	}
-	
-	public String getTableName() {
-		return conf.get(TABLE_NAME);
-	}
-	
-	public Configuration getConfiguration() {
-		return this.conf;
-	}
-	
-	public String getSelectStatement() throws SQLException {
-	   return getUtil().getSelectStatement(getConfiguration(), getTableName());
-	}
-	
-	public List<ColumnInfo> getSelectColumnMetadataList() throws SQLException {
-        return getUtil().getSelectColumnMetadataList(getConfiguration(), getTableName());
-    }
-	
-	public int getSelectColumnsCount() throws SQLException {
-		return getUtil().getSelectColumnsCount(getConfiguration(), getTableName());
-	}
-	
-	public SchemaType getSchemaType() {
-		final String schemaTp = conf.get(SCHEMA_TYPE);
-		return SchemaType.valueOf(schemaTp);
-	}
-	
-	
-	public void setServerName(final String zookeeperQuorum) {
-	    this.conf.set(SERVER_NAME, zookeeperQuorum);
-	}
-	
-	public void setTableName(final String tableName) {
-	    Preconditions.checkNotNull(tableName, "HBase Table name cannot be null!");
-	    this.conf.set(TABLE_NAME, tableName);
-	}
-	
-	public void setSelectStatement(final String selectStatement) {
-	    this.conf.set(SELECT_STATEMENT, selectStatement);
-	}
-
-	public void setSelectColumns(String selectColumns) {
-        this.conf.set(SELECT_COLUMNS, selectColumns);
-    }
-	
-	public PhoenixPigConfigurationUtil getUtil() {
-	    return this.util;
-	}
-	
-	public void setSchemaType(final SchemaType schemaType) {
-		this.conf.set(SCHEMA_TYPE, schemaType.name());
-	}
-	
-	public enum SchemaType {
-		TABLE,
-		QUERY;
-	}
-	
-		
-	@VisibleForTesting
-	static class PhoenixPigConfigurationUtil {
-                
-        public Connection getConnection(final Configuration configuration) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            Properties props = new Properties();
-            final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
-            conn.setAutoCommit(false);
-            return conn;
-        }
-        
-      public List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            Preconditions.checkNotNull(tableName);
-            final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
-            if(isNotEmpty(columnInfoStr)) {
-                return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
-            }
-            final Connection connection = getConnection(configuration);
-            String upsertColumns = configuration.get(UPSERT_COLUMNS);
-            List<String> upsertColumnList = null;
-            if(isNotEmpty(upsertColumns)) {
-                final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
-                upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
-                LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
-                        ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
-                        ));
-            } 
-           List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
-           final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
-           // we put the encoded column infos in the Configuration for reusability.
-           configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
-           closeConnection(connection);
-           return columnMetadataList;
-        }
-        
-        public String getUpsertStatement(final Configuration configuration,final String tableName) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            Preconditions.checkNotNull(tableName);
-            String upsertStmt = configuration.get(UPSERT_STATEMENT);
-            if(isNotEmpty(upsertStmt)) {
-                return upsertStmt;
-            }
-            final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
-            final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration, tableName);
-            if (useUpsertColumns) {
-                // Generating UPSERT statement with column name information.
-                upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
-                LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt);
-            } else {
-                // Generating UPSERT statement without column name information.
-                upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
-                LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
-            }
-            configuration.set(UPSERT_STATEMENT, upsertStmt);
-            return upsertStmt;
-            
-        }
-        
-        public List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            Preconditions.checkNotNull(tableName);
-            final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
-            if(isNotEmpty(columnInfoStr)) {
-                return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
-            }
-            final Connection connection = getConnection(configuration);
-            final List<String> selectColumnList = getSelectColumnList(configuration);
-            final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
-            final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
-            // we put the encoded column infos in the Configuration for reusability.
-            configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
-            closeConnection(connection);
-            return columnMetadataList;
-        }
-
-		private List<String> getSelectColumnList(
-				final Configuration configuration) {
-			String selectColumns = configuration.get(SELECT_COLUMNS);
-            List<String> selectColumnList = null;
-            if(isNotEmpty(selectColumns)) {
-                final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
-                selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
-                LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
-                        ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
-                        ));
-            }
-			return selectColumnList;
-		}
-        
-        public String getSelectStatement(final Configuration configuration,final String tableName) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            Preconditions.checkNotNull(tableName);
-            String selectStmt = configuration.get(SELECT_STATEMENT);
-            if(isNotEmpty(selectStmt)) {
-                return selectStmt;
-            }
-            final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration, tableName);
-            selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList);
-            LOG.info("Select Statement: "+ selectStmt);
-            configuration.set(SELECT_STATEMENT, selectStmt);
-            return selectStmt;
-        }
-        
-        public long getBatchSize(final Configuration configuration) throws SQLException {
-            Preconditions.checkNotNull(configuration);
-            long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
-            if(batchSize <= 0) {
-               Connection conn = getConnection(configuration);
-               batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
-               closeConnection(conn);
-            }
-            configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
-            return batchSize;
-        }
-        
-        public int getSelectColumnsCount(Configuration configuration,
-				String tableName) throws SQLException {
-        	Preconditions.checkNotNull(configuration);
-        	final String schemaTp = configuration.get(SCHEMA_TYPE);
-        	final SchemaType schemaType = SchemaType.valueOf(schemaTp);
-        	int count = 0;
-        	if(SchemaType.QUERY.equals(schemaType)) {
-        		List<String> selectedColumnList = getSelectColumnList(configuration);
-        		count = selectedColumnList == null ? 0 : selectedColumnList.size();
-        	} else {
-        		List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration,tableName);
-        		count = columnInfos == null ? 0 : columnInfos.size();
-        	}
-			return count;
-		}
-        
-        private void closeConnection(final Connection connection) throws SQLException {
-            if(connection != null) {
-                connection.close();
-            }
-        }
-    }
-    
-}
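
The responsibilities of the removed class now live in org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil, which operates directly on a Hadoop Configuration. A sketch of the write-side setup, restricted to the calls visible in the new PhoenixHBaseStorage above (quorum, table, and column names are illustrative, and the metadata lookup needs a reachable cluster):

    import java.sql.SQLException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.util.ColumnInfo;

    public class OutputConfigSketch {
        static List<ColumnInfo> configureOutput(Configuration conf) throws SQLException {
            conf.set(HConstants.ZOOKEEPER_QUORUM, "zk-host");  // replaces the old SERVER_NAME key
            PhoenixConfigurationUtil.setOutputTableName(conf, "US_POPULATION");
            PhoenixConfigurationUtil.setUpsertColumnNames(conf, "STATE,CITY,POPULATION");
            PhoenixConfigurationUtil.setBatchSize(conf, 1000L);
            // Resolves the upsert columns against table metadata, as prepareToWrite() now does.
            return PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf);
        }
    }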

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
deleted file mode 100644
index 2ef7914..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.query.KeyRange;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * The InputFormat class for generating the splits and creating the record readers.
- * 
- */
-public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixRecord> {
-
-    private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
-    private PhoenixPigConfiguration phoenixConfiguration;
-    private Connection connection;
-    private QueryPlan  queryPlan;
-    
-    /**
-     * instantiated by framework
-     */
-    public PhoenixInputFormat() {
-    }
-
-    @Override
-    public RecordReader<NullWritable, PhoenixRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
-            throws IOException, InterruptedException {       
-        setConf(context.getConfiguration());
-        final QueryPlan queryPlan = getQueryPlan(context);
-        try {
-            return new PhoenixRecordReader(phoenixConfiguration,queryPlan);    
-        }catch(SQLException sqle) {
-            throw new IOException(sqle);
-        }
-    }
-    
-   
-
-    @Override
-    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {  
-        setConf(context.getConfiguration());
-        final QueryPlan queryPlan = getQueryPlan(context);
-        final List<KeyRange> allSplits = queryPlan.getSplits();
-        final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
-        return splits;
-    }
-
-    private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
-        Preconditions.checkNotNull(qplan);
-        Preconditions.checkNotNull(splits);
-        final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
-        for (List<Scan> scans : qplan.getScans()) {
-            psplits.add(new PhoenixInputSplit(scans));
-        }
-        return psplits;
-    }
-    
-    public void setConf(Configuration configuration) {
-        this.phoenixConfiguration = new PhoenixPigConfiguration(configuration);
-    }
-
-    public PhoenixPigConfiguration getConf() {
-        return this.phoenixConfiguration;
-    }
-    
-    private Connection getConnection() {
-        try {
-            if (this.connection == null) {
-                this.connection = phoenixConfiguration.getConnection();
-           }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        return connection;
-    }
-    
-    /**
-     * Returns the query plan associated with the select query.
-     * @param context
-     * @return
-     * @throws IOException
-     * @throws SQLException
-     */
-    private QueryPlan getQueryPlan(final JobContext context) throws IOException {
-        Preconditions.checkNotNull(context);
-        if(queryPlan == null) {
-            try{
-                final Connection connection = getConnection();
-                final String selectStatement = getConf().getSelectStatement();
-                Preconditions.checkNotNull(selectStatement);
-                final Statement statement = connection.createStatement();
-                final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
-                // Optimize the query plan so that we potentially use secondary indexes
-                this.queryPlan = pstmt.optimizeQuery(selectStatement);
-                // Initialize the query plan so it sets up the parallel scans
-                queryPlan.iterator();
-            } catch(Exception exception) {
-                LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
-                throw new RuntimeException(exception);
-            }
-        }
-        return queryPlan;
-    }
-}
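
The split generation above hinges on Phoenix's query planner: the SELECT statement is optimized (so secondary indexes can be used), the plan is iterated once to set up its parallel scans, and each per-region scan list becomes one input split. A condensed sketch of that flow outside the InputFormat, assuming the Phoenix driver is on the classpath and a cluster is reachable (quorum and query are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.compile.QueryPlan;
    import org.apache.phoenix.jdbc.PhoenixStatement;
    import org.apache.phoenix.util.QueryUtil;

    public class QueryPlanSketch {
        static int countSplits() throws SQLException {
            Connection conn = DriverManager.getConnection(QueryUtil.getUrl("zk-host"));
            PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
            QueryPlan plan = stmt.optimizeQuery("SELECT STATE, CITY FROM US_POPULATION");
            plan.iterator();                       // initializes the parallel scans
            int splits = 0;
            for (List<Scan> scans : plan.getScans()) {
                splits++;                          // one PhoenixInputSplit per scan list
            }
            return splits;
        }
    }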

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
deleted file mode 100644
index 7414d67..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.phoenix.query.KeyRange;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * 
- * Input split class that holds the lower and upper bound {@link KeyRange} of its scans.
- * 
- */
-public class PhoenixInputSplit extends InputSplit implements Writable {
-
-    private List<Scan> scans;
-    private KeyRange keyRange;
-   
-    /**
-     * No Arg constructor
-     */
-    public PhoenixInputSplit() {
-    }
-    
-   /**
-    * 
-    * @param scans
-    */
-    public PhoenixInputSplit(final List<Scan> scans) {
-        Preconditions.checkNotNull(scans);
-        Preconditions.checkState(!scans.isEmpty());
-        this.scans = scans;
-        init();
-    }
-    
-    public List<Scan> getScans() {
-        return scans;
-    }
-    
-    public KeyRange getKeyRange() {
-        return keyRange;
-    }
-    
-    private void init() {
-        this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
-    }
-    
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        int count = WritableUtils.readVInt(input);
-        scans = Lists.newArrayListWithExpectedSize(count);
-        for (int i = 0; i < count; i++) {
-            Scan scan = new Scan();
-            scan.readFields(input);
-            scans.add(scan);
-        }
-        init();
-    }
-    
-    @Override
-    public void write(DataOutput output) throws IOException {
-        Preconditions.checkNotNull(scans);
-        WritableUtils.writeVInt(output, scans.size());
-        for (Scan scan : scans) {
-            scan.write(output);
-        }
-    }
-
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-         return 0;
-    }
-
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-        return new String[]{};
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + keyRange.hashCode();
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        // TODO: review: it's a reasonable check to use the keyRange,
-        // but it's not perfect. Do we need an equals impl?
-        if (this == obj) { return true; }
-        if (obj == null) { return false; }
-        if (!(obj instanceof PhoenixInputSplit)) { return false; }
-        PhoenixInputSplit other = (PhoenixInputSplit)obj;
-        if (keyRange == null) {
-            if (other.keyRange != null) { return false; }
-        } else if (!keyRange.equals(other.keyRange)) { return false; }
-        return true;
-    }
-}
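
Because a split travels from the job client to the tasks, its Writable round trip is what matters most. A sketch of that serialization cycle using Hadoop's in-memory buffers, written against the class as it existed before this commit (the single empty Scan is only a placeholder, and since Scan's own write()/readFields() are used above, this assumes an HBase version where Scan is itself Writable):

    import java.io.IOException;
    import java.util.Collections;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.phoenix.pig.hadoop.PhoenixInputSplit;

    public class SplitRoundTripSketch {
        static PhoenixInputSplit roundTrip() throws IOException {
            PhoenixInputSplit original =
                    new PhoenixInputSplit(Collections.singletonList(new Scan()));
            DataOutputBuffer out = new DataOutputBuffer();
            original.write(out);                   // vint scan count, then each Scan
            DataInputBuffer in = new DataInputBuffer();
            in.reset(out.getData(), out.getLength());
            PhoenixInputSplit copy = new PhoenixInputSplit();
            copy.readFields(in);                   // rebuilds the scans and re-derives the key range
            return copy;
        }
    }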

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
deleted file mode 100644
index a8d9d8f..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.jdbc.PhoenixStatement;
-
-/**
- * 
- * {@link OutputCommitter} implementation for Pig tasks using Phoenix
- * connections to upsert to HBase
- * 
- * 
- *
- */
-public class PhoenixOutputCommitter extends OutputCommitter {
-	private final Log LOG = LogFactory.getLog(PhoenixOutputCommitter.class);
-	
-	private final PhoenixOutputFormat outputFormat;
-	
-	public PhoenixOutputCommitter(PhoenixOutputFormat outputFormat) {
-		if(outputFormat == null) {
-			throw new IllegalArgumentException("PhoenixOutputFormat must not be null.");
-		}
-		this.outputFormat = outputFormat;
-	}
-
-	/**
-	 *  TODO implement rollback functionality. 
-	 *  
-	 *  {@link PhoenixStatement#execute(String)} is buffered on the client, this makes 
-	 *  it difficult to implement rollback as once a commit is issued it's hard to go 
-	 *  back all the way to undo. 
-	 */
-	@Override
-	public void abortTask(TaskAttemptContext context) throws IOException {
-	}
-
-	@Override
-	public void commitTask(TaskAttemptContext context) throws IOException {
-		commit(outputFormat.getConnection(context.getConfiguration()));
-	}
-
-	@Override
-	public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-		return true;
-	}
-
-	@Override
-	public void setupJob(JobContext jobContext) throws IOException {		
-	}
-
-	@Override
-	public void setupTask(TaskAttemptContext context) throws IOException {
-	}
-
-	/**
-	 * Commit a transaction on task completion
-	 * 
-	 * @param connection
-	 * @throws IOException
-	 */
-	private void commit(Connection connection) throws IOException {
-		try {
-			if (connection == null || connection.isClosed()) {
-				throw new IOException("Trying to commit a connection that is null or closed: "+ connection);
-			}
-		} catch (SQLException e) {
-			throw new IOException("Exception calling isClosed on connection", e);
-		}
-
-		try {
-			LOG.debug("Commit called on task completion");
-			connection.commit();
-		} catch (SQLException e) {
-			throw new IOException("Exception while trying to commit a connection. ", e);
-		} finally {
-			try {
-				LOG.debug("Closing connection to database on task completion");
-				connection.close();
-			} catch (SQLException e) {
-				LOG.warn("Exception while trying to close database connection", e);
-			}
-		}
-	}
-}
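
The commit(Connection) helper above boils down to the standard commit-then-always-close JDBC pattern; a stripped-down equivalent:

    import java.sql.Connection;
    import java.sql.SQLException;

    public class CommitSketch {
        static void commitAndClose(Connection connection) throws SQLException {
            try {
                connection.commit();   // flush the task's buffered upserts
            } finally {
                connection.close();    // release the connection even if the commit fails
            }
        }
    }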

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
deleted file mode 100644
index 9c29f8f..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-/**
- * {@link OutputFormat} implementation for Phoenix
- * 
- * 
- *
- */
-public class PhoenixOutputFormat extends OutputFormat<NullWritable, PhoenixRecord> {
-	private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
-	
-	private Connection connection;
-	private PhoenixPigConfiguration config;
-
-	@Override
-	public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {		
-	}
-
-	/**
-	 * TODO Implement {@link OutputCommitter} to rollback in case of task failure
-	 */
-	
-	@Override
-	public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-		return new PhoenixOutputCommitter(this);
-	}
-
-	@Override
-	public RecordWriter<NullWritable, PhoenixRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-		try {
-			return new PhoenixRecordWriter(getConnection(context.getConfiguration()), config);
-		} catch (SQLException e) {
-			throw new IOException(e);
-		}
-	}
-	
-	/**
-	 * This method creates a database connection. A single instance is created
-	 * and passed around for re-use.
-	 * 
-	 * @param configuration
-	 * @return
-	 * @throws IOException
-	 */
-	synchronized Connection getConnection(Configuration configuration) throws IOException {
-	    if (connection != null) { 
-	    	return connection; 
-	    }
-	    
-	    config = new PhoenixPigConfiguration(configuration);	    
-		try {
-			LOG.info("Initializing new Phoenix connection...");
-			connection = config.getConnection();
-			LOG.info("Initialized Phoenix connection, autoCommit="+ connection.getAutoCommit());
-			return connection;
-		} catch (SQLException e) {
-			throw new IOException(e);
-		}
-	}
-}
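
Wiring this output format into a job looked roughly as follows; the job name is illustrative, and speculative execution is disabled because the upserts are not idempotent (matching the MAP_SPECULATIVE_EXEC/REDUCE_SPECULATIVE_EXEC settings in the removed configuration class):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
    import org.apache.phoenix.pig.hadoop.PhoenixRecord;

    public class JobWiringSketch {
        static Job newUpsertJob(Configuration conf) throws IOException {
            Job job = Job.getInstance(conf, "phoenix-upsert");
            job.setOutputFormatClass(PhoenixOutputFormat.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(PhoenixRecord.class);
            job.setMapSpeculativeExecution(false);
            job.setReduceSpeculativeExecution(false);
            return job;
        }
    }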

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
deleted file mode 100644
index c18ef85..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.phoenix.pig.util.TypeUtil;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A {@link Writable} representing a Phoenix record. This class
- * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
- * b) reads the column values from the {@link ResultSet}
- * 
- */
-public class PhoenixRecord implements Writable {
-	
-	private final List<Object> values;
-	private final ResourceFieldSchema[] fieldSchemas;
-	
-	public PhoenixRecord() {
-	    this(null);
-	}
-	
-	public PhoenixRecord(ResourceFieldSchema[] fieldSchemas) {
-		this.values = new ArrayList<Object>();
-		this.fieldSchemas = fieldSchemas;
-	}
-	
-	@Override
-	public void readFields(DataInput in) throws IOException {		
-	}
-
-	@Override
-	public void write(DataOutput out) throws IOException {		
-	}
-	
-	public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException {
-		for (int i = 0; i < columnMetadataList.size(); i++) {
-			Object o = values.get(i);
-			ColumnInfo columnInfo = columnMetadataList.get(i);
-			byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
-			try {
-                Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
-                if (upsertValue != null) {
-                    statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
-                } else {
-                    statement.setNull(i + 1, columnInfo.getSqlType());
-                }
-            } catch (RuntimeException re) {
-                throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
-                        ,columnInfo.toString(),re.getMessage()),re);
-                
-            }
-		}
-		
-		statement.execute();
-	}
-	
-	public void read(final ResultSet rs, final int noOfColumns) throws SQLException {
-	    Preconditions.checkNotNull(rs);
-        Preconditions.checkArgument(noOfColumns > 0, "Number of arguments passed is <= 0");
-        values.clear();
-        for(int i = 1 ; i <= noOfColumns ; i++) {
-            Object obj = rs.getObject(i);
-            values.add(obj);
-        }
-	}
-	
-	public void add(Object value) {
-		values.add(value);
-	}
-
-	private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
-		PDataType pDataType = PDataType.fromTypeId(sqlType);
-
-		return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
-	}
-
-    public List<Object> getValues() {
-        return values;
-    }
-}
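
Populating and flushing a record followed an add-then-write rhythm. A sketch, assuming upsertStatement and columnInfoList were obtained from the configuration as shown elsewhere in this commit (the row values are made up):

    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;

    import org.apache.phoenix.pig.hadoop.PhoenixRecord;
    import org.apache.phoenix.util.ColumnInfo;

    public class RecordWriteSketch {
        static void upsertOneRow(PreparedStatement upsertStatement,
                                 List<ColumnInfo> columnInfoList) throws SQLException {
            PhoenixRecord record = new PhoenixRecord();  // no Pig schema: each value's type is inferred
            record.add("CA");
            record.add("San Jose");
            record.add(912332L);
            // Binds every value null-safely with a Pig-to-Phoenix type cast, then executes the upsert.
            record.write(upsertStatement, columnInfoList);
        }
    }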

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
deleted file mode 100644
index f6808a8..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.iterate.ConcatResultIterator;
-import org.apache.phoenix.iterate.LookAheadResultIterator;
-import org.apache.phoenix.iterate.PeekingResultIterator;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.iterate.SequenceResultIterator;
-import org.apache.phoenix.iterate.TableResultIterator;
-import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-
-/**
- * RecordReader that processes the scans and returns a PhoenixRecord
- * 
- */
-public final class PhoenixRecordReader extends RecordReader<NullWritable,PhoenixRecord>{
-    
-    private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
-    private final PhoenixPigConfiguration phoenixConfiguration;
-    private final QueryPlan queryPlan;
-    private final int columnsCount;
-    private NullWritable key =  NullWritable.get();
-    private PhoenixRecord value = null;
-    private ResultIterator resultIterator = null;
-    private PhoenixResultSet resultSet;
-    
-    public PhoenixRecordReader(final PhoenixPigConfiguration pConfiguration,final QueryPlan qPlan) throws SQLException {
-        
-        Preconditions.checkNotNull(pConfiguration);
-        Preconditions.checkNotNull(qPlan);
-        this.phoenixConfiguration = pConfiguration;
-        this.queryPlan = qPlan;
-        this.columnsCount = phoenixConfiguration.getSelectColumnsCount();
-     }
-
-    @Override
-    public void close() throws IOException {
-       if(resultIterator != null) {
-           try {
-               resultIterator.close();
-        } catch (SQLException e) {
-           LOG.error(" Error closing resultset.");
-        }
-       }
-    }
-
-    @Override
-    public NullWritable getCurrentKey() throws IOException, InterruptedException {
-        return key;
-    }
-
-    @Override
-    public PhoenixRecord getCurrentValue() throws IOException, InterruptedException {
-        return value;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-        return 0;
-    }
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-        final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
-        final List<Scan> scans = pSplit.getScans();
-        try {
-            List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
-            for (Scan scan : scans) {
-                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
-                PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
-                iterators.add(peekingResultIterator);
-            }
-            ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
-            if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
-                iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
-            }
-            this.resultIterator = iterator;
-            this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
-        } catch (SQLException e) {
-            LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
-            Throwables.propagate(e);
-        }
-        
-   }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-        if (key == null) {
-            key = NullWritable.get();
-        }
-        if (value == null) {
-            value =  new PhoenixRecord();
-        }
-        Preconditions.checkNotNull(this.resultSet);
-        try {
-            if(!resultSet.next()) {
-                return false;
-            }
-            value.read(resultSet,columnsCount);
-            return true;
-        } catch (SQLException e) {
-            LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
-            Throwables.propagate(e);
-        }
-        return false;
-    }
-}
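
The framework drives this reader through the usual initialize/nextKeyValue/getCurrentValue cycle; spelled out explicitly (split and context normally come from the framework):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.phoenix.pig.hadoop.PhoenixRecord;
    import org.apache.phoenix.pig.hadoop.PhoenixRecordReader;

    public class ReaderLoopSketch {
        static long consume(PhoenixRecordReader reader, InputSplit split,
                            TaskAttemptContext context) throws IOException, InterruptedException {
            reader.initialize(split, context);
            long rows = 0;
            while (reader.nextKeyValue()) {
                PhoenixRecord record = reader.getCurrentValue();
                rows++;  // record.getValues() holds one object per selected column
            }
            reader.close();
            return rows;
        }
    }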

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
deleted file mode 100644
index c980a38..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-/**
- * 
- * {@link RecordWriter} implementation for Phoenix
- * 
- * 
- *
- */
-public class PhoenixRecordWriter extends RecordWriter<NullWritable, PhoenixRecord> {
-	
-	private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
-	
-	private long numRecords = 0;
-	
-	private final Connection conn;
-	private final PreparedStatement statement;
-	private final PhoenixPigConfiguration config;
-	private final long batchSize;
-	
-	public PhoenixRecordWriter(Connection conn, PhoenixPigConfiguration config) throws SQLException {
-		this.conn = conn;
-		this.config = config;
-		this.batchSize = config.getBatchSize();
-		this.statement = this.conn.prepareStatement(config.getUpsertStatement());
-	}
-
-
-	/**
-	 * Committing and closing the connection is handled by {@link PhoenixOutputCommitter}.
-	 * 
-	 */
-	@Override
-	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-	}
-
-	@Override
-	public void write(NullWritable n, PhoenixRecord record) throws IOException, InterruptedException {		
-		try {
-			record.write(statement, config.getColumnMetadataList());
-			numRecords++;
-
-			if (numRecords % batchSize == 0) {
-				LOG.debug("commit called on a batch of size : " + batchSize);
-				conn.commit();
-			}
-		} catch (SQLException e) {
-			throw new IOException("Exception while committing to database.", e);
-		}
-	}
-
-}
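
The writer removed here buffered upserts on a single PreparedStatement and
committed once every batchSize records, leaving the final commit and close to
PhoenixOutputCommitter. A standalone sketch of that batch-commit pattern (URL,
table, and batch size are illustrative; unlike the removed writer, this sketch
also commits the trailing partial batch itself):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class BatchUpsertSketch {
        public static void main(String[] args) throws SQLException {
            final long batchSize = 1000;
            long numRecords = 0;
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 PreparedStatement stmt = conn.prepareStatement(
                         "UPSERT INTO EXAMPLE_TABLE (ID, NAME) VALUES (?, ?)")) {
                conn.setAutoCommit(false);
                for (long id = 0; id < 10000; id++) {
                    stmt.setLong(1, id);
                    stmt.setString(2, "row-" + id);
                    stmt.executeUpdate();
                    if (++numRecords % batchSize == 0) {
                        conn.commit();  // flush a full batch to the server
                    }
                }
                conn.commit();          // flush the trailing partial batch
            }
        }
    }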

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
deleted file mode 100644
index 3ea9b5b..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.util;
-
-import java.util.List;
-
-import org.apache.phoenix.util.ColumnInfo;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- * 
- * A codec to transform a {@link ColumnInfo} to a {@link String} and decode back.
- *
- */
-public final class ColumnInfoToStringEncoderDecoder {
-
-    private static final String COLUMN_INFO_DELIMITER = "|";
-    
-    private ColumnInfoToStringEncoderDecoder() {
-        
-    }
-    
-    public static String encode(List<ColumnInfo> columnInfos) {
-        Preconditions.checkNotNull(columnInfos);
-        return Joiner.on(COLUMN_INFO_DELIMITER).
-                        skipNulls().join(columnInfos);
-    }
-    
-    public static List<ColumnInfo> decode(final String columnInfoStr) {
-        Preconditions.checkNotNull(columnInfoStr);
-        List<ColumnInfo> columnInfos = Lists.newArrayList(
-                                Iterables.transform(
-                                        Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr),
-                                        new Function<String, ColumnInfo>() {
-                                            @Override
-                                            public ColumnInfo apply(String colInfo) {
-                                                if (colInfo.isEmpty()) {
-                                                      return null;
-                                                }
-                                                return ColumnInfo.fromString(colInfo);
-                                            }
-                                        }));
-        return columnInfos;
-        
-    }
-}
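
The removed codec pipe-delimits ColumnInfo.toString() values on encode and
rebuilds them with ColumnInfo.fromString() on decode; an equivalent utility now
lives with the phoenix-core MapReduce code. A hedged round-trip sketch against
the class as it existed here:

    import java.sql.Types;
    import java.util.List;

    import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
    import org.apache.phoenix.util.ColumnInfo;

    import com.google.common.collect.Lists;

    public class ColumnInfoCodecSketch {
        public static void main(String[] args) {
            List<ColumnInfo> columns = Lists.newArrayList(
                    new ColumnInfo("ID", Types.BIGINT),
                    new ColumnInfo("NAME", Types.VARCHAR));
            // encode(): "|"-joined ColumnInfo.toString() values.
            String encoded = ColumnInfoToStringEncoderDecoder.encode(columns);
            // decode(): split on the delimiter, rebuild via ColumnInfo.fromString().
            List<ColumnInfo> roundTripped = ColumnInfoToStringEncoderDecoder.decode(encoded);
            System.out.println(encoded + " -> " + roundTripped);
        }
    }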

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
index 9f8a5e4..dfba844 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
@@ -25,8 +25,9 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.pig.ResourceSchema;
@@ -41,24 +42,25 @@ import com.google.common.base.Preconditions;
  */
 public final class PhoenixPigSchemaUtil {
 
-    private static final Log LOG = LogFactory.getLog(PhoenixPigSchemaUtil.class);
+    private static final Log LOG = LogFactory.getLog(PhoenixPigSchemaUtil.class);
     
     private PhoenixPigSchemaUtil() {
     }
     
-    public static ResourceSchema getResourceSchema(final PhoenixPigConfiguration phoenixConfiguration) throws IOException {
+    public static ResourceSchema getResourceSchema(final Configuration configuration) throws IOException {
         
         final ResourceSchema schema = new ResourceSchema();
         try {
-        	List<ColumnInfo> columns = null;
-        	if(SchemaType.QUERY.equals(phoenixConfiguration.getSchemaType())) {
-        		final String sqlQuery = phoenixConfiguration.getSelectStatement();
-        		Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
-        		final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(phoenixConfiguration);
-        		columns = function.apply(sqlQuery);
-        	} else {
-        		columns = phoenixConfiguration.getSelectColumnMetadataList();
-        	}
+            List<ColumnInfo> columns = null;
+            final SchemaType schemaType = PhoenixConfigurationUtil.getSchemaType(configuration);
+            if(SchemaType.QUERY.equals(schemaType)) {
+                final String sqlQuery = PhoenixConfigurationUtil.getSelectStatement(configuration);
+                Preconditions.checkNotNull(sqlQuery, "No SQL query exists within the configuration");
+                final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(configuration);
+                columns = function.apply(sqlQuery);
+            } else {
+                columns = PhoenixConfigurationUtil.getSelectColumnMetadataList(configuration);
+            }
             ResourceFieldSchema fields[] = new ResourceFieldSchema[columns.size()];
             int i = 0;
             for(ColumnInfo cinfo : columns) {
@@ -76,6 +78,5 @@ public final class PhoenixPigSchemaUtil {
         }
         
         return schema;
-        
     }
 }
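
With this change the Pig schema derivation is driven entirely by a plain Hadoop
Configuration. A hedged sketch of the new entry point; getSchemaType() and
getSelectStatement() are the getters used above, while the setInputQuery(...)
setter name is an assumption made for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
    import org.apache.pig.ResourceSchema;

    public class SchemaDerivationSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Assumed setter; it should leave the SchemaType resolving to QUERY.
            PhoenixConfigurationUtil.setInputQuery(conf, "SELECT ID, NAME FROM EXAMPLE_TABLE");
            ResourceSchema schema = PhoenixPigSchemaUtil.getResourceSchema(conf);
            System.out.println(schema);  // e.g. ID:long,NAME:chararray
        }
    }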

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
index 1b3a90a..f0148a6 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
@@ -26,16 +26,16 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 
 /**
@@ -46,21 +46,20 @@ import com.google.common.collect.Lists;
 public class QuerySchemaParserFunction implements Function<String,Pair<String,String>> {
 
     private static final Log LOG = LogFactory.getLog(QuerySchemaParserFunction.class);
-    private PhoenixPigConfiguration phoenixConfiguration;
+    private final Configuration configuration;
     
-    public QuerySchemaParserFunction(PhoenixPigConfiguration phoenixConfiguration) {
-        Preconditions.checkNotNull(phoenixConfiguration);
-        this.phoenixConfiguration = phoenixConfiguration;
+    public QuerySchemaParserFunction(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        this.configuration = configuration;
     }
     
     @Override
     public Pair<String, String> apply(final String selectStatement) {
         Preconditions.checkNotNull(selectStatement);
         Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!");
-        Preconditions.checkNotNull(this.phoenixConfiguration);
         Connection connection = null;
         try {
-            connection = this.phoenixConfiguration.getConnection();
+            connection = ConnectionUtil.getConnection(this.configuration);
             final Statement  statement = connection.createStatement();
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);
@@ -78,17 +77,17 @@ public class QuerySchemaParserFunction implements Function<String,Pair<String,St
             return new Pair<String, String>(tableName, columnsAsStr);
         } catch (SQLException e) {
             LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),selectStatement));
-            Throwables.propagate(e);
+            throw new RuntimeException(e);
         } finally {
             if(connection != null) {
                 try {
                     connection.close();
                 } catch(SQLException sqle) {
-                    Throwables.propagate(sqle);
+                    LOG.error("Error closing connection");
+                    throw new RuntimeException(sqle);
                 }
             }
         }
-        return null;
     }
     
     /**
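
A hedged usage sketch of the reworked function: given a Configuration that
already carries the connection settings ConnectionUtil.getConnection() needs
(e.g. the ZooKeeper quorum), apply() compiles the SELECT and returns the table
name paired with the comma-joined projected columns:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.pig.util.QuerySchemaParserFunction;

    public class QueryParsingSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();  // connection settings assumed present
            QuerySchemaParserFunction parser = new QuerySchemaParserFunction(conf);
            Pair<String, String> result = parser.apply("SELECT ID, NAME FROM EXAMPLE_TABLE");
            String tableName = result.getFirst();      // the table the query reads
            String columnsAsStr = result.getSecond();  // comma-joined projected columns
            System.out.println(tableName + " : " + columnsAsStr);
        }
    }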

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
index 52f646c..78bf3a7 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
@@ -26,60 +26,59 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.util.ColumnInfo;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 
 public final class SqlQueryToColumnInfoFunction implements Function<String,List<ColumnInfo>> {
 	
-	private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class);
-	private final PhoenixPigConfiguration phoenixConfiguration;
+    private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class);
+    private final Configuration configuration;
 
-	public SqlQueryToColumnInfoFunction(
-			final PhoenixPigConfiguration phoenixPigConfiguration) {
-		super();
-		this.phoenixConfiguration = phoenixPigConfiguration;
-	}
+    public SqlQueryToColumnInfoFunction(final Configuration configuration) {
+        this.configuration = configuration;
+    }
 
-	@Override
-	public List<ColumnInfo> apply(String sqlQuery) {
-		Preconditions.checkNotNull(sqlQuery);
-		Connection connection = null;
-		List<ColumnInfo> columnInfos = null;
+    @Override
+    public List<ColumnInfo> apply(String sqlQuery) {
+        Preconditions.checkNotNull(sqlQuery);
+        Connection connection = null;
+        List<ColumnInfo> columnInfos = null;
         try {
-            connection = this.phoenixConfiguration.getConnection();
+            connection = ConnectionUtil.getConnection(this.configuration);
             final Statement  statement = connection.createStatement();
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             final QueryPlan queryPlan = pstmt.compileQuery(sqlQuery);
             final List<? extends ColumnProjector> projectedColumns = queryPlan.getProjector().getColumnProjectors();
             columnInfos = Lists.newArrayListWithCapacity(projectedColumns.size());
             columnInfos = Lists.transform(projectedColumns, new Function<ColumnProjector,ColumnInfo>() {
-            	@Override
-				public ColumnInfo apply(final ColumnProjector columnProjector) {
-					return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType());
-				}
-            	
+                @Override
+                public ColumnInfo apply(final ColumnProjector columnProjector) {
+                    return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType());
+                }
+                
             });
-	   } catch (SQLException e) {
+       } catch (SQLException e) {
             LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),sqlQuery));
-            Throwables.propagate(e);
+            throw new RuntimeException(e);
         } finally {
             if(connection != null) {
                 try {
                     connection.close();
                 } catch(SQLException sqle) {
-                    Throwables.propagate(sqle);
+                    LOG.error("Error closing connection!!");
+                    throw new RuntimeException(sqle);
                 }
             }
         }
-		return columnInfos;
-	}
+        return columnInfos;
+    }
 
 }
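
Its sibling above returns full ColumnInfo metadata rather than a column-name
string, which is what PhoenixPigSchemaUtil consumes when building the Pig
schema. A hedged sketch under the same assumption that the Configuration
already carries the connection settings:

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.pig.util.SqlQueryToColumnInfoFunction;
    import org.apache.phoenix.util.ColumnInfo;

    public class ColumnInfoLookupSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();  // connection settings assumed present
            SqlQueryToColumnInfoFunction toColumnInfo = new SqlQueryToColumnInfoFunction(conf);
            List<ColumnInfo> columns = toColumnInfo.apply("SELECT ID, NAME FROM EXAMPLE_TABLE");
            for (ColumnInfo column : columns) {
                System.out.println(column + " (sqlType=" + column.getSqlType() + ")");
            }
        }
    }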

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
index 1cdd66d..20b56b4 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -27,7 +27,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -47,16 +47,16 @@ public final class TypeUtil {
 	
     private static final Log LOG = LogFactory.getLog(TypeUtil.class);
     private static final HBaseBinaryConverter binaryConverter = new HBaseBinaryConverter ();
-	private static final ImmutableMap<PDataType,Byte> phoenixTypeToPigDataType = init();	
-	
-	private TypeUtil(){
-	}
-	
-	/**
-	 * A map of Phoenix to Pig data types.
-	 * @return
-	 */
-	private static ImmutableMap<PDataType, Byte> init() {
+    private static final ImmutableMap<PDataType,Byte> phoenixTypeToPigDataType = init();    
+    
+    private TypeUtil(){
+    }
+    
+    /**
+     * A map of Phoenix to Pig data types.
+     * @return an immutable map of Phoenix types to Pig DataType byte codes
+     */
+    private static ImmutableMap<PDataType, Byte> init() {
         final ImmutableMap.Builder<PDataType,Byte> builder = new Builder<PDataType,Byte> ();
         builder.put(PDataType.LONG,DataType.LONG);
         builder.put(PDataType.VARBINARY,DataType.BYTEARRAY);
@@ -84,121 +84,121 @@ public final class TypeUtil {
         return builder.build();
     }
     /**
-	 * This method returns the most appropriate PDataType associated with 
-	 * the incoming Pig type. Note for Pig DataType DATETIME, returns DATE as 
-	 * inferredSqlType. 
-	 * 
-	 * This is later used to make a cast to targetPhoenixType accordingly. See
-	 * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
-	 * 
-	 * @param obj
-	 * @return PDataType
-	 */
-	public static PDataType getType(Object obj, byte type) {
-		if (obj == null) {
-			return null;
-		}
-		PDataType sqlType;
+     * This method returns the most appropriate PDataType associated with 
+     * the incoming Pig type. Note that for the Pig DataType DATETIME, DATE is
+     * returned as the inferredSqlType.
+     * 
+     * This is later used to make a cast to targetPhoenixType accordingly. See
+     * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
+     * 
+     * @param obj
+     * @return PDataType
+     */
+    public static PDataType getType(Object obj, byte type) {
+        if (obj == null) {
+            return null;
+        }
+        PDataType sqlType;
 
-		switch (type) {
-		case DataType.BYTEARRAY:
-			sqlType = PDataType.VARBINARY;
-			break;
-		case DataType.CHARARRAY:
-			sqlType = PDataType.VARCHAR;
-			break;
-		case DataType.DOUBLE:
-		case DataType.BIGDECIMAL:
-			sqlType = PDataType.DOUBLE;
-			break;
-		case DataType.FLOAT:
-			sqlType = PDataType.FLOAT;
-			break;
-		case DataType.INTEGER:
-			sqlType = PDataType.INTEGER;
-			break;
-		case DataType.LONG:
-		case DataType.BIGINTEGER:
-			sqlType = PDataType.LONG;
-			break;
-		case DataType.BOOLEAN:
-			sqlType = PDataType.BOOLEAN;
-			break;
-		case DataType.DATETIME:
-			sqlType = PDataType.DATE;
-			break;
-		case DataType.BYTE:
-			sqlType = PDataType.TINYINT;
-			break;
-		default:
-			throw new RuntimeException("Unknown type " + obj.getClass().getName()
-					+ " passed to PhoenixHBaseStorage");
-		}
+        switch (type) {
+        case DataType.BYTEARRAY:
+            sqlType = PDataType.VARBINARY;
+            break;
+        case DataType.CHARARRAY:
+            sqlType = PDataType.VARCHAR;
+            break;
+        case DataType.DOUBLE:
+        case DataType.BIGDECIMAL:
+            sqlType = PDataType.DOUBLE;
+            break;
+        case DataType.FLOAT:
+            sqlType = PDataType.FLOAT;
+            break;
+        case DataType.INTEGER:
+            sqlType = PDataType.INTEGER;
+            break;
+        case DataType.LONG:
+        case DataType.BIGINTEGER:
+            sqlType = PDataType.LONG;
+            break;
+        case DataType.BOOLEAN:
+            sqlType = PDataType.BOOLEAN;
+            break;
+        case DataType.DATETIME:
+            sqlType = PDataType.DATE;
+            break;
+        case DataType.BYTE:
+            sqlType = PDataType.TINYINT;
+            break;
+        default:
+            throw new RuntimeException("Unknown type " + obj.getClass().getName()
+                    + " passed to PhoenixHBaseStorage");
+        }
 
-		return sqlType;
+        return sqlType;
 
-	}
+    }
 
-	/**
-	 * This method encodes a value with Phoenix data type. It begins
-	 * with checking whether an object is BINARY and makes a call to
-	 * {@link #castBytes(Object, PDataType)} to convery bytes to
-	 * targetPhoenixType
-	 * 
-	 * @param o
-	 * @param targetPhoenixType
-	 * @return Object
-	 */
-	public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
-		PDataType inferredPType = getType(o, objectType);
-		
-		if(inferredPType == null) {
-			return null;
-		}
+    /**
+     * This method encodes a value with a Phoenix data type. It begins
+     * by checking whether the object is VARBINARY and, if so, calls
+     * {@link #castBytes(Object, PDataType)} to convert the bytes to the
+     * targetPhoenixType.
+     * 
+     * @param o
+     * @param targetPhoenixType
+     * @return Object
+     */
+    public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
+        PDataType inferredPType = getType(o, objectType);
+        
+        if(inferredPType == null) {
+            return null;
+        }
 
-		if(inferredPType == PDataType.VARBINARY) {
-			try {
-				o = castBytes(o, targetPhoenixType);
-				if(targetPhoenixType != PDataType.VARBINARY && targetPhoenixType != PDataType.BINARY) {
-					inferredPType = getType(o, DataType.findType(o));	
-				}
-			} catch (IOException e) {
-				throw new RuntimeException("Error while casting bytes for object " +o);
-			}
-		}
-		if(inferredPType == PDataType.DATE) {
-			int inferredSqlType = targetPhoenixType.getSqlType();
+        if(inferredPType == PDataType.VARBINARY) {
+            try {
+                o = castBytes(o, targetPhoenixType);
+                if(targetPhoenixType != PDataType.VARBINARY && targetPhoenixType != PDataType.BINARY) {
+                    inferredPType = getType(o, DataType.findType(o));   
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Error while casting bytes for object " +o);
+            }
+        }
+        if(inferredPType == PDataType.DATE) {
+            int inferredSqlType = targetPhoenixType.getSqlType();
 
-			if(inferredSqlType == Types.DATE) {
-				return new Date(((DateTime)o).getMillis());
-			} 
-			if(inferredSqlType == Types.TIME) {
-				return new Time(((DateTime)o).getMillis());
-			}
-			if(inferredSqlType == Types.TIMESTAMP) {
-				return new Timestamp(((DateTime)o).getMillis());
-			}
-		}
-		
-		if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
-			return inferredPType.toObject(o, targetPhoenixType);
-		}
-		
-		throw new RuntimeException(o.getClass().getName()
-				+ " cannot be coerced to "+targetPhoenixType.toString());
-	}
-	
-	/**
-	 * This method converts bytes to the target type required
-	 * for Phoenix. It uses {@link Utf8StorageConverter} for
-	 * the conversion.
-	 * 
-	 * @param o
-	 * @param targetPhoenixType
-	 * @return Object
-	 * @throws IOException
-	 */
-	private static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
+            if(inferredSqlType == Types.DATE) {
+                return new Date(((DateTime)o).getMillis());
+            } 
+            if(inferredSqlType == Types.TIME) {
+                return new Time(((DateTime)o).getMillis());
+            }
+            if(inferredSqlType == Types.TIMESTAMP) {
+                return new Timestamp(((DateTime)o).getMillis());
+            }
+        }
+        
+        if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
+            return inferredPType.toObject(o, targetPhoenixType);
+        }
+        
+        throw new RuntimeException(o.getClass().getName()
+                + " cannot be coerced to "+targetPhoenixType.toString());
+    }
+    
+    /**
+     * This method converts bytes to the target type required
+     * for Phoenix. It uses {@link Utf8StorageConverter} for
+     * the conversion.
+     * 
+     * @param o
+     * @param targetPhoenixType
+     * @return Object
+     * @throws IOException
+     */
+    private static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
         byte[] bytes = ((DataByteArray)o).get();
         
         switch(targetPhoenixType) {
@@ -213,7 +213,7 @@ public final class TypeUtil {
             return binaryConverter.bytesToInteger(bytes).byteValue();
         case UNSIGNED_INT:
         case INTEGER:
-        	return binaryConverter.bytesToInteger(bytes);
+            return binaryConverter.bytesToInteger(bytes);
         case BOOLEAN:
             return binaryConverter.bytesToBoolean(bytes);
         case FLOAT:
@@ -227,9 +227,9 @@ public final class TypeUtil {
             return binaryConverter.bytesToLong(bytes);
         case VARBINARY : 
         case BINARY:
-        	 return bytes;
+             return bytes;
         default:
-        	return o;
+            return o;
         }        
     }
     
@@ -240,7 +240,7 @@ public final class TypeUtil {
      * @return
      * @throws IOException
      */
-    public static Tuple transformToTuple(final PhoenixRecord record, final ResourceFieldSchema[] projectedColumns) throws IOException {
+    public static Tuple transformToTuple(final PhoenixPigDBWritable record, final ResourceFieldSchema[] projectedColumns) throws IOException {
         
         List<Object> columnValues = record.getValues();
         if(columnValues == null || columnValues.size() == 0 || projectedColumns == null || projectedColumns.length != columnValues.size()) {
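
Taken together, getType() and castPigTypeToPhoenix() above form the Pig-to-
Phoenix value mapping used on the write path. A small sketch of two
representative cases, a chararray coerced to VARCHAR and a Joda DateTime
narrowed to java.sql.Date:

    import java.sql.Date;

    import org.apache.phoenix.pig.util.TypeUtil;
    import org.apache.phoenix.schema.PDataType;
    import org.apache.pig.data.DataType;
    import org.joda.time.DateTime;

    public class TypeMappingSketch {
        public static void main(String[] args) {
            // A Pig chararray is inferred as VARCHAR and returned as-is.
            Object name = TypeUtil.castPigTypeToPhoenix(
                    "phoenix", DataType.CHARARRAY, PDataType.VARCHAR);
            // A Pig datetime is inferred as DATE; the target's SQL type selects
            // java.sql.Date, Time, or Timestamp (here: Date).
            Object when = TypeUtil.castPigTypeToPhoenix(
                    new DateTime(0L), DataType.DATETIME, PDataType.DATE);
            System.out.println(name + " / " + (Date) when);
        }
    }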

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
new file mode 100644
index 0000000..a7399c9
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.writable;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link Writable} representing a Phoenix record. This class
+ * a) maps Pig types to Phoenix types and sets the values accordingly on the {@link PreparedStatement}, and
+ * b) reads the column values back from the {@link ResultSet}.
+ * 
+ */
+public class PhoenixPigDBWritable implements DBWritable {
+    
+    private final List<Object> values;
+    private ResourceFieldSchema[] fieldSchemas;
+    private List<ColumnInfo> columnMetadataList;
+  
+    public PhoenixPigDBWritable() {
+        this.values = new ArrayList<Object>();
+    }
+    
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+        for (int i = 0; i < columnMetadataList.size(); i++) {
+            Object o = values.get(i);
+            ColumnInfo columnInfo = columnMetadataList.get(i);
+            byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
+            try {
+                Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
+                if (upsertValue != null) {
+                    statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
+                } else {
+                    statement.setNull(i + 1, columnInfo.getSqlType());
+                }
+            } catch (RuntimeException re) {
+                throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
+                        ,columnInfo.toString(),re.getMessage()),re);
+                
+            }
+        }
+    }
+    
+    public void add(Object value) {
+        values.add(value);
+    }
+
+    private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
+        PDataType pDataType = PDataType.fromTypeId(sqlType);
+        return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
+    }
+
+    public List<Object> getValues() {
+        return values;
+    }
+
+    @Override
+    public void readFields(final ResultSet rs) throws SQLException {
+        Preconditions.checkNotNull(rs);
+        final int noOfColumns = rs.getMetaData().getColumnCount();
+        values.clear();
+        for(int i = 1 ; i <= noOfColumns ; i++) {
+            Object obj = rs.getObject(i);
+            values.add(obj);
+        }
+    }
+
+    public ResourceFieldSchema[] getFieldSchemas() {
+        return fieldSchemas;
+    }
+
+    public void setFieldSchemas(ResourceFieldSchema[] fieldSchemas) {
+        this.fieldSchemas = fieldSchemas;
+    }
+
+    public List<ColumnInfo> getColumnMetadataList() {
+        return columnMetadataList;
+    }
+
+    public void setColumnMetadataList(List<ColumnInfo> columnMetadataList) {
+        this.columnMetadataList = columnMetadataList;
+    }
+
+    public static PhoenixPigDBWritable newInstance(final ResourceFieldSchema[] fieldSchemas,
+            final List<ColumnInfo> columnMetadataList) {
+        final PhoenixPigDBWritable dbWritable = new PhoenixPigDBWritable ();
+        dbWritable.setFieldSchemas(fieldSchemas);
+        dbWritable.setColumnMetadataList(columnMetadataList);
+        return dbWritable;
+    }
+
+}
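
A hedged end-to-end sketch of the new writable: build it from column metadata,
add the row values in column order, and let write() bind them with the mapped
SQL types. The JDBC URL and table are illustrative, and passing null field
schemas exercises the DataType.findType() branch in write():

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Types;
    import java.util.List;

    import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
    import org.apache.phoenix.util.ColumnInfo;

    import com.google.common.collect.Lists;

    public class DBWritableUpsertSketch {
        public static void main(String[] args) throws Exception {
            List<ColumnInfo> columns = Lists.newArrayList(
                    new ColumnInfo("ID", Types.BIGINT),
                    new ColumnInfo("NAME", Types.VARCHAR));
            PhoenixPigDBWritable record = PhoenixPigDBWritable.newInstance(null, columns);
            record.add(1L);          // values must follow the column order above
            record.add("phoenix");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 PreparedStatement stmt = conn.prepareStatement(
                         "UPSERT INTO EXAMPLE_TABLE (ID, NAME) VALUES (?, ?)")) {
                record.write(stmt);  // binds both parameters with the mapped SQL types
                stmt.executeUpdate();
                conn.commit();
            }
        }
    }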