You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2015/06/24 22:24:41 UTC

[45/49] phoenix git commit: PHOENIX-1981 : PhoenixHBase Load and Store Funcs should handle all Pig data types

PHOENIX-1981 : PhoenixHBase Load and Store Funcs should handle all Pig data types


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8076126a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8076126a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8076126a

Branch: refs/heads/json
Commit: 8076126a741a0cf2a5839b88904fa08bfdfb6cdb
Parents: b61ef77
Author: Prashant Kommireddi <pk...@pkommireddi-ltm.internal.salesforce.com>
Authored: Mon May 18 19:41:08 2015 -0700
Committer: Eli Levine <el...@apache.org>
Committed: Mon Jun 15 18:17:44 2015 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/pig/util/TypeUtil.java   | 415 +++++++++----------
 .../apache/phoenix/pig/util/TypeUtilTest.java   |  52 +++
 2 files changed, 251 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8076126a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
index bdee3a4..6549445 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 
 package org.apache.phoenix.pig.util;
@@ -29,11 +21,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
 import org.apache.phoenix.schema.types.PBinary;
-import org.apache.phoenix.schema.types.PChar;
-import org.apache.phoenix.schema.types.PDecimal;
 import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PDecimal;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PInteger;
@@ -56,7 +48,6 @@ import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;
-import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -68,258 +59,250 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 
 public final class TypeUtil {
-	
+
     private static final Log LOG = LogFactory.getLog(TypeUtil.class);
-    private static final HBaseBinaryConverter binaryConverter = new HBaseBinaryConverter ();
-	private static final ImmutableMap<PDataType,Byte> phoenixTypeToPigDataType = init();
-	
-	private TypeUtil(){
-	}
-	
-	/**
-	 * A map of Phoenix to Pig data types.
-	 * @return
-	 */
-	private static ImmutableMap<PDataType, Byte> init() {
-        final ImmutableMap.Builder<PDataType,Byte> builder = new Builder<PDataType,Byte> ();
-        builder.put(PLong.INSTANCE,DataType.LONG);
-        builder.put(PVarbinary.INSTANCE,DataType.BYTEARRAY);
-        builder.put(PChar.INSTANCE,DataType.CHARARRAY);
-        builder.put(PVarchar.INSTANCE,DataType.CHARARRAY);
-        builder.put(PDouble.INSTANCE,DataType.DOUBLE);
-        builder.put(PFloat.INSTANCE,DataType.FLOAT);
-        builder.put(PInteger.INSTANCE,DataType.INTEGER);
-        builder.put(PTinyint.INSTANCE,DataType.INTEGER);
-        builder.put(PSmallint.INSTANCE,DataType.INTEGER);
-        builder.put(PDecimal.INSTANCE,DataType.BIGDECIMAL);
-        builder.put(PTime.INSTANCE,DataType.DATETIME);
-        builder.put(PTimestamp.INSTANCE,DataType.DATETIME);
-        builder.put(PBoolean.INSTANCE,DataType.BOOLEAN);
-        builder.put(PDate.INSTANCE,DataType.DATETIME);
-        builder.put(PUnsignedDate.INSTANCE,DataType.DATETIME);
-        builder.put(PUnsignedDouble.INSTANCE,DataType.DOUBLE);
-        builder.put(PUnsignedFloat.INSTANCE,DataType.FLOAT);
-        builder.put(PUnsignedInt.INSTANCE,DataType.INTEGER);
-        builder.put(PUnsignedLong.INSTANCE,DataType.LONG);
-        builder.put(PUnsignedSmallint.INSTANCE,DataType.INTEGER);
-        builder.put(PUnsignedTime.INSTANCE,DataType.DATETIME);
-        builder.put(PUnsignedTimestamp.INSTANCE,DataType.DATETIME);
-        builder.put(PUnsignedTinyint.INSTANCE,DataType.INTEGER);
+    private static final HBaseBinaryConverter BINARY_CONVERTER = new HBaseBinaryConverter();
+    private static final ImmutableMap<PDataType, Byte> PHOENIX_TO_PIG_TYPE = init();
+    private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+
+    private TypeUtil() {}
+
+    /**
+     * A map of Phoenix to Pig data types.
+     * 
+     * @return an immutable map from each supported Phoenix data type to its Pig {@link DataType} byte constant
+     */
+    private static ImmutableMap<PDataType, Byte> init() {
+        final ImmutableMap.Builder<PDataType, Byte> builder = new Builder<PDataType, Byte>();
+        builder.put(PLong.INSTANCE, DataType.LONG);
+        builder.put(PVarbinary.INSTANCE, DataType.BYTEARRAY);
+        builder.put(PChar.INSTANCE, DataType.CHARARRAY);
+        builder.put(PVarchar.INSTANCE, DataType.CHARARRAY);
+        builder.put(PDouble.INSTANCE, DataType.DOUBLE);
+        builder.put(PFloat.INSTANCE, DataType.FLOAT);
+        builder.put(PInteger.INSTANCE, DataType.INTEGER);
+        builder.put(PTinyint.INSTANCE, DataType.INTEGER);
+        builder.put(PSmallint.INSTANCE, DataType.INTEGER);
+        builder.put(PDecimal.INSTANCE, DataType.BIGDECIMAL);
+        builder.put(PTime.INSTANCE, DataType.DATETIME);
+        builder.put(PTimestamp.INSTANCE, DataType.DATETIME);
+        builder.put(PBoolean.INSTANCE, DataType.BOOLEAN);
+        builder.put(PDate.INSTANCE, DataType.DATETIME);
+        builder.put(PUnsignedDate.INSTANCE, DataType.DATETIME);
+        builder.put(PUnsignedDouble.INSTANCE, DataType.DOUBLE);
+        builder.put(PUnsignedFloat.INSTANCE, DataType.FLOAT);
+        builder.put(PUnsignedInt.INSTANCE, DataType.INTEGER);
+        builder.put(PUnsignedLong.INSTANCE, DataType.LONG);
+        builder.put(PUnsignedSmallint.INSTANCE, DataType.INTEGER);
+        builder.put(PUnsignedTime.INSTANCE, DataType.DATETIME);
+        builder.put(PUnsignedTimestamp.INSTANCE, DataType.DATETIME);
+        builder.put(PUnsignedTinyint.INSTANCE, DataType.INTEGER);
         return builder.build();
     }
+
+    /**
+     * This method returns the most appropriate PDataType associated with the incoming Pig type. Note for Pig DataType
+     * DATETIME, returns DATE as inferredSqlType. This is later used to make a cast to targetPhoenixType accordingly.
+     * See {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
+     * 
+     * @param obj
+     * @return PDataType
+     */
+    public static PDataType getType(Object obj, byte type) {
+        if (obj == null) { return null; }
+        PDataType sqlType;
+
+        switch (type) {
+        case DataType.BYTEARRAY:
+            sqlType = PVarbinary.INSTANCE;
+            break;
+        case DataType.CHARARRAY:
+            sqlType = PVarchar.INSTANCE;
+            break;
+        case DataType.DOUBLE:
+        case DataType.BIGDECIMAL:
+            sqlType = PDouble.INSTANCE;
+            break;
+        case DataType.FLOAT:
+            sqlType = PFloat.INSTANCE;
+            break;
+        case DataType.INTEGER:
+            sqlType = PInteger.INSTANCE;
+            break;
+        case DataType.LONG:
+        case DataType.BIGINTEGER:
+            sqlType = PLong.INSTANCE;
+            break;
+        case DataType.BOOLEAN:
+            sqlType = PBoolean.INSTANCE;
+            break;
+        case DataType.DATETIME:
+            sqlType = PDate.INSTANCE;
+            break;
+        case DataType.BYTE:
+            sqlType = PTinyint.INSTANCE;
+            break;
+        default:
+            throw new RuntimeException("Unknown type " + obj.getClass().getName() + " passed to PhoenixHBaseStorage");
+        }
+
+        return sqlType;
+
+    }
+
     /**
-	 * This method returns the most appropriate PDataType associated with 
-	 * the incoming Pig type. Note for Pig DataType DATETIME, returns DATE as 
-	 * inferredSqlType. 
-	 * 
-	 * This is later used to make a cast to targetPhoenixType accordingly. See
-	 * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
-	 * 
-	 * @param obj
-	 * @return PDataType
-	 */
-	public static PDataType getType(Object obj, byte type) {
-		if (obj == null) {
-			return null;
-		}
-		PDataType sqlType;
+     * This method encodes a value with Phoenix data type. It begins with checking whether an object is BINARY and makes
+     * a call to {@link #castBytes(Object, PDataType)} to convert bytes to targetPhoenixType
+     * 
+     * @param o
+     * @param targetPhoenixType
+     * @return Object
+     */
+    public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
+        PDataType inferredPType = getType(o, objectType);
 
-		switch (type) {
-		case DataType.BYTEARRAY:
-			sqlType = PVarbinary.INSTANCE;
-			break;
-		case DataType.CHARARRAY:
-			sqlType = PVarchar.INSTANCE;
-			break;
-		case DataType.DOUBLE:
-		case DataType.BIGDECIMAL:
-			sqlType = PDouble.INSTANCE;
-			break;
-		case DataType.FLOAT:
-			sqlType = PFloat.INSTANCE;
-			break;
-		case DataType.INTEGER:
-			sqlType = PInteger.INSTANCE;
-			break;
-		case DataType.LONG:
-		case DataType.BIGINTEGER:
-			sqlType = PLong.INSTANCE;
-			break;
-		case DataType.BOOLEAN:
-			sqlType = PBoolean.INSTANCE;
-			break;
-		case DataType.DATETIME:
-			sqlType = PDate.INSTANCE;
-			break;
-		case DataType.BYTE:
-			sqlType = PTinyint.INSTANCE;
-			break;
-		default:
-			throw new RuntimeException("Unknown type " + obj.getClass().getName()
-					+ " passed to PhoenixHBaseStorage");
-		}
+        if (inferredPType == null) { return null; }
 
-		return sqlType;
+        if (inferredPType == PVarbinary.INSTANCE) {
+            try {
+                o = castBytes(o, targetPhoenixType);
+                if (targetPhoenixType != PVarbinary.INSTANCE && targetPhoenixType != PBinary.INSTANCE) {
+                    inferredPType = getType(o, DataType.findType(o));
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Error while casting bytes for object " + o);
+            }
+        }
+        if (inferredPType == PDate.INSTANCE) {
+            int inferredSqlType = targetPhoenixType.getSqlType();
 
-	}
+            if (inferredSqlType == Types.DATE) { return new Date(((DateTime)o).getMillis()); }
+            if (inferredSqlType == Types.TIME) { return new Time(((DateTime)o).getMillis()); }
+            if (inferredSqlType == Types.TIMESTAMP) { return new Timestamp(((DateTime)o).getMillis()); }
+        }
 
-	/**
-	 * This method encodes a value with Phoenix data type. It begins
-	 * with checking whether an object is BINARY and makes a call to
-	 * {@link #castBytes(Object, PDataType)} to convery bytes to
-	 * targetPhoenixType
-	 * 
-	 * @param o
-	 * @param targetPhoenixType
-	 * @return Object
-	 */
-	public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
-		PDataType inferredPType = getType(o, objectType);
-		
-		if(inferredPType == null) {
-			return null;
-		}
+        if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) { return inferredPType
+                .toObject(o, targetPhoenixType); }
 
-		if(inferredPType == PVarbinary.INSTANCE) {
-			try {
-				o = castBytes(o, targetPhoenixType);
-				if(targetPhoenixType != PVarbinary.INSTANCE && targetPhoenixType != PBinary.INSTANCE) {
-					inferredPType = getType(o, DataType.findType(o));	
-				}
-			} catch (IOException e) {
-				throw new RuntimeException("Error while casting bytes for object " +o);
-			}
-		}
-		if(inferredPType == PDate.INSTANCE) {
-			int inferredSqlType = targetPhoenixType.getSqlType();
+        throw new RuntimeException(o.getClass().getName() + " cannot be coerced to " + targetPhoenixType.toString());
+    }
 
-			if(inferredSqlType == Types.DATE) {
-				return new Date(((DateTime)o).getMillis());
-			} 
-			if(inferredSqlType == Types.TIME) {
-				return new Time(((DateTime)o).getMillis());
-			}
-			if(inferredSqlType == Types.TIMESTAMP) {
-				return new Timestamp(((DateTime)o).getMillis());
-			}
-		}
-		
-		if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
-			return inferredPType.toObject(o, targetPhoenixType);
-		}
-		
-		throw new RuntimeException(o.getClass().getName()
-				+ " cannot be coerced to "+targetPhoenixType.toString());
-	}
-	
-	/**
-	 * This method converts bytes to the target type required
-	 * for Phoenix. It uses {@link Utf8StorageConverter} for
-	 * the conversion.
-	 * 
-	 * @param o
-	 * @param targetPhoenixType
-	 * @return Object
-	 * @throws IOException
-	 */
-	private static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
+    /**
+     * This method converts bytes to the target type required for Phoenix. It uses {@link HBaseBinaryConverter} for the
+     * conversion.
+     * 
+     * @param o
+     * @param targetPhoenixType
+     * @return Object
+     * @throws IOException
+     */
+    private static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
         byte[] bytes = ((DataByteArray)o).get();
 
         if (PDataType.equalsAny(targetPhoenixType, PChar.INSTANCE, PVarchar.INSTANCE)) {
-            return binaryConverter.bytesToCharArray(bytes);
+            return BINARY_CONVERTER.bytesToCharArray(bytes);
         } else if (PDataType.equalsAny(targetPhoenixType, PUnsignedSmallint.INSTANCE, PSmallint.INSTANCE)) {
-            return binaryConverter.bytesToInteger(bytes).shortValue();
+            return BINARY_CONVERTER.bytesToInteger(bytes).shortValue();
         } else if (PDataType.equalsAny(targetPhoenixType, PUnsignedTinyint.INSTANCE, PTinyint.INSTANCE)) {
-            return binaryConverter.bytesToInteger(bytes).byteValue();
+            return BINARY_CONVERTER.bytesToInteger(bytes).byteValue();
         } else if (PDataType.equalsAny(targetPhoenixType, PUnsignedInt.INSTANCE, PInteger.INSTANCE)) {
-            return binaryConverter.bytesToInteger(bytes);
+            return BINARY_CONVERTER.bytesToInteger(bytes);
         } else if (targetPhoenixType.equals(PBoolean.INSTANCE)) {
-            return binaryConverter.bytesToBoolean(bytes);
+            return BINARY_CONVERTER.bytesToBoolean(bytes);
         } else if (PDataType.equalsAny(targetPhoenixType, PFloat.INSTANCE, PUnsignedFloat.INSTANCE)) {
-            return binaryConverter.bytesToFloat(bytes);
+            return BINARY_CONVERTER.bytesToFloat(bytes);
         } else if (PDataType.equalsAny(targetPhoenixType, PDouble.INSTANCE, PUnsignedDouble.INSTANCE)) {
-            return binaryConverter.bytesToDouble(bytes);
+            return BINARY_CONVERTER.bytesToDouble(bytes);
         } else if (PDataType.equalsAny(targetPhoenixType, PUnsignedLong.INSTANCE, PLong.INSTANCE)) {
-            return binaryConverter.bytesToLong(bytes);
+            return BINARY_CONVERTER.bytesToLong(bytes);
         } else if (PDataType.equalsAny(targetPhoenixType, PVarbinary.INSTANCE, PBinary.INSTANCE)) {
             return bytes;
         } else {
             return o;
-        }        
+        }
     }
-    
+
     /**
      * Transforms the PhoenixRecord to Pig {@link Tuple}.
+     * 
      * @param record
      * @param projectedColumns
      * @return
      * @throws IOException
      */
-    public static Tuple transformToTuple(final PhoenixPigDBWritable record, final ResourceFieldSchema[] projectedColumns) throws IOException {
-        
+    public static Tuple transformToTuple(final PhoenixPigDBWritable record, final ResourceFieldSchema[] projectedColumns)
+            throws IOException {
+
         List<Object> columnValues = record.getValues();
-        if(columnValues == null || columnValues.size() == 0 || projectedColumns == null || projectedColumns.length != columnValues.size()) {
-            return null;
-        }
-        int columns = columnValues.size();
-        Tuple tuple = TupleFactory.getInstance().newTuple(columns);
+        if (columnValues == null || columnValues.size() == 0 || projectedColumns == null
+                || projectedColumns.length != columnValues.size()) { return null; }
+        int numColumns = columnValues.size();
+        Tuple tuple = TUPLE_FACTORY.newTuple(numColumns);
         try {
-            for(int i = 0 ; i < columns ; i++) {
+            for (int i = 0; i < numColumns; i++) {
                 final ResourceFieldSchema fieldSchema = projectedColumns[i];
                 Object object = columnValues.get(i);
                 if (object == null) {
                     tuple.set(i, null);
                     continue;
                 }
-                
-                switch(fieldSchema.getType()) {
-                    case DataType.BYTEARRAY:
-                        byte[] bytes = PDataType.fromTypeId(PBinary.INSTANCE.getSqlType()).toBytes(object);
-                        tuple.set(i,new DataByteArray(bytes,0,bytes.length));
-                        break;
-                    case DataType.CHARARRAY:
-                        tuple.set(i,DataType.toString(object));
-                        break;
-                    case DataType.DOUBLE:
-                        tuple.set(i,DataType.toDouble(object));
-                        break;
-                    case DataType.FLOAT:
-                        tuple.set(i,DataType.toFloat(object));
-                        break;
-                    case DataType.INTEGER:
-                        tuple.set(i,DataType.toInteger(object));
-                        break;
-                    case DataType.LONG:
-                        tuple.set(i,DataType.toLong(object));
-                        break;
-                    case DataType.BOOLEAN:
-                        tuple.set(i,DataType.toBoolean(object));
-                        break;
-                    case DataType.DATETIME:
-                        tuple.set(i,DataType.toDateTime(object));
-                        break;
-                    default:
-                        throw new RuntimeException(String.format(" Not supported [%s] pig type" , fieldSchema));
+
+                switch (fieldSchema.getType()) {
+                case DataType.BYTEARRAY:
+                    byte[] bytes = PDataType.fromTypeId(PBinary.INSTANCE.getSqlType()).toBytes(object);
+                    tuple.set(i, new DataByteArray(bytes, 0, bytes.length));
+                    break;
+                case DataType.CHARARRAY:
+                    tuple.set(i, DataType.toString(object));
+                    break;
+                case DataType.DOUBLE:
+                    tuple.set(i, DataType.toDouble(object));
+                    break;
+                case DataType.FLOAT:
+                    tuple.set(i, DataType.toFloat(object));
+                    break;
+                case DataType.INTEGER:
+                    tuple.set(i, DataType.toInteger(object));
+                    break;
+                case DataType.LONG:
+                    tuple.set(i, DataType.toLong(object));
+                    break;
+                case DataType.BOOLEAN:
+                    tuple.set(i, DataType.toBoolean(object));
+                    break;
+                case DataType.DATETIME:
+                    tuple.set(i, DataType.toDateTime(object));
+                    break;
+                case DataType.BIGDECIMAL:
+                    tuple.set(i, DataType.toBigDecimal(object));
+                    break;
+                case DataType.BIGINTEGER:
+                    tuple.set(i, DataType.toBigInteger(object));
+                    break;
+                default:
+                    throw new RuntimeException(String.format(" Not supported [%s] pig type", fieldSchema));
                 }
             }
-        } catch( Exception ex) {
+        } catch (Exception ex) {
             final String errorMsg = String.format(" Error transforming PhoenixRecord to Tuple [%s] ", ex.getMessage());
             LOG.error(errorMsg);
             throw new PigException(errorMsg);
         }
-          return tuple;
+        return tuple;
     }
-    
+
     /**
      * Returns the mapping pig data type for a given phoenix data type.
+     * 
      * @param phoenixDataType
      * @return
      */
     public static Byte getPigDataTypeForPhoenixType(final PDataType phoenixDataType) {
         Preconditions.checkNotNull(phoenixDataType);
-        final Byte pigDataType = phoenixTypeToPigDataType.get(phoenixDataType);
-        if(LOG.isDebugEnabled()) {
-            LOG.debug(String.format(" For PhoenixDataType [%s] , pigDataType is [%s] " , phoenixDataType.getSqlTypeName() , DataType.findTypeName(pigDataType)));    
+        final Byte pigDataType = PHOENIX_TO_PIG_TYPE.get(phoenixDataType);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(" For PhoenixDataType [%s] , pigDataType is [%s] ",
+                    phoenixDataType.getSqlTypeName(), DataType.findTypeName(pigDataType)));
         }
         return pigDataType;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8076126a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
new file mode 100644
index 0000000..25d9f48
--- /dev/null
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
@@ -0,0 +1,52 @@
+package org.apache.phoenix.pig.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TypeUtilTest {
+
+    @Test
+    public void testTransformToTuple() throws Exception {
+        PhoenixPigDBWritable record = mock(PhoenixPigDBWritable.class);
+        List<Object> values = Lists.newArrayList();
+        values.add("213123");
+        values.add(1231123);
+        values.add(31231231232131L);
+        values.add("bytearray".getBytes());
+        when(record.getValues()).thenReturn(values);
+
+        ResourceFieldSchema field = new ResourceFieldSchema().setType(DataType.CHARARRAY);
+        ResourceFieldSchema field1 = new ResourceFieldSchema().setType(DataType.INTEGER);
+        ResourceFieldSchema field2 = new ResourceFieldSchema().setType(DataType.LONG);
+        ResourceFieldSchema field3 = new ResourceFieldSchema().setType(DataType.BYTEARRAY);
+        ResourceFieldSchema[] projectedColumns = { field, field1, field2, field3 };
+
+        Tuple t = TypeUtil.transformToTuple(record, projectedColumns);
+
+        assertEquals(DataType.LONG, DataType.findType(t.get(2)));
+
+        field = new ResourceFieldSchema().setType(DataType.BIGDECIMAL);
+        field1 = new ResourceFieldSchema().setType(DataType.BIGINTEGER);
+        values.clear();
+        values.add(new BigDecimal(123123123.123213));
+        values.add(new BigInteger("1312313231312"));
+        ResourceFieldSchema[] columns = { field, field1 };
+        t = TypeUtil.transformToTuple(record, columns);
+
+        assertEquals(DataType.BIGDECIMAL, DataType.findType(t.get(0)));
+        assertEquals(DataType.BIGINTEGER, DataType.findType(t.get(1)));
+    }
+}