You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2014/04/08 21:48:01 UTC

git commit: Commit PHOENIX-898 patch to 4.0

Repository: incubator-phoenix
Updated Branches:
  refs/heads/4.0 666ad87bf -> 1207882b6


Commit PHOENIX-898 patch to 4.0


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/1207882b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/1207882b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/1207882b

Branch: refs/heads/4.0
Commit: 1207882b6d5bc3000c148b8f892e7df4b5be391f
Parents: 666ad87
Author: Eli Levine <el...@apache.org>
Authored: Tue Apr 8 11:52:48 2014 -0700
Committer: Eli Levine <el...@apache.org>
Committed: Tue Apr 8 11:52:48 2014 -0700

----------------------------------------------------------------------
 .../apache/phoenix/query/QueryConstants.java    |   1 +
 .../org/apache/phoenix/util/PhoenixRuntime.java | 111 ++++++++++++++++++-
 .../apache/phoenix/util/PhoenixRuntimeTest.java |  36 +++++-
 .../apache/phoenix/pig/PhoenixHBaseStorage.java |  37 +++++--
 .../phoenix/pig/PhoenixPigConfiguration.java    |  79 ++++++++-----
 .../phoenix/pig/hadoop/PhoenixRecord.java       |  24 ++--
 .../phoenix/pig/hadoop/PhoenixRecordWriter.java |   1 -
 7 files changed, 242 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/1207882b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 06bccce..fe348e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -96,6 +96,7 @@ import org.apache.phoenix.util.ByteUtil;
  */
 public interface QueryConstants {
     public static final String NAME_SEPARATOR = ".";
+    public static final String NAME_SEPARATOR_REGEX = "\\.";
     public final static byte[] NAME_SEPARATOR_BYTES = Bytes.toBytes(NAME_SEPARATOR);
     public static final byte NAME_SEPARATOR_BYTE = NAME_SEPARATOR_BYTES[0];
     public static final String NULL_SCHEMA_NAME = "";

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/1207882b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 27c0c2a..a679762 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -29,7 +29,9 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
@@ -40,8 +42,10 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
@@ -325,7 +329,7 @@ public class PhoenixRuntime {
         };
     }
     
-    private static PTable getTable(Connection conn, String name) throws SQLException {
+    public static PTable getTable(Connection conn, String name) throws SQLException {
         PTable table = null;
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         try {
@@ -343,6 +347,111 @@ public class PhoenixRuntime {
     }
     
     /**
+     * Get list of ColumnInfos that contain Column Name and its associated
+     * PDataType for an import. The supplied list of columns can be null -- if it is non-null,
+     * it represents a user-supplied list of columns to be imported.
+     *
+     * @param conn Phoenix connection from which metadata will be read
+     * @param tableName Phoenix table name whose columns are to be checked. Can include a schema
+     *                  name
+     * @param columns user-supplied list of import columns, can be null
+     */
+    public static List<ColumnInfo> generateColumnInfo(Connection conn,
+            String tableName, List<String> columns)
+            throws SQLException {
+
+		PTable table = PhoenixRuntime.getTable(conn, tableName);
+        List<ColumnInfo> columnInfoList = Lists.newArrayList();
+        Set<String> unresolvedColumnNames = new TreeSet<String>();
+        if (columns == null) {
+        	// use all columns in the table
+        	for(PColumn pColumn : table.getColumns()) {
+      	       int sqlType = pColumn.getDataType().getResultSetSqlType();        
+               columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType));
+        	}
+        } else {
+            // Leave "null" as indication to skip b/c it doesn't exist
+            for (int i = 0; i < columns.size(); i++) {
+                String columnName = columns.get(i);
+                try {
+                    ColumnInfo columnInfo = PhoenixRuntime.getColumnInfo(table, columnName);
+                    columnInfoList.add(columnInfo);
+                } catch (ColumnNotFoundException cnfe) {
+                	unresolvedColumnNames.add(columnName.trim());
+                }
+            }
+                
+            if (unresolvedColumnNames.size()>0) {
+            	StringBuilder exceptionMessage = new StringBuilder();
+            	boolean first = true;
+            	exceptionMessage.append("Unable to resolve these column names:\n");
+            	for (String col : unresolvedColumnNames) {
+            		if (first) first = false;
+            		else exceptionMessage.append(",");
+            		exceptionMessage.append(col);
+            	}
+            	exceptionMessage.append("\nAvailable columns with column families:\n");
+            	first = true;
+            	for (PColumn pColumn : table.getColumns()) {
+            		if (first) first = false;
+            		else exceptionMessage.append(",");
+            		exceptionMessage.append(pColumn.toString());
+            	}
+                throw new SQLException(exceptionMessage.toString()); 
+            }
+                
+        }
+        return columnInfoList;
+    }
+
+    /**
+     * Returns the column info for the given column for the given table.
+     * 
+     * @param table
+     * @param columnName User-specified column name. May be family-qualified or bare.
+     * @return columnInfo associated with the column in the table
+     * @throws SQLException if parameters are null, or if column is not found.
+     */
+    public static ColumnInfo getColumnInfo(PTable table, String columnName) throws SQLException {
+    	if (table==null) {
+    		throw new SQLException("Table must not be null.");
+    	}
+    	if (columnName==null) {
+    		throw new SQLException("columnName must not be null.");
+    	}
+    	columnName = columnName.trim();
+    	PColumn pColumn = null;
+    	if (columnName.contains(QueryConstants.NAME_SEPARATOR)) {
+    		String[] tokens = columnName.split(QueryConstants.NAME_SEPARATOR_REGEX);
+    		if (tokens.length!=2) {
+    			throw new SQLException(String.format("Unable to process column %s, expected family-qualified name.",columnName));
+    		}
+    		String familyName = tokens[0];
+    		String familyColumn = tokens[1];
+            PColumnFamily family = table.getColumnFamily(familyName);
+            pColumn = family.getColumn(familyColumn);
+    	} else {
+    		pColumn = table.getColumn(columnName);
+    	}
+        return getColumnInfo(pColumn);
+    }
+    
+    /**
+     * Constructs a column info for the supplied pColumn
+     * @param pColumn
+     * @return columnInfo
+     * @throws SQLException if the parameter is null.
+     */
+    public static ColumnInfo getColumnInfo(PColumn pColumn) throws SQLException {
+    	if (pColumn==null) {
+    		throw new SQLException("pColumn must not be null.");
+    	}
+    	int sqlType = pColumn.getDataType().getResultSetSqlType();
+        ColumnInfo columnInfo = new ColumnInfo(pColumn.toString(),sqlType);
+        return columnInfo;
+    }
+    
+    /**
      * Encode the primary key values from the table as a byte array. The values must
      * be in the same order as the primary key constraint. If the connection and
      * table are both tenant-specific, the tenant ID column must not be present in

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/1207882b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java
index abb2b7a..6d12d95 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java
@@ -18,14 +18,21 @@
 
 package org.apache.phoenix.util;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.sql.Connection;
 import java.sql.Date;
 import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
 
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
 import org.junit.Test;
 
 public class PhoenixRuntimeTest extends BaseConnectionlessQueryTest {
@@ -41,4 +48,31 @@ public class PhoenixRuntimeTest extends BaseConnectionlessQueryTest {
         Object[] actualValues = PhoenixRuntime.decodePK(conn, "T", value);
         assertEquals(Arrays.asList(expectedValues), Arrays.asList(actualValues));
     }
+    
+    @Test
+    public void testColumnFamilyParsing() {
+    	String columnName = "F.C";
+    	assertTrue(columnName.contains(QueryConstants.NAME_SEPARATOR));
+		String[] tokens = columnName.split(QueryConstants.NAME_SEPARATOR_REGEX);
+    	assertTrue(tokens.length==2);
+    	
+		String familyName = tokens[0];
+		String familyColumn = tokens[1];
+		assertEquals("F",familyName);
+		assertEquals("C",familyColumn);
+		
+    }
+    
+    
+    //TODO add tests for the following:
+    /*
+     * 
+       public static List<ColumnInfo> generateColumnInfo(Connection conn,
+            String tableName, List<String> columns)
+            throws SQLException {...}
+            
+        public static ColumnInfo getColumnInfo(PTable table, String columnName) throws SQLException {...}
+        
+        public static ColumnInfo getColumnInfo(PColumn pColumn) throws SQLException {...}
+     */
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/1207882b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index 9e237f1..3ef64b9 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -18,6 +18,8 @@
 package org.apache.phoenix.pig;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Properties;
 
 import org.apache.commons.cli.CommandLine;
@@ -31,6 +33,8 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
+import org.apache.phoenix.pig.hadoop.PhoenixRecord;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.StoreFuncInterface;
@@ -38,9 +42,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 
-import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
-
 /**
  * StoreFunc that uses Phoenix to store data into HBase.
  * 
@@ -52,6 +53,16 @@ import org.apache.phoenix.pig.hadoop.PhoenixRecord;
  * argument to this StoreFunc is the server, the 2nd argument is the batch size
  * for upserts via Phoenix.
  * 
+ * Alternative usage: A = load 'testdata' as (a:chararray, b:chararray, 
+ *  e: datetime); STORE A into 'hbase://CORE.ENTITY_HISTORY/ID,F.B,F.E' using
+ * org.apache.bdaas.PhoenixHBaseStorage('localhost','-batchSize 5000');
+ * 
+ * The above reads a file 'testdata' and writes the elements ID, F.B, and F.E to HBase. 
+ * In this example, ID is the row key, and F is the column family for the data elements.  
+ * First argument to this StoreFunc is the server, the 2nd argument is the batch size
+ * for upserts via Phoenix. In this case, less than the full table row is required.
+ * For configuration message, look in the info log file.
+ * 
  * Note that Pig types must be in sync with the target Phoenix data types. This
  * StoreFunc tries best to cast based on input Pig types and target Phoenix data
  * types, but it is recommended to supply appropriate schema.
@@ -65,7 +76,6 @@ import org.apache.phoenix.pig.hadoop.PhoenixRecord;
 public class PhoenixHBaseStorage implements StoreFuncInterface {
 
 	private PhoenixPigConfiguration config;
-	private String tableName;
 	private RecordWriter<NullWritable, PhoenixRecord> writer;
 	private String contextSignature = null;
 	private ResourceSchema schema;	
@@ -118,12 +128,21 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
 	 */
 	@Override
 	public void setStoreLocation(String location, Job job) throws IOException {
-		String prefix = "hbase://";
-		if (location.startsWith(prefix)) {
-			tableName = location.substring(prefix.length());
+		
+		URI locationURI;
+		try {
+			locationURI = new URI(location);
+			if (!"hbase".equals(locationURI.getScheme())) {
+				throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location));
+			}
+			String tableName = locationURI.getAuthority();
+			// strip off the leading path token '/'
+			String columns = locationURI.getPath().substring(1);
+			config = new PhoenixPigConfiguration(job.getConfiguration());
+			config.configure(server, tableName, batchSize, columns);
+		} catch (URISyntaxException e) {
+			throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location),e);
 		}
-		config = new PhoenixPigConfiguration(job.getConfiguration());
-		config.configure(server, tableName, batchSize);
 
 		String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
 		if (serializedSchema != null) {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/1207882b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
index 3b0551f..9a09e3a 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
@@ -20,20 +20,21 @@ package org.apache.phoenix.pig;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 
+import com.google.common.collect.Lists;
+
 /**
  * A container for configuration to be used with {@link PhoenixHBaseStorage}
  * 
@@ -58,6 +59,8 @@ public class PhoenixPigConfiguration {
 	
 	public static final String TABLE_NAME = "phoenix.hbase.table.name";
 	
+	public static final String UPSERT_COLUMNS = "phoenix.hbase.upsert.columns";
+	
 	public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
 	
 	public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
@@ -74,9 +77,17 @@ public class PhoenixPigConfiguration {
 	}
 	
 	public void configure(String server, String tableName, long batchSize) {
+		configure(server,tableName,batchSize,null);
+	}
+	
+	public void configure(String server, String tableName, long batchSize, String columns) {
 		conf.set(SERVER_NAME, server);
 		conf.set(TABLE_NAME, tableName);
 		conf.setLong(UPSERT_BATCH_SIZE, batchSize);
+		if (columns!=null) {
+			conf.set(UPSERT_COLUMNS, columns);
+		}
+
 		conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
 		conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
 	}
@@ -107,23 +118,49 @@ public class PhoenixPigConfiguration {
 		// Reset batch size
 		long batchSize = getBatchSize() <= 0 ? ((PhoenixConnection) conn).getMutateBatchSize() : getBatchSize();
 		conf.setLong(UPSERT_BATCH_SIZE, batchSize);
+		boolean useUpsertColumns = false;
 		
 		if (columnMetadataList == null) {
-			columnMetadataList = new ArrayList<ColumnInfo>();
-			String[] tableMetadata = getTableMetadata(getTableName());
-			ResultSet rs = conn.getMetaData().getColumns(null, tableMetadata[0], tableMetadata[1], null);
-			while (rs.next()) {
-				columnMetadataList.add(new ColumnInfo(rs.getString(QueryUtil.COLUMN_NAME_POSITION), rs.getInt(QueryUtil.DATA_TYPE_POSITION)));
+			String upsertColumns = getUpsertColumns();
+			List<String> upsertColumnList = Lists.newArrayList();
+			if (StringUtils.isNotEmpty(upsertColumns)) {
+				useUpsertColumns = true;
+				String[] upsertColumnTokens = upsertColumns.split(",");
+				for (String token : upsertColumnTokens) {
+					upsertColumnList.add(token.trim());
+				}
 			}
+
+			StringBuilder parsedColumns = new StringBuilder();
+			boolean first = true;
+			for (String key : upsertColumnList) {
+				if (first) first=false;
+				else parsedColumns.append(",");
+				parsedColumns.append(key);
+			}
+			LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
+					,useUpsertColumns,upsertColumns, upsertColumnList.size(), parsedColumns.toString()
+					));
+
+			columnMetadataList = PhoenixRuntime.generateColumnInfo(conn, getTableName(), upsertColumnList);
+			
 		}
 		
-		// Generating UPSERT statement without column name information.
-		String upsertStmt = QueryUtil.constructGenericUpsertStatement(getTableName(), columnMetadataList.size());
-		LOG.info("Phoenix Upsert Statement: " + upsertStmt);
-		conf.set(UPSERT_STATEMENT, upsertStmt);
+		if (useUpsertColumns) {
+			// Generating UPSERT statement without column name information.
+			String upsertStmt = QueryUtil.constructUpsertStatement(getTableName(), columnMetadataList);
+			LOG.info("Phoenix Custom Upsert Statement: " + upsertStmt);
+			conf.set(UPSERT_STATEMENT, upsertStmt);
+			
+		} else {
+			// Generating UPSERT statement without column name information.
+			String upsertStmt = QueryUtil.constructGenericUpsertStatement(getTableName(), columnMetadataList.size());
+			LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
+			conf.set(UPSERT_STATEMENT, upsertStmt);
+		}
 	}
 	
-	public String getUpsertStatement() {
+    public String getUpsertStatement() {
 		return conf.get(UPSERT_STATEMENT);
 	}
 
@@ -140,22 +177,14 @@ public class PhoenixPigConfiguration {
 		return columnMetadataList;
 	}
 	
+	public String getUpsertColumns() {
+		return conf.get(UPSERT_COLUMNS);
+	}
+	
 	public String getTableName() {
 		return conf.get(TABLE_NAME);
 	}
 
-	private String[] getTableMetadata(String table) {
-		String[] schemaAndTable = table.split("\\.");
-		assert schemaAndTable.length >= 1;
-
-		if (schemaAndTable.length == 1) {
-			return new String[] { "", schemaAndTable[0] };
-		}
-
-		return new String[] { schemaAndTable[0], schemaAndTable[1] };
-	}
-
-	
 	public Configuration getConfiguration() {
 		return this.conf;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/1207882b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
index b9d03de..92035eb 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
@@ -27,12 +27,11 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataType;
-
 import org.apache.phoenix.pig.TypeUtil;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
 
 /**
  * A {@link Writable} representing a Phoenix record. This class
@@ -63,17 +62,22 @@ public class PhoenixRecord implements Writable {
 	public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException {
 		for (int i = 0; i < columnMetadataList.size(); i++) {
 			Object o = values.get(i);
+			ColumnInfo columnInfo = columnMetadataList.get(i);
 			
 			byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
-			Object upsertValue = convertTypeSpecificValue(o, type, columnMetadataList.get(i).getSqlType());
-
-			if (upsertValue != null) {
-				statement.setObject(i + 1, upsertValue, columnMetadataList.get(i).getSqlType());
-			} else {
-				statement.setNull(i + 1, columnMetadataList.get(i).getSqlType());
+			try {
+				Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
+				if (upsertValue != null) {
+					statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
+				} else {
+					statement.setNull(i + 1, columnInfo.getSqlType());
+				}
+			} catch (RuntimeException re) {
+				throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
+						,columnInfo.toString(),re.getMessage()),re);
+				
 			}
 		}
-		
 		statement.execute();
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/1207882b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
index c980a38..fbee4ca 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
 import org.apache.phoenix.pig.PhoenixPigConfiguration;
 
 /**