Posted to commits@hawq.apache.org by rv...@apache.org on 2015/09/19 02:36:25 UTC

[42/51] [partial] incubator-hawq git commit: SGA import

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAOFileReader.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAOFileReader.java b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAOFileReader.java
new file mode 100644
index 0000000..bf2129e
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAOFileReader.java
@@ -0,0 +1,469 @@
+package com.pivotal.hawq.mapreduce.ao.io;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.ao.file.HAWQAOSplit;
+import com.pivotal.hawq.mapreduce.ao.util.CompressZlib;
+import com.pivotal.hawq.mapreduce.util.HAWQConvertUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.IOException;
+
+/**
+ * Reads HAWQ append-only binary files: parses the different block headers in
+ * the file and uses them to read the data that follows correctly.
+ */
+public final class HAWQAOFileReader
+{
+
+	private final int smallContentArraySize = 4 * 1024 * 1024; // 4M
+	private final int largeContentArraySize = 5 * smallContentArraySize; // 20M
+	private final int offsetArraySize = 32 * 1024; // 32K
+
+	private FileSystem fs = null;
+	private FSDataInputStream dis = null;
+
+	private boolean checksum;
+	private int blockSize;
+	private String compressType;
+
+	private long fileLength;
+	private long readLength;
+
+	// 4MB buffer for memtuples in small content
+	private byte[] smallTuple_Varblock = new byte[smallContentArraySize];
+	// Default 20MB buffer for memtuples in large content (grown on demand)
+	private byte[] largeTuple = new byte[largeContentArraySize];
+	// Actual data size in smallTuple_Varblock
+	private int effectiveSmallTupleLength = -1;
+	// Actual data size in largeTuple
+	private int effectiveLargeTupleLength = -1;
+
+	private int[] offsets = new int[offsetArraySize];
+	private int effectiveOffsetLength = -1;
+	private int currentPosInOffsets = -1;
+
+	private byte[] buffer = new byte[8];
+
+	public HAWQAOFileReader(Configuration conf, InputSplit split)
+			throws IOException
+	{
+		HAWQAOSplit aosplit = (HAWQAOSplit) split;
+		checksum = aosplit.getChecksum();
+		compressType = aosplit.getCompressType().toLowerCase();
+		if (!compressType.equals("none") && !compressType.equals("quicklz")
+				&& !compressType.equals("zlib") && !compressType.equals(""))
+			throw new IOException("Compression type " + compressType
+					+ " is not supported yet");
+		blockSize = aosplit.getBlockSize();
+		fileLength = aosplit.getLength();
+		readLength = 0;
+
+		Path path = aosplit.getPath();
+		fs = path.getFileSystem(conf);
+		dis = fs.open(path);
+	}
+
+	/**
+	 * Read tuple into record
+	 * 
+	 * @param record record to populate with the next tuple
+	 * @return true if a record was read, false if there are no more records
+	 * @throws IOException
+	 * @throws HAWQException
+	 */
+	public boolean readRecord(HAWQAORecord record) throws IOException,
+			HAWQException
+	{
+		if (effectiveSmallTupleLength == -1 && effectiveLargeTupleLength == -1)
+		{
+			if (readLength >= fileLength)
+				return false;
+			if (dis.available() == 0)
+			{
+				if (readLength < fileLength)
+					System.err.println("Expect file length: " + fileLength
+							+ ", read length in fact: " + readLength);
+				return false;
+			}
+
+			readMemtuplesFromDis();
+			if (effectiveSmallTupleLength == -1
+					&& effectiveLargeTupleLength == -1)
+				return false;
+		}
+		record.reset();
+		getRecordFromMemtuples(record);
+		return true;
+	}
+
+	/**
+	 * @throws IOException
+	 */
+	public void close() throws IOException
+	{
+		if (dis != null)
+			dis.close();
+	}
+
+	/**
+	 * If memtuples is empty, read bytes from the data input stream and save
+	 * them in memtuples
+	 * 
+	 * @throws IOException
+	 * @throws HAWQException
+	 */
+	private void readMemtuplesFromDis() throws HAWQException, IOException
+	{
+		int blockHeaderBytes_0_3, blockHeaderBytes_4_7;
+		do {
+			dis.readFully(buffer);
+			readLength += 8;
+			blockHeaderBytes_0_3 = HAWQConvertUtil.bytesToInt(buffer, 0);
+			blockHeaderBytes_4_7 = HAWQConvertUtil.bytesToInt(buffer, 4);
+		}while (blockHeaderBytes_0_3 == 0 && blockHeaderBytes_4_7 == 0);
+
+		int reserved0 = (blockHeaderBytes_0_3 >> 31);
+		if (reserved0 != 0)
+			throw new HAWQException("reserved0 of AOHeader is not 0",
+					HAWQException.WRONGFILEFORMAT_EXCEPTION);
+		int headerKind = (blockHeaderBytes_0_3 & 0x70000000) >> 28;
+
+		switch (headerKind)
+		{
+		case AoHeaderKind.SmallContent:
+			readMemtuplesFromSmallContent(blockHeaderBytes_0_3,
+					blockHeaderBytes_4_7, true);
+			break;
+		case AoHeaderKind.LargeContent:
+			readMemtuplesFromLargeContent(blockHeaderBytes_0_3,
+					blockHeaderBytes_4_7);
+			break;
+		case AoHeaderKind.NonBulkDenseContent:
+			throw new HAWQException("NonBulkDenseContent is not supported yet");
+		case AoHeaderKind.BulkDenseContent:
+			throw new HAWQException("BulkDenseContent is not supported yet");
+		default:
+			throw new HAWQException("Unexpected Append-Only header kind "
+					+ headerKind, HAWQException.WRONGFILEFORMAT_EXCEPTION);
+		}
+	}
+
+	/**
+	 * Read bytes into memtuples
+	 * 
+	 * @param blockHeaderBytes_0_3
+	 *            byte 0 to byte 3 of small content header
+	 * @param blockHeaderBytes_4_7
+	 *            byte 4 to byte 7 of small content header
+	 * @param isSmall
+	 *            true means this small content does not belong to a large
+	 *            content
+	 * @throws IOException
+	 * @throws HAWQException
+	 */
+	private void readMemtuplesFromSmallContent(int blockHeaderBytes_0_3,
+			int blockHeaderBytes_4_7, boolean isSmall) throws HAWQException,
+			IOException
+	{
+		// Small content header
+		int rowCount = (blockHeaderBytes_0_3 & 0x00FFFC00) >> 10;
+		int executorBlockKind = (blockHeaderBytes_0_3 & 0x07000000) >> 24;
+		int hasFirstRowNum = (blockHeaderBytes_0_3 & 0x08000000) >> 27;
+		if (checksum)
+		{
+			// skip checksum
+			dis.readFully(buffer);
+			readLength += 8;
+		}
+		if (hasFirstRowNum == 1)
+		{
+			// TODO: firstRowNum skip??
+			dis.readFully(buffer);
+			readLength += 8;
+		}
+		int uncompressedLen = ((blockHeaderBytes_0_3 & 0x000003FF) << 11)
+				| (((blockHeaderBytes_4_7 & 0xFFE00000) >> 21) & 0x000007FF);
+		int compressedLen = (blockHeaderBytes_4_7 & 0x001FFFFF);
+		int overallByteLen;
+		if (compressedLen == 0)
+			// Uncompressed
+			overallByteLen = AoRelationVersion.RoundUp(uncompressedLen,
+					AoRelationVersion.GetLatest());
+		else
+			overallByteLen = AoRelationVersion.RoundUp(compressedLen,
+					AoRelationVersion.GetLatest());
+		readLength += overallByteLen;
+		switch (executorBlockKind)
+		{
+		case AoExecutorBlockKind.SingleRow:
+			// Single row
+			if (isSmall && rowCount != 1)
+				throw new HAWQException("Wrong row count for single row: "
+						+ rowCount, HAWQException.WRONGFILEFORMAT_EXCEPTION);
+			if (compressedLen != 0)
+			{
+				byte[] tempBytes = new byte[overallByteLen];
+				dis.readFully(tempBytes);
+				if (compressType.equals("zlib"))
+					tempBytes = CompressZlib.decompress(tempBytes);
+				else if (compressType.equals("quicklz"))
+				{
+					throw new IOException("compresstype quicklz is not supported anymore");
+				}
+				if (tempBytes.length != uncompressedLen)
+					throw new IOException("tempBytes.length != uncompressedLen");
+				System.arraycopy(tempBytes, 0, smallTuple_Varblock, 0,
+						tempBytes.length);
+				effectiveSmallTupleLength = tempBytes.length;
+			}
+			else
+			{
+				effectiveSmallTupleLength = overallByteLen;
+				while (overallByteLen != 0)
+					overallByteLen -= dis.read(smallTuple_Varblock,
+							effectiveSmallTupleLength - overallByteLen,
+							overallByteLen);
+			}
+			if (isSmall)
+			{
+				offsets[0] = 0;
+				currentPosInOffsets = 0;
+				effectiveOffsetLength = 1;
+			}
+			break;
+		case AoExecutorBlockKind.VarBlock:
+			// Multiple rows
+			if (compressedLen != 0)
+			{
+				byte[] allbytes = new byte[overallByteLen];
+				dis.readFully(allbytes);
+				if (compressType.equals("zlib"))
+					allbytes = CompressZlib.decompress(allbytes);
+				else if (compressType.equals("quicklz"))
+				{
+					throw new IOException("compresstype quicklz is not supported anymore");
+				}
+				if (allbytes.length != uncompressedLen)
+					throw new IOException("allbytes.length != uncompressedLen");
+				overallByteLen = allbytes.length;
+				System.arraycopy(allbytes, 0, smallTuple_Varblock, 0,
+						allbytes.length);
+			}
+			else
+			{
+				int tempLength = overallByteLen;
+				while (tempLength != 0)
+					tempLength -= dis.read(smallTuple_Varblock, overallByteLen
+							- tempLength, tempLength);
+			}
+
+			int posInAllBytes = 0;
+			int varblockHeader_0_3 = HAWQConvertUtil.bytesToInt(
+					smallTuple_Varblock, posInAllBytes);
+			posInAllBytes += 4;
+			int varblockHeader_4_7 = HAWQConvertUtil.bytesToInt(
+					smallTuple_Varblock, posInAllBytes);
+			posInAllBytes += 4;
+			overallByteLen -= 2 * 4;
+			int varBlockVersion = (varblockHeader_0_3 & 0x03000000) >> 24;
+			if (varBlockVersion != 1)
+				throw new HAWQException("version " + varBlockVersion
+						+ " is not 1", HAWQException.WRONGFILEFORMAT_EXCEPTION);
+			int reserved_varblock = (varblockHeader_0_3 & 0x7C000000) >> 26;
+			if (reserved_varblock != 0)
+				throw new HAWQException("Reserved not 0",
+						HAWQException.WRONGFILEFORMAT_EXCEPTION);
+			int moreReserved_varblock = (varblockHeader_4_7 & 0xFF000000) >> 24;
+			if (moreReserved_varblock != 0)
+				throw new HAWQException("More reserved not 0",
+						HAWQException.WRONGFILEFORMAT_EXCEPTION);
+			int itemCount = (varblockHeader_4_7 & 0x00FFFFFF);
+			if (rowCount != itemCount)
+				throw new HAWQException("row count in content header is "
+						+ rowCount + " and item count in varblock header is "
+						+ itemCount, HAWQException.WRONGFILEFORMAT_EXCEPTION);
+			int itemLenByteSum = varblockHeader_0_3 & 0x00FFFFFF;
+			effectiveSmallTupleLength = itemLenByteSum + 8;
+			posInAllBytes += itemLenByteSum;
+			overallByteLen -= itemLenByteSum;
+			int offsetAreSmall = (varblockHeader_0_3 >> 31) & 0x00000001;
+			if (offsetAreSmall == 1)
+			{
+				effectiveOffsetLength = overallByteLen / 2;
+				for (int i = 0; i < effectiveOffsetLength; ++i)
+				{
+					offsets[i] = ((int) HAWQConvertUtil.bytesToShort(
+							smallTuple_Varblock, posInAllBytes)) & 0x0000FFFF;
+					posInAllBytes += 2;
+				}
+			}
+			else
+			{
+				effectiveOffsetLength = overallByteLen / 3;
+				for (int i = 0; i < effectiveOffsetLength; ++i)
+				{
+					offsets[i] = HAWQConvertUtil.threeBytesToInt(
+							smallTuple_Varblock, posInAllBytes);
+					posInAllBytes += 3;
+				}
+			}
+			currentPosInOffsets = 0;
+			break;
+		default:
+			throw new HAWQException("Unexpected executor block kind "
+					+ executorBlockKind,
+					HAWQException.WRONGFILEFORMAT_EXCEPTION);
+		}
+	}
+
+	private void readMemtuplesFromLargeContent(int blockHeaderBytes_0_3,
+			int blockHeaderBytes_4_7) throws HAWQException, IOException
+	{
+		int reserved1 = (blockHeaderBytes_0_3 & 0x00800000) >> 23;
+		if (reserved1 != 0)
+			throw new HAWQException(
+					"reserved1 of AOHeader_LargeContent is not 0",
+					HAWQException.WRONGFILEFORMAT_EXCEPTION);
+		int hasFirstRowNum = (blockHeaderBytes_0_3 & 0x08000000) >> 27;
+		if (checksum)
+		{
+			// skip checksum
+			dis.readFully(buffer);
+			readLength += 8;
+		}
+		if (hasFirstRowNum == 1)
+		{
+			// TODO: firstRowNum skip??
+			dis.readFully(buffer);
+			readLength += 8;
+		}
+		int executorBlockKind = (blockHeaderBytes_0_3 & 0x07000000) >> 24;
+		// AppendOnlyExecutorReadBlock_GetContents:591
+		if (executorBlockKind != AoExecutorBlockKind.SingleRow)
+			throw new HAWQException(
+					"executor block kind of large content should be "
+							+ AoExecutorBlockKind.SingleRow + " rather than "
+							+ executorBlockKind,
+					HAWQException.WRONGFILEFORMAT_EXCEPTION);
+		int largeRowCount = ((blockHeaderBytes_0_3 & 0x007FFFFF) << 2)
+				| (((blockHeaderBytes_4_7 & 0xC0000000) >> 30) & 0x00000003);
+		if (largeRowCount != 1)
+			throw new HAWQException(
+					"row count in large content should be 1 rather than "
+							+ largeRowCount,
+					HAWQException.WRONGFILEFORMAT_EXCEPTION);
+		int largeContentLength = blockHeaderBytes_4_7 & 0x3FFFFFFF;
+		int smallNumOfLarge = (largeContentLength - 1) / blockSize + 1;
+		effectiveLargeTupleLength = 0;
+		for (int i = 0; i < smallNumOfLarge; ++i)
+		{
+			dis.readFully(buffer);
+			readLength += 8;
+			int smallblockHeaderBytes_0_3 = HAWQConvertUtil.bytesToInt(buffer,
+					0);
+			int smallblockHeaderBytes_4_7 = HAWQConvertUtil.bytesToInt(buffer,
+					4);
+			readMemtuplesFromSmallContent(smallblockHeaderBytes_0_3,
+					smallblockHeaderBytes_4_7, false);
+			if (effectiveLargeTupleLength + effectiveSmallTupleLength > largeTuple.length)
+			{
+				// buffer for large content is not big enough; grow it by
+				// another 2 * smallContentArraySize bytes
+				byte[] temp = new byte[largeTuple.length
+						+ smallContentArraySize * 2];
+				System.arraycopy(largeTuple, 0, temp, 0,
+						effectiveLargeTupleLength);
+				largeTuple = temp;
+			}
+			System.arraycopy(smallTuple_Varblock, 0, largeTuple,
+					effectiveLargeTupleLength, effectiveSmallTupleLength);
+			effectiveLargeTupleLength += effectiveSmallTupleLength;
+			effectiveSmallTupleLength = -1;
+		}
+		if (largeContentLength != effectiveLargeTupleLength)
+			throw new HAWQException("Fail to read tuple from file",
+					HAWQException.WRONGFILEFORMAT_EXCEPTION);
+		offsets = new int[1];
+		offsets[0] = 0;
+		currentPosInOffsets = 0;
+	}
+
+	/**
+	 * Fill the given record with the next tuple stored in memtuples
+	 * 
+	 * @param record record to populate
+	 * @throws IOException
+	 * @throws HAWQException
+	 */
+	private void getRecordFromMemtuples(HAWQAORecord record)
+			throws IOException, HAWQException
+	{
+		if (effectiveLargeTupleLength != -1)
+		{
+			// large tuple
+			record.setMemtuples(largeTuple, 0, effectiveLargeTupleLength - 1);
+			effectiveLargeTupleLength = -1;
+		}
+		else
+		{
+			// small tuples
+			if (currentPosInOffsets == effectiveOffsetLength - 1
+					|| offsets[currentPosInOffsets + 1] == 0)
+			{
+				record.setMemtuples(smallTuple_Varblock,
+									offsets[currentPosInOffsets],
+									effectiveSmallTupleLength - 1);
+				effectiveSmallTupleLength = -1;
+				effectiveOffsetLength = -1;
+				currentPosInOffsets = -1;
+			}
+			else
+			{
+				record.setMemtuples(smallTuple_Varblock,
+									offsets[currentPosInOffsets],
+									offsets[currentPosInOffsets + 1] - 1);
+			}
+			++currentPosInOffsets;
+		}
+	}
+
+	public static class AoHeaderKind
+	{
+		public static final int SmallContent = 1;
+		public static final int LargeContent = 2;
+		public static final int NonBulkDenseContent = 3;
+		public static final int BulkDenseContent = 4;
+	}
+
+	public static class AoExecutorBlockKind
+	{
+		public static final int None = 0;
+		public static final int VarBlock = 1;
+		public static final int SingleRow = 2;
+		public static final int Max = Integer.MAX_VALUE;
+	}
+
+	public static class AoRelationVersion
+	{
+		public static final int None = 0;
+		public static final int Original = 1;
+		public static final int Aligned64bit = 2;
+		public static final int Max = Integer.MAX_VALUE;
+
+		public static int GetLatest()
+		{
+			return Aligned64bit;
+		}
+
+		public static int RoundUp(int length, int version)
+		{
+			return (version > Original) ? (((length + 7) / 8) * 8)
+					: (((length + 3) / 4) * 4);
+		}
+	}
+}
\ No newline at end of file
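
For context, the read loop for these classes appears later in this commit in the (currently commented-out) HAWQAOFileReaderTest. The following is a minimal standalone sketch of the same pattern; the class name AOReadSketch, the method dump, and the assumption that a HAWQAOSplit, HAWQSchema, encoding and version string have already been obtained from table metadata are illustrative only, not part of this commit.

import org.apache.hadoop.conf.Configuration;

import com.pivotal.hawq.mapreduce.ao.file.HAWQAOSplit;
import com.pivotal.hawq.mapreduce.ao.io.HAWQAOFileReader;
import com.pivotal.hawq.mapreduce.ao.io.HAWQAORecord;
import com.pivotal.hawq.mapreduce.schema.HAWQSchema;

public class AOReadSketch
{
    // Prints every tuple of one split as a pipe-separated line.
    public static void dump(Configuration conf, HAWQAOSplit split,
            HAWQSchema schema, String encoding, String version) throws Exception
    {
        HAWQAOFileReader reader = new HAWQAOFileReader(conf, split);
        try
        {
            HAWQAORecord record = new HAWQAORecord(schema, encoding, version);
            // readRecord() returns false once the split is exhausted
            while (reader.readRecord(record))
            {
                int columnCount = record.getSchema().getFieldCount();
                StringBuilder line = new StringBuilder(record.getString(1));
                for (int i = 2; i <= columnCount; i++)
                    line.append('|').append(record.getString(i));
                System.out.println(line);
            }
        }
        finally
        {
            reader.close();
        }
    }
}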

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAORecord.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAORecord.java b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAORecord.java
new file mode 100644
index 0000000..e377ed1
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAORecord.java
@@ -0,0 +1,1051 @@
+package com.pivotal.hawq.mapreduce.ao.io;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.datatype.HAWQBox;
+import com.pivotal.hawq.mapreduce.datatype.HAWQCidr;
+import com.pivotal.hawq.mapreduce.datatype.HAWQCircle;
+import com.pivotal.hawq.mapreduce.datatype.HAWQInet;
+import com.pivotal.hawq.mapreduce.datatype.HAWQInterval;
+import com.pivotal.hawq.mapreduce.datatype.HAWQLseg;
+import com.pivotal.hawq.mapreduce.datatype.HAWQMacaddr;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPath;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPoint;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPolygon;
+import com.pivotal.hawq.mapreduce.datatype.HAWQVarbit;
+import com.pivotal.hawq.mapreduce.schema.HAWQField;
+import com.pivotal.hawq.mapreduce.schema.HAWQPrimitiveField;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import com.pivotal.hawq.mapreduce.util.HAWQConvertUtil;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Extends {@link HAWQRecord} and implements the abstract get/set methods in
+ * HAWQRecord. For append-only files, an object of this class is created and
+ * returned to the caller.
+ */
+public class HAWQAORecord extends HAWQRecord
+{
+	private String encoding;
+	/*
+	 * GPSQL-1047
+	 * 
+	 * If the database is running on Mac OS (i386-apple-darwin), this value is true.
+	 */
+	private boolean isMac;
+
+	private HAWQPrimitiveField.PrimitiveType[] schemaType = null;
+	private int columnNum;
+
+	private byte[] memtuples = null;
+	private int start;
+	private int end;
+	private int isLargeTup;
+
+	private int[] offsetOfEachColumn;
+	private boolean[] nullmap;
+
+	private int numOf8MoreBytes = 0;
+	private int[] offsetOf8MoreBytes = null;
+	private int numOf4MoreBytes = 0;
+	private int[] offsetOf4MoreBytes = null;
+	private int numOf2Bytes = 0;
+	private int[] offsetOf2Bytes = null;
+	private int numOf1Byte = 0;
+	private int[] offsetOf1Bytes = null;
+	private int numOfOffset = 0;
+
+	private HAWQPrimitiveField.PrimitiveType[] typesOf4MoreBytes = null;
+	private HAWQPrimitiveField.PrimitiveType[] typesOf8MoreBytes = null;
+
+	private int nullmapExtraBytes;
+
+	/**
+	 * Constructor
+	 * 
+	 * @param schema
+	 *            schema
+	 * @param encoding
+	 *            encoding in this file
+	 * @param version
+	 *            database version string
+	 * @throws HAWQException
+	 *             when there are unsupported types in the schema
+	 */
+	public HAWQAORecord(HAWQSchema schema, String encoding, String version)
+			throws HAWQException
+	{
+		super(schema);
+		columnNum = schema.getFieldCount();
+		schemaType = new HAWQPrimitiveField.PrimitiveType[columnNum];
+		offsetOfEachColumn = new int[columnNum];
+		nullmap = new boolean[columnNum];
+		for (int i = 0; i < columnNum; i++)
+		{
+			HAWQField field = schema.getField(i + 1);
+			if (field.isPrimitive())
+				schemaType[i] = field.asPrimitive().getType();
+			else
+				throw new HAWQException("User-defined types are not supported yet");
+			values[i] = null;
+			offsetOfEachColumn[i] = -1;
+			nullmap[i] = false;
+		}
+		nullmapExtraBytes = -1;
+		initFromSchema();
+		isMac = version.contains("on i386-apple-darwin");
+		offsetOf8MoreBytes = new int[numOf8MoreBytes];
+		offsetOf4MoreBytes = new int[numOf4MoreBytes + numOfOffset];
+		offsetOf2Bytes = new int[numOf2Bytes + numOfOffset];
+		offsetOf1Bytes = new int[numOf1Byte];
+		this.encoding = encoding;
+		encodingMap();
+	}
+
+	private void encodingMap()
+	{
+		if (encoding.equals("ISO_8859_5"))
+			encoding = "ISO-8859-5";
+		else if (encoding.equals("ISO_8859_6"))
+			encoding = "ISO-8859-6";
+		else if (encoding.equals("ISO_8859_7"))
+			encoding = "ISO-8859-7";
+		else if (encoding.equals("ISO_8859_8"))
+			encoding = "ISO-8859-8";
+		else if (encoding.equals("KOI8R"))
+			encoding = "KOI8-R";
+		else if (encoding.equals("UTF8"))
+			encoding = "UTF-8";
+		else if (encoding.equals("WIN1250"))
+			encoding = "windows-1250";
+		else if (encoding.equals("WIN1251"))
+			encoding = "windows-1251";
+		else if (encoding.equals("WIN1252"))
+			encoding = "windows-1252";
+		else if (encoding.equals("WIN1253"))
+			encoding = "windows-1253";
+		else if (encoding.equals("WIN1254"))
+			encoding = "windows-1254";
+		else if (encoding.equals("WIN1255"))
+			encoding = "windows-1255";
+		else if (encoding.equals("WIN1256"))
+			encoding = "windows-1256";
+		else if (encoding.equals("WIN1257"))
+			encoding = "windows-1257";
+		else if (encoding.equals("WIN1258"))
+			encoding = "windows-1258";
+		else if (encoding.equals("JOHAB"))
+			encoding = "x-Johab";
+		else if (encoding.equals("WIN874"))
+			encoding = "x-windows-874";
+	}
+
+	@Override
+	public boolean getBoolean(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getBoolean(fieldIndex);
+	}
+
+	@Override
+	public byte getByte(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getByte(fieldIndex);
+	}
+
+	@Override
+	public byte[] getBytes(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getBytes(fieldIndex);
+	}
+
+	@Override
+	public double getDouble(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getDouble(fieldIndex);
+	}
+
+	@Override
+	public float getFloat(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getFloat(fieldIndex);
+	}
+
+	@Override
+	public int getInt(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getInt(fieldIndex);
+	}
+
+	@Override
+	public long getLong(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getLong(fieldIndex);
+	}
+
+	@Override
+	public short getShort(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getShort(fieldIndex);
+	}
+
+	@Override
+	public String getString(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getString(fieldIndex);
+	}
+
+	@Override
+	public char getChar(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getChar(fieldIndex);
+	}
+
+	@Override
+	public boolean isNull(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return (values[fieldIndex - 1] == null);
+	}
+
+	@Override
+	public Timestamp getTimestamp(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getTimestamp(fieldIndex);
+	}
+
+	@Override
+	public Time getTime(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getTime(fieldIndex);
+	}
+
+	@Override
+	public Date getDate(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getDate(fieldIndex);
+	}
+
+	@Override
+	public BigDecimal getBigDecimal(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getBigDecimal(fieldIndex);
+	}
+
+	@Override
+	public Array getArray(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getArray(fieldIndex);
+	}
+
+	@Override
+	public HAWQBox getBox(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getBox(fieldIndex);
+	}
+
+	@Override
+	public HAWQCircle getCircle(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getCircle(fieldIndex);
+	}
+
+	@Override
+	public HAWQInterval getInterval(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getInterval(fieldIndex);
+	}
+
+	@Override
+	public HAWQLseg getLseg(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getLseg(fieldIndex);
+	}
+
+	@Override
+	public HAWQPath getPath(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getPath(fieldIndex);
+	}
+
+	@Override
+	public HAWQPoint getPoint(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getPoint(fieldIndex);
+	}
+
+	@Override
+	public HAWQPolygon getPolygon(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getPolygon(fieldIndex);
+	}
+
+	@Override
+	public HAWQMacaddr getMacaddr(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getMacaddr(fieldIndex);
+	}
+
+	@Override
+	public HAWQInet getInet(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getInet(fieldIndex);
+	}
+
+	@Override
+	public HAWQCidr getCidr(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getCidr(fieldIndex);
+	}
+
+	@Override
+	public HAWQVarbit getVarbit(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getVarbit(fieldIndex);
+	}
+
+	public Object getObject(int fieldIndex) throws HAWQException
+	{
+		checkFieldIndex(fieldIndex);
+		if (values[fieldIndex - 1] == null)
+			get(fieldIndex);
+
+		return super.getObject(fieldIndex);
+	}
+
+	@Override
+	public void reset()
+	{
+		for (int i = 0; i < columnNum; ++i)
+		{
+			values[i] = null;
+			offsetOfEachColumn[i] = -1;
+			nullmap[i] = false;
+		}
+		nullmapExtraBytes = -1;
+	}
+
+	public void setMemtuples(byte[] memtuples, int start, int end)
+	{
+		this.memtuples = memtuples;
+		this.start = start;
+		this.end = end;
+	}
+
+	private void initFromSchema() throws HAWQException
+	{
+		for (int i = 0; i < columnNum; ++i)
+		{
+			HAWQField field = super.getSchema().getField(i + 1);
+			if (field.isArray())
+			{
+				++numOfOffset;
+				continue;
+			}
+
+			switch (schemaType[i])
+			{
+			case FLOAT8:
+			case INT8:
+			case TIME:
+			case TIMETZ:
+			case TIMESTAMP:
+			case TIMESTAMPTZ:
+			case LSEG:
+			case BOX:
+			case CIRCLE:
+			case INTERVAL:
+			case POINT:
+				++numOf8MoreBytes;
+				break;
+			case MACADDR:
+			case INT4:
+			case FLOAT4:
+			case DATE:
+				++numOf4MoreBytes;
+				break;
+			case INT2:
+				++numOf2Bytes;
+				break;
+			case BOOL:
+				++numOf1Byte;
+				break;
+			case TEXT:
+			case BPCHAR:
+			case VARCHAR:
+			case INET:
+			case CIDR:
+			case XML:
+			case NUMERIC:
+			case BYTEA:
+			case PATH:
+			case POLYGON:
+			case BIT:
+			case VARBIT:
+				++numOfOffset;
+				break;
+			default:
+				break;
+			}
+		}
+		typesOf8MoreBytes = new HAWQPrimitiveField.PrimitiveType[numOf8MoreBytes];
+		typesOf4MoreBytes = new HAWQPrimitiveField.PrimitiveType[numOf4MoreBytes];
+
+		int posIn8ByteTypes = 0, posIn4ByteTypes = 0;
+		for (int i = 0; i < columnNum; ++i)
+		{
+			HAWQField field = super.getSchema().getField(i + 1);
+			if (field.isArray())
+				continue;
+
+			switch (schemaType[i])
+			{
+			case FLOAT8:
+			case INT8:
+			case TIME:
+			case TIMETZ:
+			case TIMESTAMP:
+			case TIMESTAMPTZ:
+			case LSEG:
+			case BOX:
+			case CIRCLE:
+			case INTERVAL:
+			case POINT:
+				typesOf8MoreBytes[posIn8ByteTypes++] = schemaType[i];
+				break;
+			case MACADDR:
+			case INT4:
+			case FLOAT4:
+			case DATE:
+				typesOf4MoreBytes[posIn4ByteTypes++] = schemaType[i];
+				break;
+			default:
+				break;
+			}
+		}
+	}
+
+	private void get(int fieldIndex) throws HAWQException
+	{
+		if (offsetOfEachColumn[0] == -1)
+			initOffset();
+		int i = fieldIndex - 1;
+		int offset = offsetOfEachColumn[i];
+		if (offset == 0)
+		{
+			values[i] = null;
+			return;
+		}
+
+		HAWQField field = schema.getField(i + 1);
+		if (field.isArray())
+		{
+			int offset_array;
+			if (isLargeTup != 0)
+				offset_array = HAWQConvertUtil.bytesToInt(memtuples, offset);
+			else
+				offset_array = ((int) HAWQConvertUtil.bytesToShort(memtuples,
+						offset)) & 0x0000FFFF;
+			offset_array += start;
+			offset_array += nullmapExtraBytes;
+			values[i] = HAWQConvertUtil.bytesToArray(memtuples, schemaType[i],
+					offset_array);
+			return;
+		}
+
+		switch (schemaType[i])
+		{
+		case INT4:
+			values[i] = HAWQConvertUtil.bytesToInt(memtuples, offset);
+			break;
+		case INT8:
+			values[i] = HAWQConvertUtil.bytesToLong(memtuples, offset);
+			break;
+		case FLOAT4:
+			values[i] = HAWQConvertUtil.bytesToFloat(memtuples, offset);
+			break;
+		case FLOAT8:
+			values[i] = HAWQConvertUtil.bytesToDouble(memtuples, offset);
+			break;
+		case BOOL:
+			values[i] = HAWQConvertUtil.byteToBoolean(memtuples[offset]);
+			break;
+		case INT2:
+			values[i] = HAWQConvertUtil.bytesToShort(memtuples, offset);
+			break;
+		case TEXT:
+		case BPCHAR:
+		case VARCHAR:
+		case XML:
+			int offset_varchar;
+			if (isLargeTup != 0)
+				offset_varchar = HAWQConvertUtil.bytesToInt(memtuples, offset);
+			else
+				offset_varchar = ((int) HAWQConvertUtil.bytesToShort(memtuples,
+						offset)) & 0x0000FFFF;
+			offset_varchar += start;
+			offset_varchar += nullmapExtraBytes;
+			byte lead_String = memtuples[offset_varchar];
+			int offset_String,
+			length_String;
+			if ((lead_String & 0x80) != 0)
+			{
+				offset_String = offset_varchar + 1;
+				length_String = (lead_String & 0x7F) - 1;
+			}
+			else
+			{
+				offset_String = offset_varchar + 4;
+				length_String = Integer.reverseBytes(HAWQConvertUtil
+						.bytesToInt(memtuples, offset_varchar)) - 4;
+			}
+			try
+			{
+				values[i] = new String(memtuples, offset_String, length_String,
+						encoding);
+			}
+			catch (UnsupportedEncodingException e)
+			{
+				throw new HAWQException("Encoding " + encoding
+						+ " is not supported yet");
+			}
+			catch (StringIndexOutOfBoundsException e)
+			{
+				throw new HAWQException(
+						"Cannot convert bytes to string with offset is "
+								+ offset_String + " and length is "
+								+ length_String);
+			}
+			break;
+		case INET:
+			int offset_inet;
+			if (isLargeTup != 0)
+				offset_inet = HAWQConvertUtil.bytesToInt(memtuples, offset);
+			else
+				offset_inet = ((int) HAWQConvertUtil.bytesToShort(memtuples,
+						offset)) & 0x0000FFFF;
+			offset_inet += start;
+			offset_inet += nullmapExtraBytes;
+			values[i] = HAWQConvertUtil.bytesToInet(memtuples, offset_inet);
+			break;
+		case CIDR:
+			int offset_cidr;
+			if (isLargeTup != 0)
+				offset_cidr = HAWQConvertUtil.bytesToInt(memtuples, offset);
+			else
+				offset_cidr = ((int) HAWQConvertUtil.bytesToShort(memtuples,
+						offset)) & 0x0000FFFF;
+			offset_cidr += start;
+			offset_cidr += nullmapExtraBytes;
+			values[i] = HAWQConvertUtil.bytesToCidr(memtuples, offset_cidr);
+			break;
+		case NUMERIC:
+			int offset_numeric;
+			if (isLargeTup != 0)
+				offset_numeric = HAWQConvertUtil.bytesToInt(memtuples, offset);
+			else
+				offset_numeric = ((int) HAWQConvertUtil.bytesToShort(memtuples,
+						offset)) & 0x0000FFFF;
+			offset_numeric += start;
+			offset_numeric += nullmapExtraBytes;
+			values[i] = HAWQConvertUtil
+					.bytesToDecimal(memtuples, offset_numeric);
+			break;
+		case TIME:
+			values[i] = HAWQConvertUtil.toTime(
+					HAWQConvertUtil.bytesToLong(memtuples, offset));
+			break;
+		case TIMETZ:
+			values[i] = HAWQConvertUtil.toTimeTz(memtuples, offset);
+			break;
+		case TIMESTAMP:
+			values[i] = HAWQConvertUtil.toTimestamp(
+					HAWQConvertUtil.bytesToLong(memtuples, offset), false);
+			break;
+		case TIMESTAMPTZ:
+			values[i] = HAWQConvertUtil.toTimestamp(
+					HAWQConvertUtil.bytesToLong(memtuples, offset), true);
+			break;
+		case DATE:
+			values[i] = HAWQConvertUtil.toDate(
+					HAWQConvertUtil.bytesToInt(memtuples, offset));
+			break;
+		case BYTEA:
+			int offset_varbinary;
+			if (isLargeTup != 0)
+				offset_varbinary = HAWQConvertUtil
+						.bytesToInt(memtuples, offset);
+			else
+				offset_varbinary = ((int) HAWQConvertUtil.bytesToShort(
+						memtuples, offset)) & 0x0000FFFF;
+			offset_varbinary += start;
+			offset_varbinary += nullmapExtraBytes;
+			byte lead_Bytes = memtuples[offset_varbinary];
+			int offset_Bytes,
+			length_Bytes;
+			if ((lead_Bytes & 0x80) != 0)
+			{
+				offset_Bytes = offset_varbinary + 1;
+				length_Bytes = (lead_Bytes & 0x7F) - 1;
+			}
+			else
+			{
+				offset_Bytes = offset_varbinary + 4;
+				length_Bytes = Integer.reverseBytes(HAWQConvertUtil.bytesToInt(
+						memtuples, offset_varbinary)) - 4;
+			}
+
+			byte[] bytes = new byte[length_Bytes];
+			System.arraycopy(memtuples, offset_Bytes, bytes, 0, length_Bytes);
+			values[i] = bytes;
+			break;
+		case INTERVAL:
+			values[fieldIndex - 1] = HAWQConvertUtil.bytesToInterval(memtuples,
+					offset);
+			break;
+		case POINT:
+			double x = HAWQConvertUtil.bytesToDouble(memtuples, offset);
+			double y = HAWQConvertUtil.bytesToDouble(memtuples, offset + 8);
+			values[fieldIndex - 1] = new HAWQPoint(x, y);
+			break;
+		case LSEG:
+			double lseg_x_1 = HAWQConvertUtil.bytesToDouble(memtuples, offset);
+			double lseg_y_1 = HAWQConvertUtil.bytesToDouble(memtuples,
+					offset + 8);
+			double lseg_x_2 = HAWQConvertUtil.bytesToDouble(memtuples,
+					offset + 16);
+			double lseg_y_2 = HAWQConvertUtil.bytesToDouble(memtuples,
+					offset + 24);
+			values[fieldIndex - 1] = new HAWQLseg(lseg_x_1, lseg_y_1, lseg_x_2,
+					lseg_y_2);
+			break;
+		case BOX:
+			double box_x_1 = HAWQConvertUtil.bytesToDouble(memtuples, offset);
+			double box_y_1 = HAWQConvertUtil.bytesToDouble(memtuples,
+					offset + 8);
+			double box_x_2 = HAWQConvertUtil.bytesToDouble(memtuples,
+					offset + 16);
+			double box_y_2 = HAWQConvertUtil.bytesToDouble(memtuples,
+					offset + 24);
+			values[fieldIndex - 1] = new HAWQBox(box_x_1, box_y_1, box_x_2,
+					box_y_2);
+			break;
+		case CIRCLE:
+			double centerX = HAWQConvertUtil.bytesToDouble(memtuples, offset);
+			double centerY = HAWQConvertUtil.bytesToDouble(memtuples,
+					offset + 8);
+			double r = HAWQConvertUtil.bytesToDouble(memtuples, offset + 16);
+			values[fieldIndex - 1] = new HAWQCircle(centerX, centerY, r);
+			break;
+		case PATH:
+			int offset_path;
+			if (isLargeTup != 0)
+				offset_path = HAWQConvertUtil.bytesToInt(memtuples, offset);
+			else
+				offset_path = ((int) HAWQConvertUtil.bytesToShort(memtuples,
+						offset)) & 0x0000FFFF;
+			offset_path += start;
+			offset_path += nullmapExtraBytes;
+			values[fieldIndex - 1] = HAWQConvertUtil.bytesToPath(memtuples,
+					offset_path);
+			break;
+		case POLYGON:
+			int offset_polygon;
+			if (isLargeTup != 0)
+				offset_polygon = HAWQConvertUtil.bytesToInt(memtuples, offset);
+			else
+				offset_polygon = ((int) HAWQConvertUtil.bytesToShort(memtuples,
+						offset)) & 0x0000FFFF;
+			offset_polygon += start;
+			offset_polygon += nullmapExtraBytes;
+			values[fieldIndex - 1] = HAWQConvertUtil.bytesToPolygon(memtuples,
+					offset_polygon);
+			break;
+		case BIT:
+		case VARBIT:
+			int offser_varbit;
+			if (isLargeTup != 0)
+				offser_varbit = HAWQConvertUtil.bytesToInt(memtuples, offset);
+			else
+				offser_varbit = ((int) HAWQConvertUtil.bytesToShort(memtuples,
+						offset)) & 0x0000FFFF;
+			offser_varbit += start;
+			offser_varbit += nullmapExtraBytes;
+			values[fieldIndex - 1] = HAWQConvertUtil.bytesToVarbit(memtuples,
+					offser_varbit);
+			break;
+		case MACADDR:
+			values[fieldIndex - 1] = new HAWQMacaddr(memtuples, offset);
+			break;
+		default:
+			break;
+		}
+	}
+
+	private void initOffset() throws HAWQException
+	{
+		int posInMemTuples = start;
+		int lengthMemTuple = HAWQConvertUtil.bytesToInt(memtuples,
+				posInMemTuples);
+		posInMemTuples += 4;
+		if ((lengthMemTuple & 0x80000000) == 0)
+			throw new HAWQException("Fail to get length memtuple",
+					HAWQException.WRONGFILEFORMAT_EXCEPTION);
+		int tupleBytesNum = lengthMemTuple & 0x7FFFFFF8;
+		if (tupleBytesNum != end - start + 1)
+			throw new HAWQException("Different size of data tuples: "
+					+ tupleBytesNum + " and " + (end - start + 1),
+					HAWQException.WRONGFILEFORMAT_EXCEPTION);
+		int hasExternal = lengthMemTuple & 4;
+		if (hasExternal != 0)
+			throw new HAWQException("hasExternal bit of length tuple is not 0",
+					HAWQException.WRONGFILEFORMAT_EXCEPTION);
+
+		int hasNull = lengthMemTuple & 1;
+		int numOfNullmapBytes = 0;
+		if (hasNull != 0)
+		{
+			numOfNullmapBytes = ((columnNum - 1) / 32 + 1) * 4;
+			byte[] numDigit = new byte[numOfNullmapBytes];
+			for (int i = 0; i < numDigit.length; ++i)
+				numDigit[i] = memtuples[posInMemTuples++];
+
+			for (int i = 0; i < columnNum; ++i)
+			{
+				if (((numDigit[i / 8] >> (i % 8)) & 1) != 0)
+					nullmap[i] = true;
+				else
+					nullmap[i] = false;
+			}
+		}
+
+		isLargeTup = lengthMemTuple & 2;
+		int tempNumOf4Bytes, tempNumOf2Bytes;
+		if (isLargeTup != 0)
+		{
+			// offsets are 4 bytes
+			tempNumOf4Bytes = numOf4MoreBytes + numOfOffset;
+			tempNumOf2Bytes = numOf2Bytes;
+		}
+		else
+		{
+			// offsets are 2 bytes
+			tempNumOf4Bytes = numOf4MoreBytes;
+			tempNumOf2Bytes = numOf2Bytes + numOfOffset;
+		}
+
+		if (numOf8MoreBytes != 0)
+		{
+			/*
+			 * GPSQL-990
+			 * 
+			 * nullmapExtraBytes should be calculated even when all of the
+			 * 8-or-more-byte values are null
+			 */
+			int length = 4 + numOfNullmapBytes;
+			if (length == 8)
+				// Null map needs 4 bytes; null map plus length memtuple
+				// is 8 bytes, so no padding is needed
+				nullmapExtraBytes = 0;
+			else if (length % 8 == 0)
+				// Null map is 12, 20, etc.; no padding is needed, but
+				// there are extra bytes for the null map
+				nullmapExtraBytes = numOfNullmapBytes - 4;
+			else
+			{
+				// Null map is 0, 8, etc.; 4 bytes of padding are needed, so
+				// skip these 4 bytes
+				posInMemTuples += 4;
+				nullmapExtraBytes = numOfNullmapBytes;
+			}
+		}
+		else
+		{
+			/*
+			 * GPSQL-941
+			 * 
+			 * If there are no 8-or-more-byte values and the tuple contains nulls,
+			 * nullmapExtraBytes equals numOfNullmapBytes rather than 0
+			 */
+			nullmapExtraBytes = numOfNullmapBytes;
+		}
+
+		int pos = 0;
+		for (int i = 0; i < numOf8MoreBytes; ++i)
+		{
+			if (hasNull != 0 && nullmap[pos])
+			{
+				offsetOf8MoreBytes[i] = 0;
+			}
+			else
+			{
+				offsetOf8MoreBytes[i] = posInMemTuples;
+				switch (typesOf8MoreBytes[i])
+				{
+				case FLOAT8:
+				case INT8:
+				case TIME:
+				case TIMESTAMP:
+				case TIMESTAMPTZ:
+					posInMemTuples += 8;
+					break;
+				case TIMETZ:
+					/*
+					 * GPSQL-1047
+					 * 
+					 * For a 64-bit OS and database, timetz is aligned to 16 bytes.
+					 */
+					if (isMac)
+						posInMemTuples += 12;
+					else
+					{
+						posInMemTuples += 16;
+						if (i == numOf8MoreBytes - 1)
+							posInMemTuples -= 4;
+					}
+					break;
+				case LSEG:
+				case BOX:
+					posInMemTuples += 32;
+					break;
+				case CIRCLE:
+					posInMemTuples += 24;
+					break;
+				case INTERVAL:
+				case POINT:
+					posInMemTuples += 16;
+					break;
+				default:
+					break;
+				}
+			}
+			++pos;
+		}
+		for (int i = 0; i < tempNumOf4Bytes; ++i)
+		{
+			if (hasNull != 0 && nullmap[pos])
+			{
+				offsetOf4MoreBytes[i] = 0;
+			}
+			else
+			{
+				offsetOf4MoreBytes[i] = posInMemTuples;
+				switch (typesOf4MoreBytes[i])
+				{
+				case MACADDR:
+					posInMemTuples += 8;
+					if (i == tempNumOf4Bytes - 1)
+						posInMemTuples -= 2;
+					break;
+				case INT4:
+				case FLOAT4:
+				case DATE:
+					posInMemTuples += 4;
+					break;
+				default:
+					break;
+				}
+			}
+			++pos;
+		}
+		for (int i = 0; i < tempNumOf2Bytes; ++i)
+		{
+			if (hasNull != 0 && nullmap[pos])
+			{
+				offsetOf2Bytes[i] = 0;
+			}
+			else
+			{
+				offsetOf2Bytes[i] = posInMemTuples;
+				posInMemTuples += 2;
+			}
+			++pos;
+		}
+		for (int i = 0; i < numOf1Byte; ++i)
+		{
+			if (hasNull != 0 && nullmap[pos])
+			{
+				offsetOf1Bytes[i] = 0;
+			}
+			else
+			{
+				offsetOf1Bytes[i] = posInMemTuples++;
+			}
+			++pos;
+		}
+
+		int posIn8Byte = 0, posIn4Byte = 0, posIn2Byte = 0, posIn1Byte = 0;
+		for (int i = 0; i < columnNum; ++i)
+		{
+			HAWQField field = super.getSchema().getField(i + 1);
+			if (field.isArray())
+			{
+				if (isLargeTup != 0)
+					offsetOfEachColumn[i] = offsetOf4MoreBytes[posIn4Byte++];
+				else
+					offsetOfEachColumn[i] = offsetOf2Bytes[posIn2Byte++];
+				continue;
+			}
+
+			switch (schemaType[i])
+			{
+			case INT8:
+			case FLOAT8:
+			case TIME:
+			case TIMETZ:
+			case TIMESTAMP:
+			case TIMESTAMPTZ:
+			case INTERVAL:
+			case POINT:
+			case LSEG:
+			case BOX:
+			case CIRCLE:
+				offsetOfEachColumn[i] = offsetOf8MoreBytes[posIn8Byte++];
+				break;
+			case MACADDR:
+			case INT4:
+			case FLOAT4:
+			case DATE:
+				offsetOfEachColumn[i] = offsetOf4MoreBytes[posIn4Byte++];
+				break;
+			case INT2:
+				offsetOfEachColumn[i] = offsetOf2Bytes[posIn2Byte++];
+				break;
+			case BOOL:
+				offsetOfEachColumn[i] = offsetOf1Bytes[posIn1Byte++];
+				break;
+			case PATH:
+			case POLYGON:
+			case VARCHAR:
+			case INET:
+			case CIDR:
+			case XML:
+			case BPCHAR:
+			case TEXT:
+			case BYTEA:
+			case NUMERIC:
+			case BIT:
+			case VARBIT:
+				if (isLargeTup != 0)
+					offsetOfEachColumn[i] = offsetOf4MoreBytes[posIn4Byte++];
+				else
+					offsetOfEachColumn[i] = offsetOf2Bytes[posIn2Byte++];
+				break;
+			default:
+				break;
+			}
+		}
+	}
+}
\ No newline at end of file
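
Field indexes on HAWQAORecord are 1-based, and each getter decodes its column from the memtuple bytes only on first access (get() runs lazily while values[fieldIndex - 1] is still null). A small sketch of per-column access; the class name RecordAccessSketch and the assumption that the record starts with an int4 column followed by a text column are illustrative only.

import com.pivotal.hawq.mapreduce.HAWQException;
import com.pivotal.hawq.mapreduce.ao.io.HAWQAORecord;

public class RecordAccessSketch
{
    // The record is assumed to have been populated by HAWQAOFileReader.readRecord()
    // and to start with an int4 column followed by a text column (illustration only).
    public static void printFirstTwoColumns(HAWQAORecord record)
            throws HAWQException
    {
        // isNull() also triggers lazy decoding of the column
        if (!record.isNull(1))
            System.out.println("col1 (int4): " + record.getInt(1));
        if (!record.isNull(2))
            System.out.println("col2 (text): " + record.getString(2));
    }
}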

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/util/CompressZlib.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/util/CompressZlib.java b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/util/CompressZlib.java
new file mode 100644
index 0000000..3788644
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/main/java/com/pivotal/hawq/mapreduce/ao/util/CompressZlib.java
@@ -0,0 +1,196 @@
+package com.pivotal.hawq.mapreduce.ao.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+
+public final class CompressZlib
+{
+
+	/**
+	 * Compress
+	 * 
+	 * @param data
+	 *            data to compress
+	 * @param level
+	 *            compression level
+	 * @return byte[] data after compression
+	 */
+	public static byte[] compress(byte[] data, int level)
+	{
+		byte[] output = new byte[0];
+
+		Deflater compresser = new Deflater(level);
+
+		compresser.reset();
+		compresser.setInput(data);
+		compresser.finish();
+		ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length);
+		try
+		{
+			byte[] buf = new byte[1024];
+			while (!compresser.finished())
+			{
+				int i = compresser.deflate(buf);
+				bos.write(buf, 0, i);
+			}
+			output = bos.toByteArray();
+		}
+		catch (Exception e)
+		{
+			output = data;
+			e.printStackTrace();
+		}
+		finally
+		{
+			try
+			{
+				bos.close();
+			}
+			catch (IOException e)
+			{
+				e.printStackTrace();
+			}
+		}
+		compresser.end();
+		return output;
+	}
+
+	/**
+	 * Compress
+	 * 
+	 * @param data
+	 *            data to compress
+	 * @param os
+	 *            output stream
+	 */
+	public static void compress(byte[] data, OutputStream os)
+	{
+		DeflaterOutputStream dos = new DeflaterOutputStream(os);
+
+		try
+		{
+			dos.write(data, 0, data.length);
+
+			dos.finish();
+
+			dos.flush();
+		}
+		catch (IOException e)
+		{
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * Decompress
+	 * 
+	 * @param data
+	 *            data to decompress
+	 * @return byte[] data after decompression
+	 */
+	public static byte[] decompress(byte[] data)
+	{
+		byte[] output = new byte[0];
+
+		Inflater decompresser = new Inflater();
+		decompresser.reset();
+		decompresser.setInput(data);
+
+		ByteArrayOutputStream o = new ByteArrayOutputStream(data.length);
+		try
+		{
+			byte[] buf = new byte[1024];
+			while (!decompresser.finished())
+			{
+				int i = decompresser.inflate(buf);
+				o.write(buf, 0, i);
+			}
+			output = o.toByteArray();
+		}
+		catch (Exception e)
+		{
+			output = data;
+			e.printStackTrace();
+		}
+		finally
+		{
+			try
+			{
+				o.close();
+			}
+			catch (IOException e)
+			{
+				e.printStackTrace();
+			}
+		}
+
+		decompresser.end();
+		return output;
+	}
+
+	/**
+	 * uncompress
+	 * 
+	 * @param is
+	 *            input stream
+	 * @return byte array output stream
+	 */
+	public static ByteArrayOutputStream decompress(InputStream is)
+	{
+		InflaterInputStream iis = new InflaterInputStream(is);
+		ByteArrayOutputStream o = new ByteArrayOutputStream(1024);
+		try
+		{
+			int i = 1024;
+			byte[] buf = new byte[i];
+
+			while ((i = iis.read(buf, 0, i)) > 0)
+			{
+				o.write(buf, 0, i);
+			}
+
+		}
+		catch (IOException e)
+		{
+			e.printStackTrace();
+		}
+		return o;
+	}
+
+	/**
+	 * uncompress
+	 * 
+	 * @param is
+	 *            input stream
+	 * @param length maximum number of bytes to read from the stream
+	 * @return byte array output stream
+	 */
+	public static ByteArrayOutputStream decompress(InputStream is, int length)
+	{
+		InflaterInputStream iis = new InflaterInputStream(is);
+		ByteArrayOutputStream o = new ByteArrayOutputStream(1024);
+		try
+		{
+			int i = 1024;
+			byte[] buf = new byte[i];
+
+			while (length > 0 && (i = iis.read(buf, 0, Math.min(buf.length, length))) > 0)
+			{
+				o.write(buf, 0, i);
+				length -= i;
+			}
+
+		}
+		catch (IOException e)
+		{
+			e.printStackTrace();
+		}
+		return o;
+	}
+}
\ No newline at end of file
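
CompressZlib wraps java.util.zip, so the byte-array compress/decompress pair round-trips standard zlib data. A quick sketch; the class name CompressZlibSketch and the sample string are illustrative only, and the level constant comes from java.util.zip.Deflater.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.Deflater;

import com.pivotal.hawq.mapreduce.ao.util.CompressZlib;

public class CompressZlibSketch
{
    public static void main(String[] args)
    {
        byte[] original = "hello hawq append-only block".getBytes(StandardCharsets.UTF_8);
        // compress(data, level) deflates into zlib format at the given level
        byte[] packed = CompressZlib.compress(original, Deflater.BEST_SPEED);
        // decompress(data) inflates it back
        byte[] unpacked = CompressZlib.decompress(packed);
        System.out.println("round trip ok: " + Arrays.equals(original, unpacked));
    }
}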

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/src/test/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAOFileReaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/src/test/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAOFileReaderTest.java b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/test/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAOFileReaderTest.java
new file mode 100644
index 0000000..fe0b0f0
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/test/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAOFileReaderTest.java
@@ -0,0 +1,105 @@
+package com.pivotal.hawq.mapreduce.ao.io;
+
+public class HAWQAOFileReaderTest
+{
+
+	private static int printUsage()
+	{
+		System.out
+				.println("HAWQAOFileReaderTest <database_url> <table_name> <whetherToLog>");
+		return 2;
+	}
+
+	public static void main(String[] args) throws Exception
+	{
+		/*
+		if (args.length != 3)
+		{
+			System.exit(printUsage());
+		}
+
+		String db_url = args[0];
+		String table_name = args[1];
+		String whetherToLogStr = args[2];
+
+		Metadata metadata = new Metadata(db_url, null, "", table_name);
+		if (metadata.getTableType() != Database.TableType.AO_TABLE)
+			throw new HAWQException("Only ao is supported");
+		HAWQFileStatus[] fileAttributes = metadata.getFileStatus();
+
+		BufferedWriter bw = null;
+		boolean whetherToLog = whetherToLogStr.equals("Y");
+		if (whetherToLog)
+		{
+			bw = new BufferedWriter(new FileWriter(table_name + "_test"));
+		}
+
+		Configuration conf = new Configuration();
+		conf.addResource(new Path(
+				"/Users/wangj77/hadoop-2.0.2-alpha-gphd-2.0.1/etc/hadoop/hdfs-site.xml"));
+		conf.addResource(new Path(
+				"/home/ioformat/hdfs/etc/hadoop/hdfs-site.xml"));
+		conf.reloadConfiguration();
+
+		for (int i = 0; i < fileAttributes.length; i++)
+		{
+			String pathStr = fileAttributes[i].getFilePath();
+			long fileLength = fileAttributes[i].getFileLength();
+			HAWQAOFileStatus aofilestatus = (HAWQAOFileStatus) fileAttributes[i];
+			boolean checksum = aofilestatus.getChecksum();
+			String compressType = aofilestatus.getCompressType();
+			int blocksize = aofilestatus.getBlockSize();
+			Path path = new Path(pathStr);
+			HAWQAOSplit aosplit;
+			if (fileLength != 0)
+			{
+				FileSystem fs = path.getFileSystem(conf);
+				BlockLocation[] blkLocations = fs.getFileBlockLocations(
+						fs.getFileStatus(path), 0, fileLength);
+				// not splitable
+				aosplit = new HAWQAOSplit(path, 0, fileLength,
+						blkLocations[0].getHosts(), checksum, compressType,
+						blocksize);
+			}
+			else
+			{
+				// Create empty hosts array for zero length files
+				aosplit = new HAWQAOSplit(path, 0, fileLength, new String[0],
+						checksum, compressType, blocksize);
+			}
+
+			String tableEncoding = metadata.getTableEncoding();
+			HAWQSchema schema = metadata.getSchema();
+			String version = metadata.getVersion();
+
+			HAWQAOFileReader reader = new HAWQAOFileReader(conf, aosplit);
+
+			HAWQAORecord record = new HAWQAORecord(schema, tableEncoding,
+					version);
+
+			long begin = System.currentTimeMillis();
+			while (reader.readRecord(record))
+			{
+				if (whetherToLog)
+				{
+					int columnCount = record.getSchema().getFieldCount();
+					bw.write(record.getString(1));
+					for (int j = 2; j <= columnCount; j++)
+					{
+						bw.write("|");
+						bw.write(record.getString(j));
+					}
+					bw.write("\n");
+				}
+			}
+			long end = System.currentTimeMillis();
+			System.out.println("Time elapsed: " + (end - begin) + " for "
+					+ fileAttributes[i]);
+		}
+		if (whetherToLog)
+		{
+			bw.close();
+		}
+		*/
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/src/test/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAORecordTest.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/src/test/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAORecordTest.java b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/test/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAORecordTest.java
new file mode 100644
index 0000000..b21ff9a
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/src/test/java/com/pivotal/hawq/mapreduce/ao/io/HAWQAORecordTest.java
@@ -0,0 +1,98 @@
+package com.pivotal.hawq.mapreduce.ao.io;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import com.pivotal.hawq.mapreduce.schema.HAWQField;
+import com.pivotal.hawq.mapreduce.schema.HAWQPrimitiveField;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import org.junit.*;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+
+public class HAWQAORecordTest
+{
+
+	@Test
+	public void TestGetColumnCount()
+	{
+		HAWQSchema schema = new HAWQSchema("aaa", HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.INT4, "a"));
+		String encoding = "UTF8";
+		HAWQAORecord record;
+		try
+		{
+			record = new HAWQAORecord(schema, encoding, "on i386-apple-darwin");
+			record.getSchema().getFieldCount();
+		}
+		catch (HAWQException e)
+		{
+			e.printStackTrace();
+		}
+	}
+
+	public static void main(String[] args) throws Exception
+	{
+		ArrayList<HAWQField> fields = new ArrayList<HAWQField>();
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.INT2, "a"));
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.FLOAT4, "b"));
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.NUMERIC, "c"));
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.VARCHAR, "d"));
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.BOOL, "e"));
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.DATE, "f"));
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.INT8, "g"));
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.TIME, "h"));
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.TIMESTAMP, "i"));
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.FLOAT8, "j"));
+		fields.add(HAWQSchema.optional_field(
+				HAWQPrimitiveField.PrimitiveType.BYTEA, "l"));
+		HAWQAORecord record = new HAWQAORecord(
+				new HAWQSchema("alltype", fields), "UTF8", "on i386-apple-darwin");
+
+		record.setShort(1, (short) 1);
+		record.setFloat(2, (float) 2.3);
+		record.setBigDecimal(3, new BigDecimal("3452345.234234"));
+		record.setString(4, "sdfgsgrea");
+		record.setBoolean(5, true);
+		record.setDate(6, new Date(1323434234355555449L));
+		record.setLong(7, 4);
+		record.setTime(8, new Time(1323434234355555449L));
+		record.setTimestamp(9, new Timestamp(1323434234355555449L));
+		record.setDouble(10, 5.6);
+		String temp = "dfgsfgs";
+		record.setBytes(11, temp.getBytes());
+
+		BufferedWriter bw = new BufferedWriter(new FileWriter(
+				"HAWQAORecordTest"));
+		long begin = System.currentTimeMillis();
+		for (int i = 0; i < 8388608; i++)
+		{
+			int columnCount = record.getSchema().getFieldCount();
+			bw.write(record.getString(1));
+			for (int j = 2; j <= columnCount; j++)
+			{
+				bw.write("|");
+				bw.write(record.getString(j));
+			}
+			bw.write("\n");
+		}
+		long end = System.currentTimeMillis();
+		System.out.println("Time elapsed: " + (end - begin));
+		bw.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/test-aoFileReader.sh
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/test-aoFileReader.sh b/contrib/hawq-hadoop/hawq-mapreduce-ao/test-aoFileReader.sh
new file mode 100755
index 0000000..3223842
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/test-aoFileReader.sh
@@ -0,0 +1,20 @@
+#!/bin/sh
+# A script to run HAWQAOFileReaderTest
+# test-aoFileReader.sh
+
+if [ $# -ne 3 ]
+then
+  echo "#USAGE: ./test-aoFileReader.sh <database> <tablename> <whetherToLog>"
+  exit
+fi
+
+SRC_JAR_NAME=hawq-mapreduce-ao-1.0.0
+COMMON_JAR_NAME=../hawq-mapreduce-common/target/hawq-mapreduce-common-1.0.0
+
+export HADOOP_CLASSPATH=lib/postgresql-9.2-1003-jdbc4.jar:target/${SRC_JAR_NAME}.jar:target/${SRC_JAR_NAME}-tests.jar:${COMMON_JAR_NAME}.jar:${HADOOP_CLASSPATH}
+
+DB_NAME=$1
+DB_URL=localhost:5432/${DB_NAME}
+TABLE_NAME=$2
+
+time hadoop jar target/${SRC_JAR_NAME}-tests.jar com.pivotal.hawq.mapreduce.ao.io.HAWQAOFileReaderTest ${DB_URL} ${TABLE_NAME} $3

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-ao/test-record.sh
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-ao/test-record.sh b/contrib/hawq-hadoop/hawq-mapreduce-ao/test-record.sh
new file mode 100755
index 0000000..60b16bc
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-ao/test-record.sh
@@ -0,0 +1,10 @@
+#!/bin/sh
+# A script to run HAWQAORecordTest
+# test-record.sh
+
+SRC_JAR_NAME=hawq-mapreduce-ao-1.0.0
+COMMON_JAR_NAME=../hawq-mapreduce-common/target/hawq-mapreduce-common-1.0.0
+
+export HADOOP_CLASSPATH=lib/postgresql-9.2-1003-jdbc4.jar:target/${SRC_JAR_NAME}.jar:target/${SRC_JAR_NAME}-tests.jar:${COMMON_JAR_NAME}.jar:${HADOOP_CLASSPATH}
+
+time hadoop jar target/${SRC_JAR_NAME}-tests.jar com.pivotal.hawq.mapreduce.ao.io.HAWQAORecordTest

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-common/.gitignore
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-common/.gitignore b/contrib/hawq-hadoop/hawq-mapreduce-common/.gitignore
new file mode 100644
index 0000000..2f7896d
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-common/.gitignore
@@ -0,0 +1 @@
+target/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-common/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-common/pom.xml b/contrib/hawq-hadoop/hawq-mapreduce-common/pom.xml
new file mode 100644
index 0000000..7a6f6c5
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-common/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hawq-hadoop</artifactId>
+        <groupId>com.pivotal.hawq</groupId>
+        <version>1.1.0</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hawq-mapreduce-common</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.codehaus.modello</groupId>
+            <artifactId>modello-plugin-snakeyaml</artifactId>
+            <version>1.8.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.codehaus.plexus</groupId>
+                    <artifactId>plexus-container-default</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>9.2-1003-jdbc4</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.atlassian.maven.plugins</groupId>
+                <artifactId>maven-clover2-plugin</artifactId>
+                <configuration>
+                    <licenseLocation>../lib/clover.license</licenseLocation>
+                    <excludes>
+                        <exclude>**/HAWQJdbcUtils.java</exclude>
+                        <exclude>**/MetadataSQLAccessor.java</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-common/src/main/java/com/pivotal/hawq/mapreduce/HAWQException.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-common/src/main/java/com/pivotal/hawq/mapreduce/HAWQException.java b/contrib/hawq-hadoop/hawq-mapreduce-common/src/main/java/com/pivotal/hawq/mapreduce/HAWQException.java
new file mode 100644
index 0000000..c722137
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-common/src/main/java/com/pivotal/hawq/mapreduce/HAWQException.java
@@ -0,0 +1,50 @@
+package com.pivotal.hawq.mapreduce;
+
+@SuppressWarnings("serial")
+public class HAWQException extends Exception {
+
+    private int etype;
+
+    public static final int DEFAULT_EXCEPTION = 0;
+    public static final int WRONGFILEFORMAT_EXCEPTION = 1;
+
+    public HAWQException() {
+    }
+
+	/**
+	 * Constructor
+	 * 
+	 * @param emsg
+	 *            error message
+	 */
+    public HAWQException(String emsg) {
+        super(emsg);
+    }
+
+	/**
+	 * Constructor
+	 * 
+	 * @param emsg
+	 *            error message
+	 * @param etype
+	 *            0 is default type, and 1 means wrong file format exception
+	 */
+    public HAWQException(String emsg, int etype) {
+        super(emsg);
+        this.etype = etype;
+    }
+
+    @Override
+    public String getMessage() {
+        StringBuffer buffer = new StringBuffer();
+        switch (etype) {
+            case WRONGFILEFORMAT_EXCEPTION:
+                buffer.append("Wrong File Format: ");
+                buffer.append(super.getMessage());
+                return buffer.toString();
+            default:
+                buffer.append(super.getMessage());
+                return buffer.toString();
+        }
+    }
+}